Adding reference source for System.Net
[mono.git] / mcs / class / referencesource / System / net / System / Net / WebSockets / WebSocketConnectionStream.cs
1 //------------------------------------------------------------------------------
2 // <copyright file="WebSocketConnectionStream.cs" company="Microsoft">
3 //     Copyright (c) Microsoft Corporation.  All rights reserved.
4 // </copyright>
5 //------------------------------------------------------------------------------
6
7 namespace System.Net.WebSockets
8 {
9     using System.Collections.Concurrent;
10     using System.Collections.Generic;
11     using System.Diagnostics;
12     using System.Diagnostics.Contracts;
13     using System.Globalization;
14     using System.IO;
15     using System.Net.Sockets;
16     using System.Threading;
17     using System.Threading.Tasks;
18
19     internal class WebSocketConnectionStream : BufferedReadStream, WebSocketBase.IWebSocketStream
20     {
21         private static readonly Func<Exception, bool> s_CanHandleException = new Func<Exception, bool>(CanHandleException);
22         private static readonly Action<object> s_OnCancel = new Action<object>(OnCancel);
23         private static readonly Action<object> s_OnCancelWebSocketConnection = new Action<object>(WebSocketConnection.OnCancel);
24         private static readonly Type s_NetworkStreamType = typeof(NetworkStream);
25         private readonly ConnectStream m_ConnectStream;
26         private readonly string m_ConnectionGroupName;
27         private readonly bool m_IsFastPathAllowed;
28         private readonly object m_CloseConnectStreamLock;
29         private bool m_InOpaqueMode;
30         private WebSocketConnection m_WebSocketConnection;
31
32         public WebSocketConnectionStream(ConnectStream connectStream, string connectionGroupName)
33             : base(new WebSocketConnection(connectStream.Connection), false)
34         {
35             Contract.Assert(connectStream != null,
36                 "'connectStream' MUST NOT be NULL.");
37             Contract.Assert(connectStream.Connection != null,
38                 "'connectStream.Conection' MUST NOT be NULL.");
39             Contract.Assert(connectStream.Connection.NetworkStream != null,
40                 "'connectStream.Conection.NetworkStream' MUST NOT be NULL.");
41             Contract.Assert(!string.IsNullOrEmpty(connectionGroupName), 
42                 "connectionGroupName should not be null or empty.");
43
44             m_ConnectStream = connectStream;
45             m_ConnectionGroupName = connectionGroupName;
46             m_CloseConnectStreamLock = new object();
47             // Make sure we don't short circuit for TlsStream or custom NetworkStream implementations
48             m_IsFastPathAllowed = m_ConnectStream.Connection.NetworkStream.GetType() == s_NetworkStreamType;
49
50             if (WebSocketBase.LoggingEnabled)
51             {
52                 Logging.Associate(Logging.WebSockets, this, m_ConnectStream.Connection);
53             }
54
55             ConsumeConnectStreamBuffer(connectStream);
56         }
57
58         public override bool CanSeek
59         {
60             get
61             {
62                 return false;
63             }
64         }
65
66         public override bool CanRead
67         {
68             get
69             {
70                 return true;
71             }
72         }
73
74         public override bool CanWrite
75         {
76             get
77             {
78                 return true;
79             }
80         }
81
82         public bool SupportsMultipleWrite
83         {
84             get
85             {
86                 return ((WebSocketConnection)this.BaseStream).SupportsMultipleWrite;
87             }
88         }
89
90         public async Task CloseNetworkConnectionAsync(CancellationToken cancellationToken)
91         {
92             // need to yield here to make sure that we don't get any exception synchronously
93             await Task.Yield();
94             if (WebSocketBase.LoggingEnabled)
95             {
96                 Logging.Enter(Logging.WebSockets, this, Methods.CloseNetworkConnectionAsync, string.Empty);
97             }
98
99             CancellationTokenSource reasonableTimeoutCancellationTokenSource = null;
100             CancellationTokenSource linkedCancellationTokenSource = null;
101             CancellationToken linkedCancellationToken = CancellationToken.None;
102
103             CancellationTokenRegistration cancellationTokenRegistration = new CancellationTokenRegistration();
104             
105             int bytesRead = 0;
106             try
107             {
108                 reasonableTimeoutCancellationTokenSource = 
109                     new CancellationTokenSource(WebSocketHelpers.ClientTcpCloseTimeout);
110                 linkedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(
111                     reasonableTimeoutCancellationTokenSource.Token,
112                     cancellationToken);
113                 linkedCancellationToken = linkedCancellationTokenSource.Token;
114                 cancellationTokenRegistration = linkedCancellationToken.Register(s_OnCancel, this, false);
115
116                 WebSocketHelpers.ThrowIfConnectionAborted(m_ConnectStream.Connection, true);
117                 byte[] buffer = new byte[1];
118                 if (m_WebSocketConnection != null && m_InOpaqueMode)
119                 {
120                     bytesRead = await m_WebSocketConnection.ReadAsyncCore(buffer, 0, 1, linkedCancellationToken, true).SuppressContextFlow<int>();
121                 }
122                 else
123                 {
124                     bytesRead = await base.ReadAsync(buffer, 0, 1, linkedCancellationToken).SuppressContextFlow<int>();
125                 }
126
127                 if (bytesRead != 0)
128                 {
129                     Contract.Assert(false, "'bytesRead' MUST be '0' at this point. Instead more payload was received ('" + buffer[0].ToString() + "')");
130
131                     if (WebSocketBase.LoggingEnabled)
132                     {
133                         Logging.Dump(Logging.WebSockets, this, Methods.CloseNetworkConnectionAsync, buffer, 0, bytesRead);
134                     }
135
136                     throw new WebSocketException(WebSocketError.NotAWebSocket);
137                 }
138             }
139             catch (Exception error)
140             {
141                 if (!s_CanHandleException(error))
142                 {
143                     throw;
144                 }
145
146                 // throw OperationCancelledException when canceled by the caller
147                 // ignore cancellation due to the timeout
148                 cancellationToken.ThrowIfCancellationRequested();
149             }
150             finally
151             {
152                 cancellationTokenRegistration.Dispose();
153                 if (linkedCancellationTokenSource != null)
154                 {
155                     linkedCancellationTokenSource.Dispose();
156                 }
157
158                 if (reasonableTimeoutCancellationTokenSource != null)
159                 {
160                     reasonableTimeoutCancellationTokenSource.Dispose();
161                 }
162
163                 if (WebSocketBase.LoggingEnabled)
164                 {
165                     Logging.Exit(Logging.WebSockets, this, Methods.CloseNetworkConnectionAsync, bytesRead);
166                 }
167             }
168         }
169
170         public override void Close()
171         {
172             if (WebSocketBase.LoggingEnabled)
173             {
174                 Logging.Enter(Logging.WebSockets, this, Methods.Close, string.Empty);
175             }
176
177             try
178             {
179                 // Taking a lock to avoid a race condition between ConnectStream.CloseEx (called in OnCancel) and 
180                 // ServicePoint.CloseConnectionGroup (called in Close) which can result in a deadlock
181                 lock (m_CloseConnectStreamLock)
182                 {
183                     Contract.Assert(this.m_ConnectStream.Connection.ServicePoint != null, "connection.ServicePoint should not be null.");
184                     this.m_ConnectStream.Connection.ServicePoint.CloseConnectionGroup(this.m_ConnectionGroupName);
185                 }
186                 base.Close();
187             }
188             finally
189             {
190                 if (WebSocketBase.LoggingEnabled)
191                 {
192                     Logging.Exit(Logging.WebSockets, this, Methods.Close, string.Empty);
193                 }
194             }
195         }
196
197         public async override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
198         {
199             if (WebSocketBase.LoggingEnabled)
200             {
201                 Logging.Enter(Logging.WebSockets, this, Methods.ReadAsync,
202                     WebSocketHelpers.GetTraceMsgForParameters(offset, count, cancellationToken));
203             }
204
205             CancellationTokenRegistration cancellationTokenRegistration = new CancellationTokenRegistration();
206
207             int bytesRead = 0;
208             try
209             {
210                 if (cancellationToken.CanBeCanceled)
211                 {
212                     cancellationTokenRegistration = cancellationToken.Register(s_OnCancel, this, false);
213                 }
214
215                 WebSocketHelpers.ThrowIfConnectionAborted(m_ConnectStream.Connection, true);
216                 bytesRead = await base.ReadAsync(buffer, offset, count, cancellationToken).SuppressContextFlow<int>();
217
218                 if (WebSocketBase.LoggingEnabled)
219                 {
220                     Logging.Dump(Logging.WebSockets, this, Methods.ReadAsync, buffer, offset, bytesRead);
221                 }
222             }
223             catch (Exception error)
224             {
225                 if (s_CanHandleException(error))
226                 {
227                     cancellationToken.ThrowIfCancellationRequested();
228                 }
229
230                 throw;
231             }
232             finally
233             {
234                 cancellationTokenRegistration.Dispose();
235
236                 if (WebSocketBase.LoggingEnabled)
237                 {
238                     Logging.Exit(Logging.WebSockets, this, Methods.ReadAsync, bytesRead);
239                 }
240             }
241
242             return bytesRead;
243         }
244
245         public async override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
246         {
247             if (WebSocketBase.LoggingEnabled)
248             {
249                 Logging.Enter(Logging.WebSockets, this, Methods.WriteAsync,
250                     WebSocketHelpers.GetTraceMsgForParameters(offset, count, cancellationToken));
251             }
252             CancellationTokenRegistration cancellationTokenRegistration = new CancellationTokenRegistration();
253
254             try
255             {
256                 if (cancellationToken.CanBeCanceled)
257                 {
258                     cancellationTokenRegistration = cancellationToken.Register(s_OnCancel, this, false);
259                 }
260
261                 WebSocketHelpers.ThrowIfConnectionAborted(m_ConnectStream.Connection, false);
262                 await base.WriteAsync(buffer, offset, count, cancellationToken).SuppressContextFlow();
263
264                 if (WebSocketBase.LoggingEnabled)
265                 {
266                     Logging.Dump(Logging.WebSockets, this, Methods.WriteAsync, buffer, offset, count);
267                 }
268             }
269             catch (Exception error)
270             {
271                 if (s_CanHandleException(error))
272                 {
273                     cancellationToken.ThrowIfCancellationRequested();
274                 }
275
276                 throw;
277             }
278             finally
279             {
280                 cancellationTokenRegistration.Dispose();
281
282                 if (WebSocketBase.LoggingEnabled)
283                 {
284                     Logging.Exit(Logging.WebSockets, this,  Methods.WriteAsync, string.Empty);
285                 }
286             }
287         }
288
289         public void SwitchToOpaqueMode(WebSocketBase webSocket)
290         {
291             Contract.Assert(webSocket != null, "'webSocket' MUST NOT be NULL.");
292             Contract.Assert(!m_InOpaqueMode, "SwitchToOpaqueMode MUST NOT be called multiple times.");
293
294             if (m_InOpaqueMode)
295             {
296                 throw new InvalidOperationException();
297             }
298
299             m_WebSocketConnection = BaseStream as WebSocketConnection;
300
301             if (m_WebSocketConnection != null && m_IsFastPathAllowed)
302             {
303                 if (WebSocketBase.LoggingEnabled)
304                 {
305                     Logging.Associate(Logging.WebSockets, this, m_WebSocketConnection);
306                 }
307
308                 m_WebSocketConnection.SwitchToOpaqueMode(webSocket);
309                 m_InOpaqueMode = true;
310             }
311         }
312
313         public async Task MultipleWriteAsync(IList<ArraySegment<byte>> sendBuffers, CancellationToken cancellationToken)
314         {
315             Contract.Assert(this.SupportsMultipleWrite, "This method MUST NOT be used for custom NetworkStream implementations.");
316
317             if (WebSocketBase.LoggingEnabled)
318             {
319                 Logging.Enter(Logging.WebSockets, this, Methods.MultipleWriteAsync, string.Empty);
320             }
321             CancellationTokenRegistration cancellationTokenRegistration = new CancellationTokenRegistration();
322
323             try
324             {
325                 if (cancellationToken.CanBeCanceled)
326                 {
327                     cancellationTokenRegistration = cancellationToken.Register(s_OnCancel, this, false);
328                 }
329
330                 WebSocketHelpers.ThrowIfConnectionAborted(m_ConnectStream.Connection, false);
331                 await ((WebSocketBase.IWebSocketStream)base.BaseStream).MultipleWriteAsync(sendBuffers, cancellationToken).SuppressContextFlow();
332
333                 if (WebSocketBase.LoggingEnabled)
334                 {
335                     foreach(ArraySegment<byte> buffer in sendBuffers)
336                     {
337                         Logging.Dump(Logging.WebSockets, this, Methods.MultipleWriteAsync, buffer.Array, buffer.Offset, buffer.Count);
338                     }
339                 }
340             }
341             catch (Exception error)
342             {
343                 if (s_CanHandleException(error))
344                 {
345                     cancellationToken.ThrowIfCancellationRequested();
346                 }
347
348                 throw;
349             }
350             finally
351             {
352                 cancellationTokenRegistration.Dispose();
353
354                 if (WebSocketBase.LoggingEnabled)
355                 {
356                     Logging.Exit(Logging.WebSockets, this, Methods.MultipleWriteAsync, string.Empty);
357                 }
358             }
359         }
360
361         private static bool CanHandleException(Exception error)
362         {
363             return error is SocketException ||
364                 error is ObjectDisposedException ||
365                 error is WebException ||
366                 error is IOException;
367         }
368
369         private static void OnCancel(object state)
370         {
371             Contract.Assert(state != null, "'state' MUST NOT be NULL.");
372             WebSocketConnectionStream thisPtr = state as WebSocketConnectionStream;
373             Contract.Assert(thisPtr != null, "'thisPtr' MUST NOT be NULL.");
374
375             if (WebSocketBase.LoggingEnabled)
376             {
377                 Logging.Enter(Logging.WebSockets, state, Methods.OnCancel, string.Empty);
378             }
379
380             try
381             {
382                 // Taking a lock to avoid a race condition between ConnectStream.CloseEx (called in OnCancel) and 
383                 // ServicePoint.CloseConnectionGroup (called in Close) which can result in a deadlock
384                 lock (thisPtr.m_CloseConnectStreamLock)
385                 {
386                     // similar code like in HttpWebResponse.Abort, but we don't need some of the validations
387                     // and we want to ensure that the TCP connection is reset
388                     thisPtr.m_ConnectStream.Connection.NetworkStream.InternalAbortSocket();
389                     ((ICloseEx)thisPtr.m_ConnectStream).CloseEx(CloseExState.Abort);
390                 }
391                 thisPtr.CancelWebSocketConnection();
392             }
393             catch { }
394             finally
395             {
396                 if (WebSocketBase.LoggingEnabled)
397                 {
398                     Logging.Exit(Logging.WebSockets, state, Methods.OnCancel, string.Empty);
399                 }
400             }
401         }
402
403         private void CancelWebSocketConnection()
404         {
405             if (m_InOpaqueMode)
406             {
407                 WebSocketConnection webSocketConnection = (WebSocketConnection)base.BaseStream;
408                 s_OnCancelWebSocketConnection(webSocketConnection);
409             }
410         }
411
412         public void Abort()
413         {
414             OnCancel(this);
415         }
416
417         private void ConsumeConnectStreamBuffer(ConnectStream connectStream)
418         {
419             if (connectStream.Eof)
420             {
421                 return;
422             }
423
424             byte[] buffer = new byte[1024];
425             int count;
426             int offset = 0;
427             int size = buffer.Length;
428
429             while ((count = connectStream.FillFromBufferedData(buffer, ref offset, ref size)) > 0)
430             {
431                 if (WebSocketBase.LoggingEnabled)
432                 {
433                     Logging.Dump(Logging.WebSockets, this, "ConsumeConnectStreamBuffer", buffer, 0, count);
434                 }
435
436                 Append(buffer, 0, count);
437                 offset = 0;
438                 size = buffer.Length;
439             }
440         }
441
442         private static class Methods
443         {
444             public const string Close = "Close";
445             public const string CloseNetworkConnectionAsync = "CloseNetworkConnectionAsync";
446             public const string OnCancel = "OnCancel";
447             public const string ReadAsync = "ReadAsync";
448             public const string WriteAsync = "WriteAsync";
449             public const string MultipleWriteAsync = "MultipleWriteAsync";
450         }
451
452         private class WebSocketConnection : DelegatedStream, WebSocketBase.IWebSocketStream
453         {
454             private static readonly EventHandler<SocketAsyncEventArgs> s_OnReadCompleted =
455                 new EventHandler<SocketAsyncEventArgs>(OnReadCompleted);
456             private static readonly EventHandler<SocketAsyncEventArgs> s_OnWriteCompleted =
457                 new EventHandler<SocketAsyncEventArgs>(OnWriteCompleted);
458             private static readonly Func<IList<ArraySegment<byte>>, AsyncCallback, object, IAsyncResult> s_BeginMultipleWrite =
459                 new Func<IList<ArraySegment<byte>>, AsyncCallback, object, IAsyncResult>(BeginMultipleWrite);
460             private static readonly Action<IAsyncResult> s_EndMultipleWrite =
461                 new Action<IAsyncResult>(EndMultipleWrite);
462
463 #if DEBUG
464             private class OutstandingOperations
465             {
466                 internal int m_Reads;
467                 internal int m_Writes;
468             }
469
470             private readonly OutstandingOperations m_OutstandingOperations = new OutstandingOperations();
471 #endif //DEBUG
472
473             private readonly Connection m_InnerStream;
474             private readonly bool m_SupportsMultipleWrites;
475             private bool m_InOpaqueMode;
476             private WebSocketBase m_WebSocket;
477             private SocketAsyncEventArgs m_WriteEventArgs;
478             private SocketAsyncEventArgs m_ReadEventArgs;
479             private TaskCompletionSource<object> m_WriteTaskCompletionSource;
480             private TaskCompletionSource<int> m_ReadTaskCompletionSource;
481             private int m_CleanedUp;
482             private bool m_IgnoreReadError;
483
484             internal WebSocketConnection(Connection connection)
485                 : base(connection)
486             {
487                 Contract.Assert(connection != null, "'connection' MUST NOT be NULL.");
488                 Contract.Assert(connection.NetworkStream != null, "'connection.NetworkStream' MUST NOT be NULL.");
489
490                 m_InnerStream = connection;
491                 m_InOpaqueMode = false;
492                 // NetworkStream.Multiplewrite is internal. So custom NetworkStream implementations might not support it.
493                 m_SupportsMultipleWrites = connection.NetworkStream.GetType().Assembly == s_NetworkStreamType.Assembly;
494             }
495
496             internal Socket InnerSocket
497             {
498                 get
499                 {
500                     return GetInnerSocket(false);
501                 }
502             }
503
504             public override bool CanSeek
505             {
506                 get
507                 {
508                     return false;
509                 }
510             }
511
512             public override bool CanRead
513             {
514                 get
515                 {
516                     return true;
517                 }
518             }
519
520             public override bool CanWrite
521             {
522                 get
523                 {
524                     return true;
525                 }
526             }
527
528             public bool SupportsMultipleWrite
529             {
530                 get
531                 {
532                     return m_SupportsMultipleWrites;
533                 }
534             }
535
536             public Task CloseNetworkConnectionAsync(CancellationToken cancellationToken)
537             {
538                 throw new NotImplementedException();
539             }
540
541             public override void Close()
542             {
543                 if (WebSocketBase.LoggingEnabled)
544                 {
545                     Logging.Enter(Logging.WebSockets, this, Methods.Close, string.Empty);
546                 }
547
548                 try
549                 {
550                     base.Close();
551
552                     if (Interlocked.Increment(ref m_CleanedUp) == 1)
553                     {
554                         if (m_WriteEventArgs != null)
555                         {
556                             m_WriteEventArgs.Completed -= s_OnWriteCompleted;
557                             m_WriteEventArgs.Dispose();
558                         }
559
560                         if (m_ReadEventArgs != null)
561                         {
562                             m_ReadEventArgs.Completed -= s_OnReadCompleted;
563                             m_ReadEventArgs.Dispose();
564                         }
565                     }
566                 }
567                 finally
568                 {
569                     if (WebSocketBase.LoggingEnabled)
570                     {
571                         Logging.Exit(Logging.WebSockets, this, Methods.Close, string.Empty);
572                     }
573                 }
574             }
575
576             internal Socket GetInnerSocket(bool skipStateCheck)
577             {
578                 Socket returnValue;
579                 if (!skipStateCheck)
580                 {
581                     m_WebSocket.ThrowIfClosedOrAborted();
582                 }
583                 try
584                 {
585                     Contract.Assert(m_InnerStream.NetworkStream != null, "'m_InnerStream.NetworkStream' MUST NOT be NULL.");
586                     returnValue = m_InnerStream.NetworkStream.InternalSocket;
587                 }
588                 catch (ObjectDisposedException)
589                 {
590                     m_WebSocket.ThrowIfClosedOrAborted();
591                     throw;
592                 }
593
594                 return returnValue;
595             }
596
597             private static IAsyncResult BeginMultipleWrite(IList<ArraySegment<byte>> sendBuffers, AsyncCallback callback, object asyncState)
598             {
599                 Contract.Assert(sendBuffers != null, "'sendBuffers' MUST NOT be NULL.");
600                 Contract.Assert(asyncState != null, "'asyncState' MUST NOT be NULL.");
601                 WebSocketConnection connection = asyncState as WebSocketConnection;
602                 Contract.Assert(connection != null, "'connection' MUST NOT be NULL.");
603
604                 BufferOffsetSize[] buffers = new BufferOffsetSize[sendBuffers.Count];
605                 
606                 for (int index = 0; index < sendBuffers.Count; index++)
607                 {
608                     ArraySegment<byte> sendBuffer = sendBuffers[index];
609                     buffers[index] = new BufferOffsetSize(sendBuffer.Array, sendBuffer.Offset, sendBuffer.Count, false);
610                 }
611
612                 WebSocketHelpers.ThrowIfConnectionAborted(connection.m_InnerStream, false);
613                 return connection.m_InnerStream.NetworkStream.BeginMultipleWrite(buffers, callback, asyncState);
614             }
615
616             private static void EndMultipleWrite(IAsyncResult asyncResult)
617             {
618                 Contract.Assert(asyncResult != null, "'asyncResult' MUST NOT be NULL.");
619                 Contract.Assert(asyncResult.AsyncState != null, "'asyncResult.AsyncState' MUST NOT be NULL.");
620                 WebSocketConnection connection = asyncResult.AsyncState as WebSocketConnection;
621                 Contract.Assert(connection != null, "'connection' MUST NOT be NULL.");
622
623                 WebSocketHelpers.ThrowIfConnectionAborted(connection.m_InnerStream, false);
624                 connection.m_InnerStream.NetworkStream.EndMultipleWrite(asyncResult);
625             }
626
627             public Task MultipleWriteAsync(IList<ArraySegment<byte>> sendBuffers, 
628                 CancellationToken cancellationToken)
629             {
630                 Contract.Assert(this.SupportsMultipleWrite, "This method MUST NOT be used for custom NetworkStream implementations.");
631
632                 if (!m_InOpaqueMode)
633                 {
634                     // We can't use fast path over SSL
635                     return Task.Factory.FromAsync<IList<ArraySegment<byte>>>(s_BeginMultipleWrite, s_EndMultipleWrite, 
636                         sendBuffers, this);
637                 }
638
639                 if (WebSocketBase.LoggingEnabled)
640                 {
641                     Logging.Enter(Logging.WebSockets, this, Methods.MultipleWriteAsync, string.Empty);
642                 }
643
644                 bool completedAsynchronously = false;
645                 try
646                 {
647                     cancellationToken.ThrowIfCancellationRequested();
648 #if DEBUG
649                     // When using fast path only one outstanding read is permitted. By switching into opaque mode
650                     // via IWebSocketStream.SwitchToOpaqueMode (see more detailed comments in interface definition)
651                     // caller takes responsibility for enforcing this constraint.
652                     Contract.Assert(Interlocked.Increment(ref m_OutstandingOperations.m_Writes) == 1,
653                         "Only one outstanding write allowed at any given time.");
654 #endif
655                     WebSocketHelpers.ThrowIfConnectionAborted(m_InnerStream, false);
656                     m_WriteTaskCompletionSource = new TaskCompletionSource<object>();
657                     m_WriteEventArgs.SetBuffer(null, 0, 0);
658                     m_WriteEventArgs.BufferList = sendBuffers;
659                     completedAsynchronously = InnerSocket.SendAsync(m_WriteEventArgs);
660                     if (!completedAsynchronously)
661                     {
662                         if (m_WriteEventArgs.SocketError != SocketError.Success)
663                         {
664                             throw new SocketException(m_WriteEventArgs.SocketError);
665                         }
666
667                         return Task.CompletedTask;
668                     }
669
670                     return m_WriteTaskCompletionSource.Task;
671                 }
672                 finally
673                 {
674                     if (WebSocketBase.LoggingEnabled)
675                     {
676                         Logging.Exit(Logging.WebSockets, this, Methods.MultipleWriteAsync, completedAsynchronously);
677                     }
678                 }
679             }
680
681             public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
682             {
683                 WebSocketHelpers.ValidateBuffer(buffer, offset, count);
684
685                 if (!m_InOpaqueMode)
686                 {
687                     return base.WriteAsync(buffer, offset, count, cancellationToken);
688                 }
689
690                 if (WebSocketBase.LoggingEnabled)
691                 {
692                     Logging.Enter(Logging.WebSockets, this, Methods.WriteAsync,
693                         WebSocketHelpers.GetTraceMsgForParameters(offset, count, cancellationToken));
694                 }
695
696                 bool completedAsynchronously = false;
697                 try
698                 {
699                     cancellationToken.ThrowIfCancellationRequested();
700 #if DEBUG
701                     // When using fast path only one outstanding read is permitted. By switching into opaque mode
702                     // via IWebSocketStream.SwitchToOpaqueMode (see more detailed comments in interface definition)
703                     // caller takes responsibility for enforcing this constraint.
704                     Contract.Assert(Interlocked.Increment(ref m_OutstandingOperations.m_Writes) == 1,
705                         "Only one outstanding write allowed at any given time.");
706 #endif
707                     WebSocketHelpers.ThrowIfConnectionAborted(m_InnerStream, false);
708                     m_WriteTaskCompletionSource = new TaskCompletionSource<object>();
709                     m_WriteEventArgs.BufferList = null;
710                     m_WriteEventArgs.SetBuffer(buffer, offset, count);
711                     completedAsynchronously = InnerSocket.SendAsync(m_WriteEventArgs);
712                     if (!completedAsynchronously)
713                     {
714                         if (m_WriteEventArgs.SocketError != SocketError.Success)
715                         {
716                             throw new SocketException(m_WriteEventArgs.SocketError);
717                         }
718
719                         return Task.CompletedTask;
720                     }
721
722                     return m_WriteTaskCompletionSource.Task;
723                 }
724                 finally
725                 {
726                     if (WebSocketBase.LoggingEnabled)
727                     {
728                         Logging.Exit(Logging.WebSockets, this, Methods.WriteAsync, completedAsynchronously);
729                     }
730                 }
731             }
732
733             public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
734             {
735                 WebSocketHelpers.ValidateBuffer(buffer, offset, count);
736
737                 if (!m_InOpaqueMode)
738                 {
739                     return base.ReadAsync(buffer, offset, count, cancellationToken);
740                 }
741
742                 return ReadAsyncCore(buffer, offset, count, cancellationToken, false);
743             }
744
745             internal Task<int> ReadAsyncCore(byte[] buffer, int offset, int count, CancellationToken cancellationToken, 
746                 bool ignoreReadError)
747             {
748                 if (WebSocketBase.LoggingEnabled)
749                 {
750                     Logging.Enter(Logging.WebSockets, this, Methods.ReadAsyncCore,
751                         WebSocketHelpers.GetTraceMsgForParameters(offset, count, cancellationToken));
752                 }
753
754                 bool completedAsynchronously = false;
755                 m_IgnoreReadError = ignoreReadError;
756                 try
757                 {
758                     cancellationToken.ThrowIfCancellationRequested();
759 #if DEBUG
760                     // When using fast path only one outstanding read is permitted. By switching into opaque mode
761                     // via IWebSocketStream.SwitchToOpaqueMode (see more detailed comments in interface definition)
762                     // caller takes responsibility for enforcing this constraint.
763                     Contract.Assert(Interlocked.Increment(ref m_OutstandingOperations.m_Reads) == 1,
764                         "Only one outstanding read allowed at any given time.");
765 #endif
766                     WebSocketHelpers.ThrowIfConnectionAborted(m_InnerStream, true);
767                     m_ReadTaskCompletionSource = new TaskCompletionSource<int>();
768                     Contract.Assert(m_ReadEventArgs != null, "'m_ReadEventArgs' MUST NOT be NULL.");
769                     m_ReadEventArgs.SetBuffer(buffer, offset, count);
770                     Socket innerSocket;
771                     if (ignoreReadError)
772                     {
773                         // The State of the WebSocket instance is already Closed at this point
774                         // Skipping call to WebSocketBase.ThrowIfClosedOrAborted
775                         innerSocket = GetInnerSocket(true);
776                     }
777                     else
778                     {
779                         innerSocket = InnerSocket;
780                     }
781                     completedAsynchronously = innerSocket.ReceiveAsync(m_ReadEventArgs);
782                     if (!completedAsynchronously)
783                     {
784                         if (m_ReadEventArgs.SocketError != SocketError.Success)
785                         {
786                             if (!m_IgnoreReadError)
787                             {
788                                 throw new SocketException(m_ReadEventArgs.SocketError);
789                             }
790                             else
791                             {
792                                 return Task.FromResult<int>(0);
793                             }
794                         }
795
796                         return Task.FromResult<int>(m_ReadEventArgs.BytesTransferred);
797                     }
798
799                     return m_ReadTaskCompletionSource.Task;
800                 }
801                 finally
802                 {
803                     if (WebSocketBase.LoggingEnabled)
804                     {
805                         Logging.Exit(Logging.WebSockets, this, Methods.ReadAsyncCore, completedAsynchronously);
806                     }
807                 }
808             }
809
810             public override Task FlushAsync(CancellationToken cancellationToken)
811             {
812                 if (!m_InOpaqueMode)
813                 {
814                     return base.FlushAsync(cancellationToken);
815                 }
816
817                 cancellationToken.ThrowIfCancellationRequested();
818                 return Task.CompletedTask;
819             }
820
821             public void Abort()
822             {
823                 // No op - the abort is handled by the WebSocketConnectionStream
824             }
825
826             // According to my tests even when aborting the underlying Socket the completionEvent for 
827             // SocketAsyncEventArgs is not always fired, which can result in not cancelling the underlying
828             // IO. Cancelling the TaskCompletionSources below is safe, because CompletionSource.Tryxxx 
829             // is handling the race condition (whoever is completing the CompletionSource first wins.)
830             internal static void OnCancel(object state)
831             {
832                 Contract.Assert(state != null, "'state' MUST NOT be NULL.");
833                 WebSocketConnection thisPtr = state as WebSocketConnection;
834                 Contract.Assert(thisPtr != null, "'thisPtr' MUST NOT be NULL.");
835
836                 if (WebSocketBase.LoggingEnabled)
837                 {
838                     Logging.Enter(Logging.WebSockets, thisPtr, Methods.OnCancel, string.Empty);
839                 }
840
841                 try
842                 {
843                     TaskCompletionSource<int> readTaskCompletionSourceSnapshot = thisPtr.m_ReadTaskCompletionSource;
844
845                     if (readTaskCompletionSourceSnapshot != null)
846                     {
847                         readTaskCompletionSourceSnapshot.TrySetCanceled();
848                     }
849
850                     TaskCompletionSource<object> writeTaskCompletionSourceSnapshot = thisPtr.m_WriteTaskCompletionSource;
851
852                     if (writeTaskCompletionSourceSnapshot != null)
853                     {
854                         writeTaskCompletionSourceSnapshot.TrySetCanceled();
855                     }
856                 }
857                 finally
858                 {
859                     if (WebSocketBase.LoggingEnabled)
860                     {
861                         Logging.Exit(Logging.WebSockets, thisPtr, Methods.OnCancel, string.Empty);
862                     }
863                 }
864             }
865
866             public void SwitchToOpaqueMode(WebSocketBase webSocket)
867             {
868                 Contract.Assert(webSocket != null, "'webSocket' MUST NOT be NULL.");
869                 Contract.Assert(!m_InOpaqueMode, "SwitchToOpaqueMode MUST NOT be called multiple times.");
870                 m_WebSocket = webSocket;
871                 m_InOpaqueMode = true;
872                 m_ReadEventArgs = new SocketAsyncEventArgs();
873                 m_ReadEventArgs.UserToken = this;
874                 m_ReadEventArgs.Completed += s_OnReadCompleted;
875                 m_WriteEventArgs = new SocketAsyncEventArgs();
876                 m_WriteEventArgs.UserToken = this;
877                 m_WriteEventArgs.Completed += s_OnWriteCompleted;
878             }
879
880             private static string GetIOCompletionTraceMsg(SocketAsyncEventArgs eventArgs)
881             {
882                 Contract.Assert(eventArgs != null, "'eventArgs' MUST NOT be NULL.");
883                 return string.Format(CultureInfo.InvariantCulture,
884                     "LastOperation: {0}, SocketError: {1}",
885                     eventArgs.LastOperation,
886                     eventArgs.SocketError);
887             }
888
889             private static void OnWriteCompleted(object sender, SocketAsyncEventArgs eventArgs)
890             {
891                 Contract.Assert(eventArgs != null, "'eventArgs' MUST NOT be NULL.");
892                 WebSocketConnection thisPtr = eventArgs.UserToken as WebSocketConnection;
893                 Contract.Assert(thisPtr != null, "'thisPtr' MUST NOT be NULL.");
894
895 #if DEBUG
896                 Contract.Assert(Interlocked.Decrement(ref thisPtr.m_OutstandingOperations.m_Writes) >= 0,
897                     "'thisPtr.m_OutstandingOperations.m_Writes' MUST NOT be negative.");
898 #endif
899
900                 if (WebSocketBase.LoggingEnabled)
901                 {
902                     Logging.Enter(Logging.WebSockets, thisPtr, Methods.OnWriteCompleted, 
903                         GetIOCompletionTraceMsg(eventArgs));
904                 }
905
906                 if (eventArgs.SocketError != SocketError.Success)
907                 {
908                     thisPtr.m_WriteTaskCompletionSource.TrySetException(new SocketException(eventArgs.SocketError));
909                 }
910                 else
911                 {
912                     thisPtr.m_WriteTaskCompletionSource.TrySetResult(null);
913                 }
914
915                 if (WebSocketBase.LoggingEnabled)
916                 {
917                     Logging.Exit(Logging.WebSockets, thisPtr, Methods.OnWriteCompleted, string.Empty);
918                 }
919             }
920
921             private static void OnReadCompleted(object sender, SocketAsyncEventArgs eventArgs)
922             {
923                 Contract.Assert(eventArgs != null, "'eventArgs' MUST NOT be NULL.");
924                 WebSocketConnection thisPtr = eventArgs.UserToken as WebSocketConnection;
925                 Contract.Assert(thisPtr != null, "'thisPtr' MUST NOT be NULL.");
926 #if DEBUG
927                 Contract.Assert(Interlocked.Decrement(ref thisPtr.m_OutstandingOperations.m_Reads) >= 0,
928                     "'thisPtr.m_OutstandingOperations.m_Reads' MUST NOT be negative.");
929 #endif
930
931                 if (WebSocketBase.LoggingEnabled)
932                 {
933                     Logging.Enter(Logging.WebSockets, thisPtr, Methods.OnReadCompleted,
934                         GetIOCompletionTraceMsg(eventArgs));
935                 }
936
937                 if (eventArgs.SocketError != SocketError.Success)
938                 {
939                     if (!thisPtr.m_IgnoreReadError)
940                     {
941                         thisPtr.m_ReadTaskCompletionSource.TrySetException(new SocketException(eventArgs.SocketError));
942                     }
943                     else
944                     {
945                         thisPtr.m_ReadTaskCompletionSource.TrySetResult(0);
946                     }
947                 }
948                 else
949                 {
950                     thisPtr.m_ReadTaskCompletionSource.TrySetResult(eventArgs.BytesTransferred);
951                 }
952
953                 if (WebSocketBase.LoggingEnabled)
954                 {
955                     Logging.Exit(Logging.WebSockets, thisPtr, Methods.OnReadCompleted, string.Empty);
956                 }
957             }
958
959             private static class Methods
960             {
961                 public const string Close = "Close";
962                 public const string OnCancel = "OnCancel";
963                 public const string OnReadCompleted = "OnReadCompleted";
964                 public const string OnWriteCompleted = "OnWriteCompleted";
965                 public const string ReadAsyncCore = "ReadAsyncCore";
966                 public const string WriteAsync = "WriteAsync";
967                 public const string MultipleWriteAsync = "MultipleWriteAsync";
968             }
969         }
970     }
971 }