1 //------------------------------------------------------------------------------
2 // <copyright file="WebSocketConnectionStream.cs" company="Microsoft">
3 // Copyright (c) Microsoft Corporation. All rights reserved.
5 //------------------------------------------------------------------------------
7 namespace System.Net.WebSockets
9 using System.Collections.Concurrent;
10 using System.Collections.Generic;
11 using System.Diagnostics;
12 using System.Diagnostics.Contracts;
13 using System.Globalization;
15 using System.Net.Sockets;
16 using System.Threading;
17 using System.Threading.Tasks;
// Stream that layers WebSocket I/O on top of the connection of a ConnectStream. It derives from
// BufferedReadStream and uses a private WebSocketConnection as its base stream (see the constructor).
19 internal class WebSocketConnectionStream : BufferedReadStream, WebSocketBase.IWebSocketStream
// Static delegate instances shared by all instances: s_CanHandleException is invoked in the catch
// blocks, s_OnCancel is passed to CancellationToken.Register, and s_OnCancelWebSocketConnection is
// invoked from CancelWebSocketConnection.
21 private static readonly Func<Exception, bool> s_CanHandleException = new Func<Exception, bool>(CanHandleException);
22 private static readonly Action<object> s_OnCancel = new Action<object>(OnCancel);
23 private static readonly Action<object> s_OnCancelWebSocketConnection = new Action<object>(WebSocketConnection.OnCancel);
// Used for the exact-type / same-assembly checks on the underlying NetworkStream.
24 private static readonly Type s_NetworkStreamType = typeof(NetworkStream);
// Connect stream this instance was created from; its Connection is used for abort checks, abort and close.
25 private readonly ConnectStream m_ConnectStream;
// Group name passed to ServicePoint.CloseConnectionGroup in Close().
26 private readonly string m_ConnectionGroupName;
// True only when the underlying stream's runtime type is exactly NetworkStream (set in the constructor).
27 private readonly bool m_IsFastPathAllowed;
// Lock object shared by Close() and OnCancel().
28 private readonly object m_CloseConnectStreamLock;
// Set to true by SwitchToOpaqueMode once the inner WebSocketConnection has been switched.
29 private bool m_InOpaqueMode;
// Assigned from BaseStream in SwitchToOpaqueMode; not assigned anywhere else in the visible code.
30 private WebSocketConnection m_WebSocketConnection;
// Creates the stream for 'connectStream'. A new WebSocketConnection wrapping connectStream.Connection is
// handed to the base constructor, the fields are initialized, and data already buffered by the connect
// stream is copied into this stream through ConsumeConnectStreamBuffer.
// NOTE(review): the base(...) initializer dereferences connectStream.Connection before the null
// asserts in the constructor body are evaluated.
32 public WebSocketConnectionStream(ConnectStream connectStream, string connectionGroupName)
33 : base(new WebSocketConnection(connectStream.Connection), false)
35 Contract.Assert(connectStream != null,
36 "'connectStream' MUST NOT be NULL.");
// NOTE(review): the next two assert messages spell "Conection"; they are runtime strings and are left unchanged here.
37 Contract.Assert(connectStream.Connection != null,
38 "'connectStream.Conection' MUST NOT be NULL.");
39 Contract.Assert(connectStream.Connection.NetworkStream != null,
40 "'connectStream.Conection.NetworkStream' MUST NOT be NULL.");
41 Contract.Assert(!string.IsNullOrEmpty(connectionGroupName),
42 "connectionGroupName should not be null or empty.");
44 m_ConnectStream = connectStream;
45 m_ConnectionGroupName = connectionGroupName;
46 m_CloseConnectStreamLock = new object();
47 // Make sure we don't short circuit for TlsStream or custom NetworkStream implementations
// Exact type comparison: types derived from NetworkStream do not qualify for the fast path.
48 m_IsFastPathAllowed = m_ConnectStream.Connection.NetworkStream.GetType() == s_NetworkStreamType;
50 if (WebSocketBase.LoggingEnabled)
52 Logging.Associate(Logging.WebSockets, this, m_ConnectStream.Connection);
// Pull over whatever the connect stream has already buffered.
55 ConsumeConnectStreamBuffer(connectStream);
// Stream capability overrides. The accessor bodies are not visible in this listing, so the values
// they return cannot be documented from here.
58 public override bool CanSeek
66 public override bool CanRead
74 public override bool CanWrite
// Reports whether MultipleWriteAsync may be used; forwards to the inner WebSocketConnection
// obtained by casting BaseStream.
82 public bool SupportsMultipleWrite
86 return ((WebSocketConnection)this.BaseStream).SupportsMultipleWrite;
// Issues a one-byte read that is expected to return 0 bytes, bounded by a timeout of
// WebSocketHelpers.ClientTcpCloseTimeout linked with a second token. If payload is received instead,
// the visible code asserts, dumps the data and throws WebSocketException(WebSocketError.NotAWebSocket);
// the condition guarding that path is not visible in this listing.
// cancellationToken: the caller's token; checked after a handled exception so that caller-requested
// cancellation surfaces as OperationCanceledException.
90 public async Task CloseNetworkConnectionAsync(CancellationToken cancellationToken)
92 // need to yield here to make sure that we don't get any exception synchronously
94 if (WebSocketBase.LoggingEnabled)
96 Logging.Enter(Logging.WebSockets, this, Methods.CloseNetworkConnectionAsync, string.Empty);
99 CancellationTokenSource reasonableTimeoutCancellationTokenSource = null;
100 CancellationTokenSource linkedCancellationTokenSource = null;
101 CancellationToken linkedCancellationToken = CancellationToken.None;
103 CancellationTokenRegistration cancellationTokenRegistration = new CancellationTokenRegistration();
// Timeout source: cancels itself after ClientTcpCloseTimeout.
108 reasonableTimeoutCancellationTokenSource =
109 new CancellationTokenSource(WebSocketHelpers.ClientTcpCloseTimeout);
// Linked source combining the timeout token with a second token (the argument line is not visible
// here; presumably the caller's cancellationToken - TODO confirm).
110 linkedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(
111 reasonableTimeoutCancellationTokenSource.Token,
113 linkedCancellationToken = linkedCancellationTokenSource.Token;
// Cancellation of the linked token runs OnCancel for this stream.
114 cancellationTokenRegistration = linkedCancellationToken.Register(s_OnCancel, this, false);
116 WebSocketHelpers.ThrowIfConnectionAborted(m_ConnectStream.Connection, true);
117 byte[] buffer = new byte[1];
// In opaque mode read directly through the WebSocketConnection with ignoreReadError = true;
// the other read below goes through the base stream.
118 if (m_WebSocketConnection != null && m_InOpaqueMode)
120 bytesRead = await m_WebSocketConnection.ReadAsyncCore(buffer, 0, 1, linkedCancellationToken, true).SuppressContextFlow<int>();
124 bytesRead = await base.ReadAsync(buffer, 0, 1, linkedCancellationToken).SuppressContextFlow<int>();
129 Contract.Assert(false, "'bytesRead' MUST be '0' at this point. Instead more payload was received ('" + buffer[0].ToString() + "')");
131 if (WebSocketBase.LoggingEnabled)
133 Logging.Dump(Logging.WebSockets, this, Methods.CloseNetworkConnectionAsync, buffer, 0, bytesRead);
136 throw new WebSocketException(WebSocketError.NotAWebSocket);
139 catch (Exception error)
// Exceptions not recognized by CanHandleException take the branch below (its body is not visible here).
141 if (!s_CanHandleException(error))
146 // throw OperationCancelledException when canceled by the caller
147 // ignore cancellation due to the timeout
148 cancellationToken.ThrowIfCancellationRequested();
// Cleanup: dispose the registration and both token sources if they were created.
152 cancellationTokenRegistration.Dispose();
153 if (linkedCancellationTokenSource != null)
155 linkedCancellationTokenSource.Dispose();
158 if (reasonableTimeoutCancellationTokenSource != null)
160 reasonableTimeoutCancellationTokenSource.Dispose();
163 if (WebSocketBase.LoggingEnabled)
165 Logging.Exit(Logging.WebSockets, this, Methods.CloseNetworkConnectionAsync, bytesRead);
// Closes the stream. The visible code closes the connection group on the connection's ServicePoint
// while holding m_CloseConnectStreamLock; any further statements (for example a call to the base
// implementation) are not visible in this listing.
170 public override void Close()
172 if (WebSocketBase.LoggingEnabled)
174 Logging.Enter(Logging.WebSockets, this, Methods.Close, string.Empty);
179 // Taking a lock to avoid a race condition between ConnectStream.CloseEx (called in OnCancel) and
180 // ServicePoint.CloseConnectionGroup (called in Close) which can result in a deadlock
181 lock (m_CloseConnectStreamLock)
183 Contract.Assert(this.m_ConnectStream.Connection.ServicePoint != null, "connection.ServicePoint should not be null.");
184 this.m_ConnectStream.Connection.ServicePoint.CloseConnectionGroup(this.m_ConnectionGroupName);
190 if (WebSocketBase.LoggingEnabled)
192 Logging.Exit(Logging.WebSockets, this, Methods.Close, string.Empty);
// Reads through the base stream. While the read is pending, cancellation of 'cancellationToken'
// runs OnCancel for this stream. Exceptions recognized by CanHandleException are followed by a
// cancellation check so that a caller-requested cancellation surfaces as OperationCanceledException;
// what happens to the exception afterwards is not visible in this listing.
197 public async override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
199 if (WebSocketBase.LoggingEnabled)
201 Logging.Enter(Logging.WebSockets, this, Methods.ReadAsync,
202 WebSocketHelpers.GetTraceMsgForParameters(offset, count, cancellationToken));
205 CancellationTokenRegistration cancellationTokenRegistration = new CancellationTokenRegistration();
// Only register when the token can actually be canceled.
210 if (cancellationToken.CanBeCanceled)
212 cancellationTokenRegistration = cancellationToken.Register(s_OnCancel, this, false);
215 WebSocketHelpers.ThrowIfConnectionAborted(m_ConnectStream.Connection, true);
216 bytesRead = await base.ReadAsync(buffer, offset, count, cancellationToken).SuppressContextFlow<int>();
218 if (WebSocketBase.LoggingEnabled)
220 Logging.Dump(Logging.WebSockets, this, Methods.ReadAsync, buffer, offset, bytesRead);
223 catch (Exception error)
225 if (s_CanHandleException(error))
227 cancellationToken.ThrowIfCancellationRequested();
// The registration is disposed once the operation is over.
234 cancellationTokenRegistration.Dispose();
236 if (WebSocketBase.LoggingEnabled)
238 Logging.Exit(Logging.WebSockets, this, Methods.ReadAsync, bytesRead);
// Writes through the base stream. Mirrors ReadAsync: cancellation of 'cancellationToken' runs
// OnCancel for this stream, and exceptions recognized by CanHandleException are followed by a
// cancellation check. What happens to the exception afterwards is not visible in this listing.
245 public async override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
247 if (WebSocketBase.LoggingEnabled)
249 Logging.Enter(Logging.WebSockets, this, Methods.WriteAsync,
250 WebSocketHelpers.GetTraceMsgForParameters(offset, count, cancellationToken));
252 CancellationTokenRegistration cancellationTokenRegistration = new CancellationTokenRegistration();
// Only register when the token can actually be canceled.
256 if (cancellationToken.CanBeCanceled)
258 cancellationTokenRegistration = cancellationToken.Register(s_OnCancel, this, false);
261 WebSocketHelpers.ThrowIfConnectionAborted(m_ConnectStream.Connection, false);
262 await base.WriteAsync(buffer, offset, count, cancellationToken).SuppressContextFlow();
264 if (WebSocketBase.LoggingEnabled)
266 Logging.Dump(Logging.WebSockets, this, Methods.WriteAsync, buffer, offset, count);
269 catch (Exception error)
271 if (s_CanHandleException(error))
273 cancellationToken.ThrowIfCancellationRequested();
// The registration is disposed once the operation is over.
280 cancellationTokenRegistration.Dispose();
282 if (WebSocketBase.LoggingEnabled)
284 Logging.Exit(Logging.WebSockets, this, Methods.WriteAsync, string.Empty);
// Switches the inner WebSocketConnection into opaque mode for 'webSocket', but only when the base
// stream is a WebSocketConnection and the fast path is allowed (exact NetworkStream type).
// The InvalidOperationException below is guarded by a condition that is not visible in this listing
// (presumably a repeated call - TODO confirm).
289 public void SwitchToOpaqueMode(WebSocketBase webSocket)
291 Contract.Assert(webSocket != null, "'webSocket' MUST NOT be NULL.");
292 Contract.Assert(!m_InOpaqueMode, "SwitchToOpaqueMode MUST NOT be called multiple times.");
296 throw new InvalidOperationException();
299 m_WebSocketConnection = BaseStream as WebSocketConnection;
301 if (m_WebSocketConnection != null && m_IsFastPathAllowed)
303 if (WebSocketBase.LoggingEnabled)
305 Logging.Associate(Logging.WebSockets, this, m_WebSocketConnection);
308 m_WebSocketConnection.SwitchToOpaqueMode(webSocket);
309 m_InOpaqueMode = true;
// Writes several buffers in one operation by forwarding to the base stream's
// IWebSocketStream.MultipleWriteAsync. Cancellation and exception handling follow the same pattern
// as WriteAsync. Callers are expected to check SupportsMultipleWrite first (asserted below).
313 public async Task MultipleWriteAsync(IList<ArraySegment<byte>> sendBuffers, CancellationToken cancellationToken)
315 Contract.Assert(this.SupportsMultipleWrite, "This method MUST NOT be used for custom NetworkStream implementations.");
317 if (WebSocketBase.LoggingEnabled)
319 Logging.Enter(Logging.WebSockets, this, Methods.MultipleWriteAsync, string.Empty);
321 CancellationTokenRegistration cancellationTokenRegistration = new CancellationTokenRegistration();
// Only register when the token can actually be canceled.
325 if (cancellationToken.CanBeCanceled)
327 cancellationTokenRegistration = cancellationToken.Register(s_OnCancel, this, false);
330 WebSocketHelpers.ThrowIfConnectionAborted(m_ConnectStream.Connection, false);
331 await ((WebSocketBase.IWebSocketStream)base.BaseStream).MultipleWriteAsync(sendBuffers, cancellationToken).SuppressContextFlow();
// When logging is enabled every segment is dumped after the write.
333 if (WebSocketBase.LoggingEnabled)
335 foreach(ArraySegment<byte> buffer in sendBuffers)
337 Logging.Dump(Logging.WebSockets, this, Methods.MultipleWriteAsync, buffer.Array, buffer.Offset, buffer.Count);
341 catch (Exception error)
343 if (s_CanHandleException(error))
345 cancellationToken.ThrowIfCancellationRequested();
// The registration is disposed once the operation is over.
352 cancellationTokenRegistration.Dispose();
354 if (WebSocketBase.LoggingEnabled)
356 Logging.Exit(Logging.WebSockets, this, Methods.MultipleWriteAsync, string.Empty);
// Returns true when 'error' is one of the exception types the catch blocks of this class treat
// specially: SocketException, ObjectDisposedException, WebException or IOException.
361 private static bool CanHandleException(Exception error)
363 return error is SocketException ||
364 error is ObjectDisposedException ||
365 error is WebException ||
366 error is IOException;
// Cancellation callback registered through s_OnCancel. 'state' is the WebSocketConnectionStream that
// registered it. Under m_CloseConnectStreamLock it aborts the socket and the connect stream; it also
// cancels the pending operations of the inner WebSocketConnection.
// NOTE(review): 'state as WebSocketConnectionStream' is only guarded by Contract.Assert before being
// dereferenced.
369 private static void OnCancel(object state)
371 Contract.Assert(state != null, "'state' MUST NOT be NULL.");
372 WebSocketConnectionStream thisPtr = state as WebSocketConnectionStream;
373 Contract.Assert(thisPtr != null, "'thisPtr' MUST NOT be NULL.");
375 if (WebSocketBase.LoggingEnabled)
377 Logging.Enter(Logging.WebSockets, state, Methods.OnCancel, string.Empty);
382 // Taking a lock to avoid a race condition between ConnectStream.CloseEx (called in OnCancel) and
383 // ServicePoint.CloseConnectionGroup (called in Close) which can result in a deadlock
384 lock (thisPtr.m_CloseConnectStreamLock)
386 // similar code like in HttpWebResponse.Abort, but we don't need some of the validations
387 // and we want to ensure that the TCP connection is reset
388 thisPtr.m_ConnectStream.Connection.NetworkStream.InternalAbortSocket();
389 ((ICloseEx)thisPtr.m_ConnectStream).CloseEx(CloseExState.Abort);
// Complete any pending read/write tasks of the inner connection as canceled.
391 thisPtr.CancelWebSocketConnection();
396 if (WebSocketBase.LoggingEnabled)
398 Logging.Exit(Logging.WebSockets, state, Methods.OnCancel, string.Empty);
// Invokes WebSocketConnection.OnCancel (through the cached delegate) on the base stream, cast to
// WebSocketConnection. A guarding condition, if any, is not visible in this listing.
403 private void CancelWebSocketConnection()
407 WebSocketConnection webSocketConnection = (WebSocketConnection)base.BaseStream;
408 s_OnCancelWebSocketConnection(webSocketConnection);
// Copies data that 'connectStream' has already buffered into this stream: repeatedly fills a
// 1024-byte scratch buffer via FillFromBufferedData and appends each chunk with Append until no
// more data is returned. The branch taken when connectStream.Eof is true is not visible in this
// listing (presumably an early return - TODO confirm).
417 private void ConsumeConnectStreamBuffer(ConnectStream connectStream)
419 if (connectStream.Eof)
424 byte[] buffer = new byte[1024];
427 int size = buffer.Length;
429 while ((count = connectStream.FillFromBufferedData(buffer, ref offset, ref size)) > 0)
431 if (WebSocketBase.LoggingEnabled)
433 Logging.Dump(Logging.WebSockets, this, "ConsumeConnectStreamBuffer", buffer, 0, count);
436 Append(buffer, 0, count);
// 'size' is passed by ref above, so it is reset to the full buffer length before the next iteration.
438 size = buffer.Length;
// Method-name constants passed to the Logging calls of WebSocketConnectionStream.
442 private static class Methods
444 public const string Close = "Close";
445 public const string CloseNetworkConnectionAsync = "CloseNetworkConnectionAsync";
446 public const string OnCancel = "OnCancel";
447 public const string ReadAsync = "ReadAsync";
448 public const string WriteAsync = "WriteAsync";
449 public const string MultipleWriteAsync = "MultipleWriteAsync";
// Inner stream of WebSocketConnectionStream. It wraps a Connection and, once switched to opaque mode,
// performs reads and writes directly on the socket through SocketAsyncEventArgs.
452 private class WebSocketConnection : DelegatedStream, WebSocketBase.IWebSocketStream
// Static delegate instances for the SocketAsyncEventArgs.Completed handlers and for the
// Begin/End pair handed to Task.Factory.FromAsync in MultipleWriteAsync.
454 private static readonly EventHandler<SocketAsyncEventArgs> s_OnReadCompleted =
455 new EventHandler<SocketAsyncEventArgs>(OnReadCompleted);
456 private static readonly EventHandler<SocketAsyncEventArgs> s_OnWriteCompleted =
457 new EventHandler<SocketAsyncEventArgs>(OnWriteCompleted);
458 private static readonly Func<IList<ArraySegment<byte>>, AsyncCallback, object, IAsyncResult> s_BeginMultipleWrite =
459 new Func<IList<ArraySegment<byte>>, AsyncCallback, object, IAsyncResult>(BeginMultipleWrite);
460 private static readonly Action<IAsyncResult> s_EndMultipleWrite =
461 new Action<IAsyncResult>(EndMultipleWrite);
// Counters of in-flight reads and writes. In the visible code they are only touched inside
// Contract.Assert arguments (Interlocked.Increment / Interlocked.Decrement). Contract.Assert is a
// conditional method, so those side effects only occur in builds that compile the asserts in.
464 private class OutstandingOperations
466 internal int m_Reads;
467 internal int m_Writes;
470 private readonly OutstandingOperations m_OutstandingOperations = new OutstandingOperations();
// The wrapped connection; its NetworkStream provides the socket and the multiple-write calls.
473 private readonly Connection m_InnerStream;
// Computed in the constructor from the assembly of the NetworkStream's runtime type.
474 private readonly bool m_SupportsMultipleWrites;
// Set by SwitchToOpaqueMode.
475 private bool m_InOpaqueMode;
// The WebSocket supplied to SwitchToOpaqueMode; used for ThrowIfClosedOrAborted in GetInnerSocket.
476 private WebSocketBase m_WebSocket;
// Event args created in SwitchToOpaqueMode and disposed in Close.
477 private SocketAsyncEventArgs m_WriteEventArgs;
478 private SocketAsyncEventArgs m_ReadEventArgs;
// Completion sources for the currently pending write/read; replaced at the start of each operation.
479 private TaskCompletionSource<object> m_WriteTaskCompletionSource;
480 private TaskCompletionSource<int> m_ReadTaskCompletionSource;
// Incremented with Interlocked in Close so the cleanup runs only once.
481 private int m_CleanedUp;
// Copy of the 'ignoreReadError' argument of the latest ReadAsyncCore call; read by OnReadCompleted.
482 private bool m_IgnoreReadError;
// Wraps 'connection'. The base-constructor call, if any, is not visible in this listing.
484 internal WebSocketConnection(Connection connection)
487 Contract.Assert(connection != null, "'connection' MUST NOT be NULL.");
488 Contract.Assert(connection.NetworkStream != null, "'connection.NetworkStream' MUST NOT be NULL.");
490 m_InnerStream = connection;
491 m_InOpaqueMode = false;
492 // NetworkStream.Multiplewrite is internal. So custom NetworkStream implementations might not support it.
// Hence the check: the stream's runtime type must come from the same assembly as NetworkStream.
493 m_SupportsMultipleWrites = connection.NetworkStream.GetType().Assembly == s_NetworkStreamType.Assembly;
// The socket of the wrapped connection, obtained through GetInnerSocket with skipStateCheck = false.
496 internal Socket InnerSocket
500 return GetInnerSocket(false);
// Stream capability overrides. The accessor bodies are not visible in this listing, so the values
// they return cannot be documented from here.
504 public override bool CanSeek
512 public override bool CanRead
520 public override bool CanWrite
// True when the NetworkStream's runtime type lives in the same assembly as NetworkStream
// (value computed once in the constructor).
528 public bool SupportsMultipleWrite
532 return m_SupportsMultipleWrites;
// Not implemented on the inner connection: throws NotImplementedException.
// WebSocketConnectionStream provides its own CloseNetworkConnectionAsync.
536 public Task CloseNetworkConnectionAsync(CancellationToken cancellationToken)
538 throw new NotImplementedException();
// Closes the connection wrapper. The visible cleanup runs only for the first caller (m_CleanedUp
// goes from 0 to 1): both SocketAsyncEventArgs, if created, are unsubscribed from their Completed
// handlers and disposed. Statements between the logging call and the cleanup are not visible in
// this listing.
541 public override void Close()
543 if (WebSocketBase.LoggingEnabled)
545 Logging.Enter(Logging.WebSockets, this, Methods.Close, string.Empty);
552 if (Interlocked.Increment(ref m_CleanedUp) == 1)
554 if (m_WriteEventArgs != null)
556 m_WriteEventArgs.Completed -= s_OnWriteCompleted;
557 m_WriteEventArgs.Dispose();
560 if (m_ReadEventArgs != null)
562 m_ReadEventArgs.Completed -= s_OnReadCompleted;
563 m_ReadEventArgs.Dispose();
569 if (WebSocketBase.LoggingEnabled)
571 Logging.Exit(Logging.WebSockets, this, Methods.Close, string.Empty);
// Returns the NetworkStream's internal socket of the wrapped connection.
// skipStateCheck: presumably suppresses the first ThrowIfClosedOrAborted call below (the guarding
// condition is not visible in this listing - TODO confirm).
// If reading the socket throws ObjectDisposedException, the WebSocket state check runs first so a
// closed/aborted WebSocket is reported through ThrowIfClosedOrAborted.
576 internal Socket GetInnerSocket(bool skipStateCheck)
581 m_WebSocket.ThrowIfClosedOrAborted();
585 Contract.Assert(m_InnerStream.NetworkStream != null, "'m_InnerStream.NetworkStream' MUST NOT be NULL.");
586 returnValue = m_InnerStream.NetworkStream.InternalSocket;
588 catch (ObjectDisposedException)
590 m_WebSocket.ThrowIfClosedOrAborted();
// Begin half of the APM pair used by MultipleWriteAsync. 'asyncState' must be the WebSocketConnection.
// Each ArraySegment is converted into a BufferOffsetSize and the array is handed to
// NetworkStream.BeginMultipleWrite after an abort check.
597 private static IAsyncResult BeginMultipleWrite(IList<ArraySegment<byte>> sendBuffers, AsyncCallback callback, object asyncState)
599 Contract.Assert(sendBuffers != null, "'sendBuffers' MUST NOT be NULL.");
600 Contract.Assert(asyncState != null, "'asyncState' MUST NOT be NULL.");
601 WebSocketConnection connection = asyncState as WebSocketConnection;
602 Contract.Assert(connection != null, "'connection' MUST NOT be NULL.");
604 BufferOffsetSize[] buffers = new BufferOffsetSize[sendBuffers.Count];
606 for (int index = 0; index < sendBuffers.Count; index++)
608 ArraySegment<byte> sendBuffer = sendBuffers[index];
609 buffers[index] = new BufferOffsetSize(sendBuffer.Array, sendBuffer.Offset, sendBuffer.Count, false);
612 WebSocketHelpers.ThrowIfConnectionAborted(connection.m_InnerStream, false);
613 return connection.m_InnerStream.NetworkStream.BeginMultipleWrite(buffers, callback, asyncState);
// End half of the APM pair used by MultipleWriteAsync. Recovers the WebSocketConnection from
// asyncResult.AsyncState, checks for an aborted connection and completes the write through
// NetworkStream.EndMultipleWrite.
616 private static void EndMultipleWrite(IAsyncResult asyncResult)
618 Contract.Assert(asyncResult != null, "'asyncResult' MUST NOT be NULL.");
619 Contract.Assert(asyncResult.AsyncState != null, "'asyncResult.AsyncState' MUST NOT be NULL.");
620 WebSocketConnection connection = asyncResult.AsyncState as WebSocketConnection;
621 Contract.Assert(connection != null, "'connection' MUST NOT be NULL.");
623 WebSocketHelpers.ThrowIfConnectionAborted(connection.m_InnerStream, false);
624 connection.m_InnerStream.NetworkStream.EndMultipleWrite(asyncResult);
// Sends several buffers in one operation. Two paths are visible:
//  - a Task.Factory.FromAsync call over BeginMultipleWrite/EndMultipleWrite (its guarding condition
//    and remaining arguments are not visible in this listing; presumably taken when not in opaque mode);
//  - the fast path, which assigns the buffers to m_WriteEventArgs.BufferList and calls Socket.SendAsync.
// On the fast path the returned task is Task.CompletedTask when SendAsync completes synchronously,
// otherwise the task of m_WriteTaskCompletionSource, which OnWriteCompleted / OnCancel complete.
627 public Task MultipleWriteAsync(IList<ArraySegment<byte>> sendBuffers,
628 CancellationToken cancellationToken)
630 Contract.Assert(this.SupportsMultipleWrite, "This method MUST NOT be used for custom NetworkStream implementations.");
634 // We can't use fast path over SSL
635 return Task.Factory.FromAsync<IList<ArraySegment<byte>>>(s_BeginMultipleWrite, s_EndMultipleWrite,
639 if (WebSocketBase.LoggingEnabled)
641 Logging.Enter(Logging.WebSockets, this, Methods.MultipleWriteAsync, string.Empty);
644 bool completedAsynchronously = false;
647 cancellationToken.ThrowIfCancellationRequested();
649 // When using fast path only one outstanding write is permitted. By switching into opaque mode
650 // via IWebSocketStream.SwitchToOpaqueMode (see more detailed comments in interface definition)
651 // caller takes responsibility for enforcing this constraint.
// NOTE(review): the only visible decrement of m_Writes is in OnWriteCompleted; confirm the counter
// stays balanced when SendAsync completes synchronously (no Completed event is raised then).
652 Contract.Assert(Interlocked.Increment(ref m_OutstandingOperations.m_Writes) == 1,
653 "Only one outstanding write allowed at any given time.");
655 WebSocketHelpers.ThrowIfConnectionAborted(m_InnerStream, false);
656 m_WriteTaskCompletionSource = new TaskCompletionSource<object>();
// Clear the single buffer before assigning BufferList on the event args.
657 m_WriteEventArgs.SetBuffer(null, 0, 0);
658 m_WriteEventArgs.BufferList = sendBuffers;
659 completedAsynchronously = InnerSocket.SendAsync(m_WriteEventArgs);
// Synchronous completion: inspect the result here.
660 if (!completedAsynchronously)
662 if (m_WriteEventArgs.SocketError != SocketError.Success)
664 throw new SocketException(m_WriteEventArgs.SocketError);
667 return Task.CompletedTask;
670 return m_WriteTaskCompletionSource.Task;
674 if (WebSocketBase.LoggingEnabled)
676 Logging.Exit(Logging.WebSockets, this, Methods.MultipleWriteAsync, completedAsynchronously);
// Writes a single buffer. After validating the arguments, one visible path delegates to
// base.WriteAsync (its guarding condition is not visible in this listing; presumably taken when not
// in opaque mode). The fast path assigns the buffer to m_WriteEventArgs and calls Socket.SendAsync.
// On the fast path the returned task is Task.CompletedTask when SendAsync completes synchronously,
// otherwise the task of m_WriteTaskCompletionSource, which OnWriteCompleted / OnCancel complete.
681 public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
683 WebSocketHelpers.ValidateBuffer(buffer, offset, count);
687 return base.WriteAsync(buffer, offset, count, cancellationToken);
690 if (WebSocketBase.LoggingEnabled)
692 Logging.Enter(Logging.WebSockets, this, Methods.WriteAsync,
693 WebSocketHelpers.GetTraceMsgForParameters(offset, count, cancellationToken));
696 bool completedAsynchronously = false;
699 cancellationToken.ThrowIfCancellationRequested();
701 // When using fast path only one outstanding write is permitted. By switching into opaque mode
702 // via IWebSocketStream.SwitchToOpaqueMode (see more detailed comments in interface definition)
703 // caller takes responsibility for enforcing this constraint.
// NOTE(review): the only visible decrement of m_Writes is in OnWriteCompleted; confirm the counter
// stays balanced when SendAsync completes synchronously (no Completed event is raised then).
704 Contract.Assert(Interlocked.Increment(ref m_OutstandingOperations.m_Writes) == 1,
705 "Only one outstanding write allowed at any given time.");
707 WebSocketHelpers.ThrowIfConnectionAborted(m_InnerStream, false);
708 m_WriteTaskCompletionSource = new TaskCompletionSource<object>();
// Clear BufferList (possibly left over from MultipleWriteAsync) before assigning the single buffer.
709 m_WriteEventArgs.BufferList = null;
710 m_WriteEventArgs.SetBuffer(buffer, offset, count);
711 completedAsynchronously = InnerSocket.SendAsync(m_WriteEventArgs);
// Synchronous completion: inspect the result here.
712 if (!completedAsynchronously)
714 if (m_WriteEventArgs.SocketError != SocketError.Success)
716 throw new SocketException(m_WriteEventArgs.SocketError);
719 return Task.CompletedTask;
722 return m_WriteTaskCompletionSource.Task;
726 if (WebSocketBase.LoggingEnabled)
728 Logging.Exit(Logging.WebSockets, this, Methods.WriteAsync, completedAsynchronously);
// Validates the arguments, then either delegates to base.ReadAsync (guarding condition not visible
// in this listing; presumably when not in opaque mode) or reads directly from the socket through
// ReadAsyncCore with ignoreReadError = false.
733 public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
735 WebSocketHelpers.ValidateBuffer(buffer, offset, count);
739 return base.ReadAsync(buffer, offset, count, cancellationToken);
742 return ReadAsyncCore(buffer, offset, count, cancellationToken, false);
// Reads directly from the socket using m_ReadEventArgs and Socket.ReceiveAsync.
// ignoreReadError: stored in m_IgnoreReadError; when true, a socket error is reported as a read of
// 0 bytes instead of a SocketException (both here and in OnReadCompleted).
// Returns a completed task when ReceiveAsync finishes synchronously, otherwise the task of
// m_ReadTaskCompletionSource, which OnReadCompleted / OnCancel complete.
745 internal Task<int> ReadAsyncCore(byte[] buffer, int offset, int count, CancellationToken cancellationToken,
746 bool ignoreReadError)
748 if (WebSocketBase.LoggingEnabled)
750 Logging.Enter(Logging.WebSockets, this, Methods.ReadAsyncCore,
751 WebSocketHelpers.GetTraceMsgForParameters(offset, count, cancellationToken));
754 bool completedAsynchronously = false;
755 m_IgnoreReadError = ignoreReadError;
758 cancellationToken.ThrowIfCancellationRequested();
760 // When using fast path only one outstanding read is permitted. By switching into opaque mode
761 // via IWebSocketStream.SwitchToOpaqueMode (see more detailed comments in interface definition)
762 // caller takes responsibility for enforcing this constraint.
// NOTE(review): the only visible decrement of m_Reads is in OnReadCompleted; confirm the counter
// stays balanced when ReceiveAsync completes synchronously.
763 Contract.Assert(Interlocked.Increment(ref m_OutstandingOperations.m_Reads) == 1,
764 "Only one outstanding read allowed at any given time.");
766 WebSocketHelpers.ThrowIfConnectionAborted(m_InnerStream, true);
767 m_ReadTaskCompletionSource = new TaskCompletionSource<int>();
768 Contract.Assert(m_ReadEventArgs != null, "'m_ReadEventArgs' MUST NOT be NULL.");
769 m_ReadEventArgs.SetBuffer(buffer, offset, count);
// Two ways of obtaining the socket follow; the condition selecting between them is not visible in
// this listing (presumably 'ignoreReadError' - TODO confirm).
773 // The State of the WebSocket instance is already Closed at this point
774 // Skipping call to WebSocketBase.ThrowIfClosedOrAborted
775 innerSocket = GetInnerSocket(true);
779 innerSocket = InnerSocket;
781 completedAsynchronously = innerSocket.ReceiveAsync(m_ReadEventArgs);
// Synchronous completion: translate the result here.
782 if (!completedAsynchronously)
784 if (m_ReadEventArgs.SocketError != SocketError.Success)
786 if (!m_IgnoreReadError)
788 throw new SocketException(m_ReadEventArgs.SocketError);
792 return Task.FromResult<int>(0);
796 return Task.FromResult<int>(m_ReadEventArgs.BytesTransferred);
799 return m_ReadTaskCompletionSource.Task;
803 if (WebSocketBase.LoggingEnabled)
805 Logging.Exit(Logging.WebSockets, this, Methods.ReadAsyncCore, completedAsynchronously);
// Either delegates to base.FlushAsync (guarding condition not visible in this listing; presumably
// when not in opaque mode) or, on the other path, only honors cancellation and returns a completed task.
810 public override Task FlushAsync(CancellationToken cancellationToken)
814 return base.FlushAsync(cancellationToken);
817 cancellationToken.ThrowIfCancellationRequested();
818 return Task.CompletedTask;
823 // No op - the abort is handled by the WebSocketConnectionStream
// Cancels the pending read and write of the WebSocketConnection passed as 'state' by calling
// TrySetCanceled on snapshots of both TaskCompletionSources (if they exist).
826 // According to my tests even when aborting the underlying Socket the completionEvent for
827 // SocketAsyncEventArgs is not always fired, which can result in not cancelling the underlying
828 // IO. Cancelling the TaskCompletionSources below is safe, because CompletionSource.Tryxxx
829 // is handling the race condition (whoever is completing the CompletionSource first wins.)
830 internal static void OnCancel(object state)
832 Contract.Assert(state != null, "'state' MUST NOT be NULL.");
833 WebSocketConnection thisPtr = state as WebSocketConnection;
834 Contract.Assert(thisPtr != null, "'thisPtr' MUST NOT be NULL.");
836 if (WebSocketBase.LoggingEnabled)
838 Logging.Enter(Logging.WebSockets, thisPtr, Methods.OnCancel, string.Empty);
// Copy each field into a local first: the fields are reassigned at the start of every operation.
843 TaskCompletionSource<int> readTaskCompletionSourceSnapshot = thisPtr.m_ReadTaskCompletionSource;
845 if (readTaskCompletionSourceSnapshot != null)
847 readTaskCompletionSourceSnapshot.TrySetCanceled();
850 TaskCompletionSource<object> writeTaskCompletionSourceSnapshot = thisPtr.m_WriteTaskCompletionSource;
852 if (writeTaskCompletionSourceSnapshot != null)
854 writeTaskCompletionSourceSnapshot.TrySetCanceled();
859 if (WebSocketBase.LoggingEnabled)
861 Logging.Exit(Logging.WebSockets, thisPtr, Methods.OnCancel, string.Empty);
// Puts the connection into opaque mode: remembers 'webSocket', sets the mode flag and creates the
// read and write SocketAsyncEventArgs, each with this instance as UserToken and the matching static
// Completed handler attached. Must be called at most once (asserted).
866 public void SwitchToOpaqueMode(WebSocketBase webSocket)
868 Contract.Assert(webSocket != null, "'webSocket' MUST NOT be NULL.");
869 Contract.Assert(!m_InOpaqueMode, "SwitchToOpaqueMode MUST NOT be called multiple times.");
870 m_WebSocket = webSocket;
871 m_InOpaqueMode = true;
872 m_ReadEventArgs = new SocketAsyncEventArgs();
// UserToken lets the static completion handlers find this instance.
873 m_ReadEventArgs.UserToken = this;
874 m_ReadEventArgs.Completed += s_OnReadCompleted;
875 m_WriteEventArgs = new SocketAsyncEventArgs();
876 m_WriteEventArgs.UserToken = this;
877 m_WriteEventArgs.Completed += s_OnWriteCompleted;
// Builds the trace text for a completed socket operation from eventArgs.LastOperation and
// eventArgs.SocketError, formatted with the invariant culture.
880 private static string GetIOCompletionTraceMsg(SocketAsyncEventArgs eventArgs)
882 Contract.Assert(eventArgs != null, "'eventArgs' MUST NOT be NULL.");
883 return string.Format(CultureInfo.InvariantCulture,
884 "LastOperation: {0}, SocketError: {1}",
885 eventArgs.LastOperation,
886 eventArgs.SocketError);
// Completed handler of m_WriteEventArgs. Finds the owning WebSocketConnection through
// eventArgs.UserToken and completes its pending write task: with a SocketException when the
// operation reported an error, otherwise with a null result. Try* methods are used, so a task
// already completed elsewhere (for example canceled by OnCancel) is left as it is.
889 private static void OnWriteCompleted(object sender, SocketAsyncEventArgs eventArgs)
891 Contract.Assert(eventArgs != null, "'eventArgs' MUST NOT be NULL.");
892 WebSocketConnection thisPtr = eventArgs.UserToken as WebSocketConnection;
893 Contract.Assert(thisPtr != null, "'thisPtr' MUST NOT be NULL.");
// Counterpart of the increment asserted at the start of the write methods.
896 Contract.Assert(Interlocked.Decrement(ref thisPtr.m_OutstandingOperations.m_Writes) >= 0,
897 "'thisPtr.m_OutstandingOperations.m_Writes' MUST NOT be negative.");
900 if (WebSocketBase.LoggingEnabled)
902 Logging.Enter(Logging.WebSockets, thisPtr, Methods.OnWriteCompleted,
903 GetIOCompletionTraceMsg(eventArgs));
906 if (eventArgs.SocketError != SocketError.Success)
908 thisPtr.m_WriteTaskCompletionSource.TrySetException(new SocketException(eventArgs.SocketError));
912 thisPtr.m_WriteTaskCompletionSource.TrySetResult(null);
915 if (WebSocketBase.LoggingEnabled)
917 Logging.Exit(Logging.WebSockets, thisPtr, Methods.OnWriteCompleted, string.Empty);
// Completed handler of m_ReadEventArgs. Finds the owning WebSocketConnection through
// eventArgs.UserToken and completes its pending read task. On a socket error the task gets a
// SocketException unless m_IgnoreReadError is set, in which case the visible code reports 0 bytes;
// on success it gets eventArgs.BytesTransferred. Try* methods are used, so a task already
// completed elsewhere (for example canceled by OnCancel) is left as it is.
921 private static void OnReadCompleted(object sender, SocketAsyncEventArgs eventArgs)
923 Contract.Assert(eventArgs != null, "'eventArgs' MUST NOT be NULL.");
924 WebSocketConnection thisPtr = eventArgs.UserToken as WebSocketConnection;
925 Contract.Assert(thisPtr != null, "'thisPtr' MUST NOT be NULL.");
// Counterpart of the increment asserted at the start of ReadAsyncCore.
927 Contract.Assert(Interlocked.Decrement(ref thisPtr.m_OutstandingOperations.m_Reads) >= 0,
928 "'thisPtr.m_OutstandingOperations.m_Reads' MUST NOT be negative.");
931 if (WebSocketBase.LoggingEnabled)
933 Logging.Enter(Logging.WebSockets, thisPtr, Methods.OnReadCompleted,
934 GetIOCompletionTraceMsg(eventArgs));
937 if (eventArgs.SocketError != SocketError.Success)
939 if (!thisPtr.m_IgnoreReadError)
941 thisPtr.m_ReadTaskCompletionSource.TrySetException(new SocketException(eventArgs.SocketError));
945 thisPtr.m_ReadTaskCompletionSource.TrySetResult(0);
950 thisPtr.m_ReadTaskCompletionSource.TrySetResult(eventArgs.BytesTransferred);
953 if (WebSocketBase.LoggingEnabled)
955 Logging.Exit(Logging.WebSockets, thisPtr, Methods.OnReadCompleted, string.Empty);
// Method-name constants passed to the Logging calls of WebSocketConnection.
959 private static class Methods
961 public const string Close = "Close";
962 public const string OnCancel = "OnCancel";
963 public const string OnReadCompleted = "OnReadCompleted";
964 public const string OnWriteCompleted = "OnWriteCompleted";
965 public const string ReadAsyncCore = "ReadAsyncCore";
966 public const string WriteAsync = "WriteAsync";
967 public const string MultipleWriteAsync = "MultipleWriteAsync";