c795e66a15eaf27843efd0fb4029429e8ea6d436
[mono.git] / mcs / class / referencesource / System.ServiceModel / System / ServiceModel / Channels / Connection.cs
1 //------------------------------------------------------------
2 // Copyright (c) Microsoft Corporation.  All rights reserved.
3 //------------------------------------------------------------
4
5 namespace System.ServiceModel.Channels
6 {
7     using System.Diagnostics;
8     using System.IO;
9     using System.Net;
10     using System.Runtime;
11     using System.ServiceModel;
12     using System.Threading;
13     using System.ServiceModel.Diagnostics.Application;
14
15     // Low level abstraction for a socket/pipe
16     interface IConnection
17     {
18         byte[] AsyncReadBuffer { get; }
19         int AsyncReadBufferSize { get; }
20         TraceEventType ExceptionEventType { get; set; }
21         IPEndPoint RemoteIPEndPoint { get; }
22
23         void Abort();
24         void Close(TimeSpan timeout, bool asyncAndLinger);
25         void Shutdown(TimeSpan timeout);
26
27         AsyncCompletionResult BeginWrite(byte[] buffer, int offset, int size, bool immediate, TimeSpan timeout,
28             WaitCallback callback, object state);
29         void EndWrite();
30         void Write(byte[] buffer, int offset, int size, bool immediate, TimeSpan timeout);
31         void Write(byte[] buffer, int offset, int size, bool immediate, TimeSpan timeout, BufferManager bufferManager);
32
33         int Read(byte[] buffer, int offset, int size, TimeSpan timeout);
34         AsyncCompletionResult BeginRead(int offset, int size, TimeSpan timeout, WaitCallback callback, object state);
35         int EndRead();
36
37         // very ugly listener stuff
38         object DuplicateAndClose(int targetProcessId);
39         object GetCoreTransport();
40         IAsyncResult BeginValidate(Uri uri, AsyncCallback callback, object state);
41         bool EndValidate(IAsyncResult result);
42     }
43
44     // Low level abstraction for connecting a socket/pipe
45     interface IConnectionInitiator
46     {
47         IConnection Connect(Uri uri, TimeSpan timeout);
48         IAsyncResult BeginConnect(Uri uri, TimeSpan timeout, AsyncCallback callback, object state);
49         IConnection EndConnect(IAsyncResult result);
50     }
51
52     // Low level abstraction for listening for sockets/pipes
53     interface IConnectionListener : IDisposable
54     {
55         void Listen();
56         IAsyncResult BeginAccept(AsyncCallback callback, object state);
57         IConnection EndAccept(IAsyncResult result);
58     }
59
60     abstract class DelegatingConnection : IConnection
61     {
62         IConnection connection;
63
64         protected DelegatingConnection(IConnection connection)
65         {
66             this.connection = connection;
67         }
68
69         public virtual byte[] AsyncReadBuffer
70         {
71             get { return connection.AsyncReadBuffer; }
72         }
73
74         public virtual int AsyncReadBufferSize
75         {
76             get { return connection.AsyncReadBufferSize; }
77         }
78
79         public TraceEventType ExceptionEventType
80         {
81             get { return connection.ExceptionEventType; }
82             set { connection.ExceptionEventType = value; }
83         }
84
85         protected IConnection Connection
86         {
87             get { return connection; }
88         }
89
90         public IPEndPoint RemoteIPEndPoint
91         {
92             get { return connection.RemoteIPEndPoint; }
93         }
94
95         public virtual void Abort()
96         {
97             connection.Abort();
98         }
99
100         public virtual void Close(TimeSpan timeout, bool asyncAndLinger)
101         {
102             connection.Close(timeout, asyncAndLinger);
103         }
104
105         public virtual void Shutdown(TimeSpan timeout)
106         {
107             connection.Shutdown(timeout);
108         }
109
110         public virtual object DuplicateAndClose(int targetProcessId)
111         {
112             return connection.DuplicateAndClose(targetProcessId);
113         }
114
115         public virtual object GetCoreTransport()
116         {
117             return connection.GetCoreTransport();
118         }
119
120         public virtual IAsyncResult BeginValidate(Uri uri, AsyncCallback callback, object state)
121         {
122             return connection.BeginValidate(uri, callback, state);
123         }
124
125         public virtual bool EndValidate(IAsyncResult result)
126         {
127             return connection.EndValidate(result);
128         }
129
130         public virtual AsyncCompletionResult BeginWrite(byte[] buffer, int offset, int size, bool immediate, TimeSpan timeout,
131             WaitCallback callback, object state)
132         {
133             return connection.BeginWrite(buffer, offset, size, immediate, timeout, callback, state);
134         }
135
136         public virtual void EndWrite()
137         {
138             connection.EndWrite();
139         }
140
141         public virtual void Write(byte[] buffer, int offset, int size, bool immediate, TimeSpan timeout)
142         {
143             connection.Write(buffer, offset, size, immediate, timeout);
144         }
145
146         public virtual void Write(byte[] buffer, int offset, int size, bool immediate, TimeSpan timeout, BufferManager bufferManager)
147         {
148             connection.Write(buffer, offset, size, immediate, timeout, bufferManager);
149         }
150
151         public virtual int Read(byte[] buffer, int offset, int size, TimeSpan timeout)
152         {
153             return connection.Read(buffer, offset, size, timeout);
154         }
155
156         public virtual AsyncCompletionResult BeginRead(int offset, int size, TimeSpan timeout,
157             WaitCallback callback, object state)
158         {
159             return connection.BeginRead(offset, size, timeout, callback, state);
160         }
161
162         public virtual int EndRead()
163         {
164             return connection.EndRead();
165         }
166     }
167
168     class PreReadConnection : DelegatingConnection
169     {
170         int asyncBytesRead;
171         byte[] preReadData;
172         int preReadOffset;
173         int preReadCount;
174
175         public PreReadConnection(IConnection innerConnection, byte[] initialData)
176             : this(innerConnection, initialData, 0, initialData.Length)
177         {
178         }
179
180         public PreReadConnection(IConnection innerConnection, byte[] initialData, int initialOffset, int initialSize)
181             : base(innerConnection)
182         {
183             this.preReadData = initialData;
184             this.preReadOffset = initialOffset;
185             this.preReadCount = initialSize;
186         }
187
188         public void AddPreReadData(byte[] initialData, int initialOffset, int initialSize)
189         {
190             if (this.preReadCount > 0)
191             {
192                 byte[] tempBuffer = this.preReadData;
193                 this.preReadData = DiagnosticUtility.Utility.AllocateByteArray(initialSize + this.preReadCount);
194                 Buffer.BlockCopy(tempBuffer, this.preReadOffset, this.preReadData, 0, this.preReadCount);
195                 Buffer.BlockCopy(initialData, initialOffset, this.preReadData, this.preReadCount, initialSize);
196                 this.preReadOffset = 0;
197                 this.preReadCount += initialSize;
198             }
199             else
200             {
201                 this.preReadData = initialData;
202                 this.preReadOffset = initialOffset;
203                 this.preReadCount = initialSize;
204             }
205         }
206
207         public override int Read(byte[] buffer, int offset, int size, TimeSpan timeout)
208         {
209             ConnectionUtilities.ValidateBufferBounds(buffer, offset, size);
210
211             if (this.preReadCount > 0)
212             {
213                 int bytesToCopy = Math.Min(size, this.preReadCount);
214                 Buffer.BlockCopy(this.preReadData, this.preReadOffset, buffer, offset, bytesToCopy);
215                 this.preReadOffset += bytesToCopy;
216                 this.preReadCount -= bytesToCopy;
217                 return bytesToCopy;
218             }
219
220             return base.Read(buffer, offset, size, timeout);
221         }
222
223         public override AsyncCompletionResult BeginRead(int offset, int size, TimeSpan timeout, WaitCallback callback, object state)
224         {
225             ConnectionUtilities.ValidateBufferBounds(AsyncReadBufferSize, offset, size);
226
227             if (this.preReadCount > 0)
228             {
229                 int bytesToCopy = Math.Min(size, this.preReadCount);
230                 Buffer.BlockCopy(this.preReadData, this.preReadOffset, AsyncReadBuffer, offset, bytesToCopy);
231                 this.preReadOffset += bytesToCopy;
232                 this.preReadCount -= bytesToCopy;
233                 this.asyncBytesRead = bytesToCopy;
234                 return AsyncCompletionResult.Completed;
235             }
236
237             return base.BeginRead(offset, size, timeout, callback, state);
238         }
239
240         public override int EndRead()
241         {
242             if (this.asyncBytesRead > 0)
243             {
244                 int retValue = this.asyncBytesRead;
245                 this.asyncBytesRead = 0;
246                 return retValue;
247             }
248
249             return base.EndRead();
250         }
251     }
252
253     class ConnectionStream : Stream
254     {
255         TimeSpan closeTimeout;
256         int readTimeout;
257         int writeTimeout;
258         IConnection connection;
259         bool immediate;
260
261         public ConnectionStream(IConnection connection, IDefaultCommunicationTimeouts defaultTimeouts)
262         {
263             this.connection = connection;
264             this.closeTimeout = defaultTimeouts.CloseTimeout;
265             this.ReadTimeout = TimeoutHelper.ToMilliseconds(defaultTimeouts.ReceiveTimeout);
266             this.WriteTimeout = TimeoutHelper.ToMilliseconds(defaultTimeouts.SendTimeout);
267             immediate = true;
268         }
269
270         public IConnection Connection
271         {
272             get { return connection; }
273         }
274
275         public override bool CanRead
276         {
277             get { return true; }
278         }
279
280         public override bool CanSeek
281         {
282             get { return false; }
283         }
284
285         public override bool CanTimeout
286         {
287             get { return true; }
288         }
289
290         public override bool CanWrite
291         {
292             get { return true; }
293         }
294
295         public TimeSpan CloseTimeout
296         {
297             get { return closeTimeout; }
298             set { this.closeTimeout = value; }
299         }
300
301         public override int ReadTimeout
302         {
303             get { return this.readTimeout; }
304             set
305             {
306                 if (value < -1)
307                 {
308                     throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ArgumentOutOfRangeException("value", value,
309                         SR.GetString(SR.ValueMustBeInRange, -1, int.MaxValue)));
310                 }
311
312                 this.readTimeout = value;
313             }
314         }
315
316         public override int WriteTimeout
317         {
318             get { return this.writeTimeout; }
319             set
320             {
321                 if (value < -1)
322                 {
323                     throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ArgumentOutOfRangeException("value", value,
324                         SR.GetString(SR.ValueMustBeInRange, -1, int.MaxValue)));
325                 }
326
327                 this.writeTimeout = value;
328             }
329         }
330
331         public bool Immediate
332         {
333             get { return immediate; }
334             set { immediate = value; }
335         }
336
337         public override long Length
338         {
339             get
340             {
341 #pragma warning suppress 56503 // Microsoft, required by the Stream.Length contract
342                 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.SeekNotSupported)));
343             }
344         }
345
346         public override long Position
347         {
348             get
349             {
350 #pragma warning suppress 56503 // Microsoft, required by the Stream.Position contract
351                 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.SeekNotSupported)));
352             }
353             set
354             {
355                 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.SeekNotSupported)));
356             }
357         }
358
359         public TraceEventType ExceptionEventType
360         {
361             get { return connection.ExceptionEventType; }
362             set { connection.ExceptionEventType = value; }
363         }
364
365         public void Abort()
366         {
367             connection.Abort();
368         }
369
370         public override void Close()
371         {
372             connection.Close(this.CloseTimeout, false);
373         }
374
375         public override void Flush()
376         {
377             // NOP
378         }
379
380         public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
381         {
382             return new WriteAsyncResult(this.connection, buffer, offset, count, this.Immediate, TimeoutHelper.FromMilliseconds(this.WriteTimeout), callback, state);
383         }
384
385         public override void EndWrite(IAsyncResult asyncResult)
386         {
387             WriteAsyncResult.End(asyncResult);
388         }
389
390         public override void Write(byte[] buffer, int offset, int count)
391         {
392             connection.Write(buffer, offset, count, this.Immediate, TimeoutHelper.FromMilliseconds(this.WriteTimeout));
393         }
394
395         public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
396         {
397             return new ReadAsyncResult(connection, buffer, offset, count, TimeoutHelper.FromMilliseconds(this.ReadTimeout), callback, state);
398         }
399
400         public override int EndRead(IAsyncResult asyncResult)
401         {
402             return ReadAsyncResult.End(asyncResult);
403         }
404
405         public override int Read(byte[] buffer, int offset, int count)
406         {
407             return this.Read(buffer, offset, count, TimeoutHelper.FromMilliseconds(this.ReadTimeout));
408         }
409
410         protected int Read(byte[] buffer, int offset, int count, TimeSpan timeout)
411         {
412             return connection.Read(buffer, offset, count, timeout);
413         }
414
415         public override long Seek(long offset, SeekOrigin origin)
416         {
417             throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.SeekNotSupported)));
418         }
419
420
421         public override void SetLength(long value)
422         {
423             throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.SeekNotSupported)));
424         }
425
426         public void Shutdown(TimeSpan timeout)
427         {
428             connection.Shutdown(timeout);
429         }
430
431         public IAsyncResult BeginValidate(Uri uri, AsyncCallback callback, object state)
432         {
433             return this.connection.BeginValidate(uri, callback, state);
434         }
435
436         public bool EndValidate(IAsyncResult result)
437         {
438             return this.connection.EndValidate(result);
439         }
440
441         abstract class IOAsyncResult : AsyncResult
442         {
443             static WaitCallback onAsyncIOComplete;
444             IConnection connection;
445
446             protected IOAsyncResult(IConnection connection, AsyncCallback callback, object state)
447                 : base(callback, state)
448             {
449                 this.connection = connection;
450             }
451
452             protected WaitCallback GetWaitCompletion()
453             {
454                 if (onAsyncIOComplete == null)
455                 {
456                     onAsyncIOComplete = new WaitCallback(OnAsyncIOComplete);
457                 }
458
459                 return onAsyncIOComplete;
460             }
461
462             protected abstract void HandleIO(IConnection connection);
463
464             static void OnAsyncIOComplete(object state)
465             {
466                 IOAsyncResult thisPtr = (IOAsyncResult)state;
467
468                 Exception completionException = null;
469                 try
470                 {
471                     thisPtr.HandleIO(thisPtr.connection);
472                 }
473 #pragma warning suppress 56500 // Microsoft, transferring exception to another thread
474                 catch (Exception e)
475                 {
476                     if (Fx.IsFatal(e))
477                     {
478                         throw;
479                     }
480
481                     completionException = e;
482                 }
483                 thisPtr.Complete(false, completionException);
484             }
485         }
486
487         sealed class ReadAsyncResult : IOAsyncResult
488         {
489             int bytesRead;
490             byte[] buffer;
491             int offset;
492
493             public ReadAsyncResult(IConnection connection, byte[] buffer, int offset, int count, TimeSpan timeout,
494                 AsyncCallback callback, object state)
495                 : base(connection, callback, state)
496             {
497                 this.buffer = buffer;
498                 this.offset = offset;
499
500                 AsyncCompletionResult readResult = connection.BeginRead(0, Math.Min(count, connection.AsyncReadBufferSize),
501                     timeout, GetWaitCompletion(), this);
502                 if (readResult == AsyncCompletionResult.Completed)
503                 {
504                     HandleIO(connection);
505                     base.Complete(true);
506                 }
507             }
508
509             protected override void HandleIO(IConnection connection)
510             {
511                 bytesRead = connection.EndRead();
512                 Buffer.BlockCopy(connection.AsyncReadBuffer, 0, buffer, offset, bytesRead);
513             }
514
515             public static int End(IAsyncResult result)
516             {
517                 ReadAsyncResult thisPtr = AsyncResult.End<ReadAsyncResult>(result);
518                 return thisPtr.bytesRead;
519             }
520         }
521
522         sealed class WriteAsyncResult : IOAsyncResult
523         {
524             public WriteAsyncResult(IConnection connection, byte[] buffer, int offset, int count, bool immediate, TimeSpan timeout, AsyncCallback callback, object state)
525                 : base(connection, callback, state)
526             {
527                 AsyncCompletionResult writeResult = connection.BeginWrite(buffer, offset, count, immediate, timeout, GetWaitCompletion(), this);
528                 if (writeResult == AsyncCompletionResult.Completed)
529                 {
530                     HandleIO(connection);
531                     base.Complete(true);
532                 }
533             }
534
535             protected override void HandleIO(IConnection connection)
536             {
537                 connection.EndWrite();
538             }
539
540             public static void End(IAsyncResult result)
541             {
542                 AsyncResult.End<WriteAsyncResult>(result);
543             }
544         }
545     }
546
547     class StreamConnection : IConnection
548     {
549         byte[] asyncReadBuffer;
550         int bytesRead;
551         ConnectionStream innerStream;
552         AsyncCallback onRead;
553         AsyncCallback onWrite;
554         IAsyncResult readResult;
555         IAsyncResult writeResult;
556         WaitCallback readCallback;
557         WaitCallback writeCallback;
558         Stream stream;
559
560         public StreamConnection(Stream stream, ConnectionStream innerStream)
561         {
562             Fx.Assert(stream != null, "StreamConnection: Stream cannot be null.");
563             Fx.Assert(innerStream != null, "StreamConnection: Inner stream cannot be null.");
564
565             this.stream = stream;
566             this.innerStream = innerStream;
567
568             onRead = Fx.ThunkCallback(new AsyncCallback(OnRead));
569             onWrite = Fx.ThunkCallback(new AsyncCallback(OnWrite));
570         }
571
572         public byte[] AsyncReadBuffer
573         {
574             get
575             {
576                 if (this.asyncReadBuffer == null)
577                 {
578                     lock (ThisLock)
579                     {
580                         if (this.asyncReadBuffer == null)
581                         {
582                             this.asyncReadBuffer = DiagnosticUtility.Utility.AllocateByteArray(innerStream.Connection.AsyncReadBufferSize);
583                         }
584                     }
585                 }
586
587                 return this.asyncReadBuffer;
588             }
589         }
590
591         public int AsyncReadBufferSize
592         {
593             get { return innerStream.Connection.AsyncReadBufferSize; }
594         }
595
596         public Stream Stream
597         {
598             get { return this.stream; }
599         }
600
601         public object ThisLock
602         {
603             get { return this; }
604         }
605
606         public TraceEventType ExceptionEventType
607         {
608             get { return innerStream.ExceptionEventType; }
609             set { innerStream.ExceptionEventType = value; }
610         }
611
612         public IPEndPoint RemoteIPEndPoint
613         {
614             get
615             {
616 #pragma warning suppress 56503 // Not publicly accessible and this should never be called.
617                 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotImplementedException());
618             }
619         }
620
621         public void Abort()
622         {
623             innerStream.Abort();
624         }
625
626         Exception ConvertIOException(IOException ioException)
627         {
628             if (ioException.InnerException is TimeoutException)
629             {
630                 return new TimeoutException(ioException.InnerException.Message, ioException);
631             }
632             else if (ioException.InnerException is CommunicationObjectAbortedException)
633             {
634                 return new CommunicationObjectAbortedException(ioException.InnerException.Message, ioException);
635             }
636             else if (ioException.InnerException is CommunicationException)
637             {
638                 return new CommunicationException(ioException.InnerException.Message, ioException);
639             }
640             else
641             {
642                 return new CommunicationException(SR.GetString(SR.StreamError), ioException);
643             }
644         }
645
646         public void Close(TimeSpan timeout, bool asyncAndLinger)
647         {
648             innerStream.CloseTimeout = timeout;
649             try
650             {
651                 stream.Close();
652             }
653             catch (IOException ioException)
654             {
655                 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(ConvertIOException(ioException));
656             }
657         }
658
659         public void Shutdown(TimeSpan timeout)
660         {
661             innerStream.Shutdown(timeout);
662         }
663
664         public object DuplicateAndClose(int targetProcessId)
665         {
666             throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotImplementedException());
667         }
668
669         public virtual object GetCoreTransport()
670         {
671             throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotImplementedException());
672         }
673
674         public IAsyncResult BeginValidate(Uri uri, AsyncCallback callback, object state)
675         {
676             return this.innerStream.BeginValidate(uri, callback, state);
677         }
678
679         public bool EndValidate(IAsyncResult result)
680         {
681             return this.innerStream.EndValidate(result);
682         }
683
684         public AsyncCompletionResult BeginWrite(byte[] buffer, int offset, int size, bool immediate, TimeSpan timeout,
685             WaitCallback callback, object state)
686         {
687             if (callback == null)
688             {
689                 Fx.AssertAndThrow("Cannot call BeginWrite without a callback");
690             }
691
692             if (this.writeCallback != null)
693             {
694                 Fx.AssertAndThrow("BeginWrite cannot be called twice");
695             }
696
697             this.writeCallback = callback;
698             bool throwing = true;
699
700             try
701             {
702                 innerStream.Immediate = immediate;
703                 SetWriteTimeout(timeout);
704                 IAsyncResult localResult = stream.BeginWrite(buffer, offset, size, this.onWrite, state);
705
706                 if (!localResult.CompletedSynchronously)
707                 {
708                     throwing = false;
709                     return AsyncCompletionResult.Queued;
710                 }
711
712                 throwing = false;
713                 stream.EndWrite(localResult);
714             }
715             catch (IOException ioException)
716             {
717                 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(ConvertIOException(ioException));
718             }
719             finally
720             {
721                 if (throwing)
722                 {
723                     this.writeCallback = null;
724                 }
725             }
726
727             return AsyncCompletionResult.Completed;
728         }
729
730         public void EndWrite()
731         {
732             IAsyncResult localResult = this.writeResult;
733             this.writeResult = null;
734             this.writeCallback = null;
735
736             if (localResult != null)
737             {
738                 try
739                 {
740                     stream.EndWrite(localResult);
741                 }
742                 catch (IOException ioException)
743                 {
744                     throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(ConvertIOException(ioException));
745                 }
746             }
747         }
748
749         void OnWrite(IAsyncResult result)
750         {
751             if (result.CompletedSynchronously)
752             {
753                 return;
754             }
755
756             if (this.writeResult != null)
757             {
758                 throw Fx.AssertAndThrow("StreamConnection: OnWrite called twice.");
759             }
760
761             this.writeResult = result;
762             this.writeCallback(result.AsyncState);
763         }
764
765         public void Write(byte[] buffer, int offset, int size, bool immediate, TimeSpan timeout)
766         {
767             try
768             {
769                 innerStream.Immediate = immediate;
770                 SetWriteTimeout(timeout);
771                 stream.Write(buffer, offset, size);
772             }
773             catch (IOException ioException)
774             {
775                 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(ConvertIOException(ioException));
776             }
777         }
778
779         public void Write(byte[] buffer, int offset, int size, bool immediate, TimeSpan timeout, BufferManager bufferManager)
780         {
781             Write(buffer, offset, size, immediate, timeout);
782             bufferManager.ReturnBuffer(buffer);
783         }
784
785         void SetReadTimeout(TimeSpan timeout)
786         {
787             int timeoutInMilliseconds = TimeoutHelper.ToMilliseconds(timeout);
788             if (stream.CanTimeout)
789             {
790                 stream.ReadTimeout = timeoutInMilliseconds;
791             }
792             innerStream.ReadTimeout = timeoutInMilliseconds;
793         }
794
795         void SetWriteTimeout(TimeSpan timeout)
796         {
797             int timeoutInMilliseconds = TimeoutHelper.ToMilliseconds(timeout);
798             if (stream.CanTimeout)
799             {
800                 stream.WriteTimeout = timeoutInMilliseconds;
801             }
802             innerStream.WriteTimeout = timeoutInMilliseconds;
803         }
804
805         public int Read(byte[] buffer, int offset, int size, TimeSpan timeout)
806         {
807             try
808             {
809                 SetReadTimeout(timeout);
810                 return stream.Read(buffer, offset, size);
811             }
812             catch (IOException ioException)
813             {
814                 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(ConvertIOException(ioException));
815             }
816         }
817
818         public AsyncCompletionResult BeginRead(int offset, int size, TimeSpan timeout, WaitCallback callback, object state)
819         {
820             ConnectionUtilities.ValidateBufferBounds(AsyncReadBufferSize, offset, size);
821             readCallback = callback;
822
823             try
824             {
825                 SetReadTimeout(timeout);
826                 IAsyncResult localResult = stream.BeginRead(AsyncReadBuffer, offset, size, onRead, state);
827
828                 if (!localResult.CompletedSynchronously)
829                 {
830                     return AsyncCompletionResult.Queued;
831                 }
832
833                 bytesRead = stream.EndRead(localResult);
834             }
835             catch (IOException ioException)
836             {
837                 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(ConvertIOException(ioException));
838             }
839
840             return AsyncCompletionResult.Completed;
841         }
842
843         public int EndRead()
844         {
845             IAsyncResult localResult = this.readResult;
846             this.readResult = null;
847
848             if (localResult != null)
849             {
850                 try
851                 {
852                     bytesRead = stream.EndRead(localResult);
853                 }
854                 catch (IOException ioException)
855                 {
856                     throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(ConvertIOException(ioException));
857                 }
858             }
859
860             return bytesRead;
861         }
862
863         void OnRead(IAsyncResult result)
864         {
865             if (result.CompletedSynchronously)
866             {
867                 return;
868             }
869
870             if (this.readResult != null)
871             {
872                 throw Fx.AssertAndThrow("StreamConnection: OnRead called twice.");
873             }
874
875             this.readResult = result;
876             readCallback(result.AsyncState);
877         }
878    }
879
880     class ConnectionMessageProperty
881     {
882         IConnection connection;
883
884         public ConnectionMessageProperty(IConnection connection)
885         {
886             this.connection = connection;
887         }
888
889         public static string Name
890         {
891             get { return "iconnection"; }
892         }
893
894         public IConnection Connection
895         {
896             get { return this.connection; }
897         }
898     }
899
900     static class ConnectionUtilities
901     {
902         internal static void CloseNoThrow(IConnection connection, TimeSpan timeout)
903         {
904             bool success = false;
905             try
906             {
907                 connection.Close(timeout, false);
908                 success = true;
909             }
910             catch (TimeoutException e)
911             {
912                 if (TD.CloseTimeoutIsEnabled())
913                 {
914                     TD.CloseTimeout(e.Message);
915                 }
916                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
917             }
918             catch (CommunicationException e)
919             {
920                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
921             }
922             finally
923             {
924                 if (!success)
925                 {
926                     connection.Abort();
927                 }
928             }
929         }
930
931         internal static void ValidateBufferBounds(ArraySegment<byte> buffer)
932         {
933             ValidateBufferBounds(buffer.Array, buffer.Offset, buffer.Count);
934         }
935
936         internal static void ValidateBufferBounds(byte[] buffer, int offset, int size)
937         {
938             if (buffer == null)
939             {
940                 throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("buffer");
941             }
942
943             ValidateBufferBounds(buffer.Length, offset, size);
944         }
945
946         internal static void ValidateBufferBounds(int bufferSize, int offset, int size)
947         {
948             if (offset < 0)
949             {
950                 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ArgumentOutOfRangeException("offset", offset, SR.GetString(
951                     SR.ValueMustBeNonNegative)));
952             }
953
954             if (offset > bufferSize)
955             {
956                 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ArgumentOutOfRangeException("offset", offset, SR.GetString(
957                     SR.OffsetExceedsBufferSize, bufferSize)));
958             }
959
960             if (size <= 0)
961             {
962                 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ArgumentOutOfRangeException("size", size, SR.GetString(
963                     SR.ValueMustBePositive)));
964             }
965
966             int remainingBufferSpace = bufferSize - offset;
967             if (size > remainingBufferSpace)
968             {
969                 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ArgumentOutOfRangeException("size", size, SR.GetString(
970                     SR.SizeExceedsRemainingBufferSpace, remainingBufferSpace)));
971             }
972         }
973     }
974 }