1 //------------------------------------------------------------------------------
2 // <copyright file="WebSocketHttpListenerDuplexStream.cs" company="Microsoft">
3 // Copyright (c) Microsoft Corporation. All rights reserved.
5 //------------------------------------------------------------------------------
7 namespace System.Net.WebSockets
9 using System.Collections.Concurrent;
10 using System.Collections.Generic;
11 using System.ComponentModel;
12 using System.Diagnostics;
13 using System.Diagnostics.Contracts;
14 using System.Globalization;
17 using System.Runtime.InteropServices;
18 using System.Security;
19 using System.Threading;
20 using System.Threading.Tasks;
22 internal class WebSocketHttpListenerDuplexStream : Stream, WebSocketBase.IWebSocketStream
24 private static readonly EventHandler<HttpListenerAsyncEventArgs> s_OnReadCompleted =
25 new EventHandler<HttpListenerAsyncEventArgs>(OnReadCompleted);
26 private static readonly EventHandler<HttpListenerAsyncEventArgs> s_OnWriteCompleted =
27 new EventHandler<HttpListenerAsyncEventArgs>(OnWriteCompleted);
28 private static readonly Func<Exception, bool> s_CanHandleException = new Func<Exception, bool>(CanHandleException);
29 private static readonly Action<object> s_OnCancel = new Action<object>(OnCancel);
30 private readonly HttpRequestStream m_InputStream;
31 private readonly HttpResponseStream m_OutputStream;
32 private HttpListenerContext m_Context;
33 private bool m_InOpaqueMode;
34 private WebSocketBase m_WebSocket;
35 private HttpListenerAsyncEventArgs m_WriteEventArgs;
36 private HttpListenerAsyncEventArgs m_ReadEventArgs;
37 private TaskCompletionSource<object> m_WriteTaskCompletionSource;
38 private TaskCompletionSource<int> m_ReadTaskCompletionSource;
39 private int m_CleanedUp;
42 private class OutstandingOperations
45 internal int m_Writes;
48 private readonly OutstandingOperations m_OutstandingOperations = new OutstandingOperations();
51 public WebSocketHttpListenerDuplexStream(HttpRequestStream inputStream,
52 HttpResponseStream outputStream,
53 HttpListenerContext context)
55 Contract.Assert(inputStream != null, "'inputStream' MUST NOT be NULL.");
56 Contract.Assert(outputStream != null, "'outputStream' MUST NOT be NULL.");
57 Contract.Assert(context != null, "'context' MUST NOT be NULL.");
58 Contract.Assert(inputStream.CanRead, "'inputStream' MUST support read operations.");
59 Contract.Assert(outputStream.CanWrite, "'outputStream' MUST support write operations.");
61 m_InputStream = inputStream;
62 m_OutputStream = outputStream;
65 if (WebSocketBase.LoggingEnabled)
67 Logging.Associate(Logging.WebSockets, inputStream, this);
68 Logging.Associate(Logging.WebSockets, outputStream, this);
72 public override bool CanRead
76 return m_InputStream.CanRead;
80 public override bool CanSeek
88 public override bool CanTimeout
92 return m_InputStream.CanTimeout && m_OutputStream.CanTimeout;
96 public override bool CanWrite
100 return m_OutputStream.CanWrite;
104 public override long Length
108 throw new NotSupportedException(SR.GetString(SR.net_noseek));
112 public override long Position
116 throw new NotSupportedException(SR.GetString(SR.net_noseek));
120 throw new NotSupportedException(SR.GetString(SR.net_noseek));
124 public override int Read(byte[] buffer, int offset, int count)
126 return m_InputStream.Read(buffer, offset, count);
129 public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
131 WebSocketHelpers.ValidateBuffer(buffer, offset, count);
133 return ReadAsyncCore(buffer, offset, count, cancellationToken);
136 private async Task<int> ReadAsyncCore(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
138 if (WebSocketBase.LoggingEnabled)
140 Logging.Enter(Logging.WebSockets, this, Methods.ReadAsyncCore,
141 WebSocketHelpers.GetTraceMsgForParameters(offset, count, cancellationToken));
144 CancellationTokenRegistration cancellationTokenRegistration = new CancellationTokenRegistration();
149 if (cancellationToken.CanBeCanceled)
151 cancellationTokenRegistration = cancellationToken.Register(s_OnCancel, this, false);
156 bytesRead = await m_InputStream.ReadAsync(buffer, offset, count, cancellationToken).SuppressContextFlow<int>();
161 // When using fast path only one outstanding read is permitted. By switching into opaque mode
162 // via IWebSocketStream.SwitchToOpaqueMode (see more detailed comments in interface definition)
163 // caller takes responsibility for enforcing this constraint.
164 Contract.Assert(Interlocked.Increment(ref m_OutstandingOperations.m_Reads) == 1,
165 "Only one outstanding read allowed at any given time.");
167 m_ReadTaskCompletionSource = new TaskCompletionSource<int>();
168 m_ReadEventArgs.SetBuffer(buffer, offset, count);
169 if (!ReadAsyncFast(m_ReadEventArgs))
171 if (m_ReadEventArgs.Exception != null)
173 throw m_ReadEventArgs.Exception;
176 bytesRead = m_ReadEventArgs.BytesTransferred;
180 bytesRead = await m_ReadTaskCompletionSource.Task.SuppressContextFlow<int>();
184 catch (Exception error)
186 if (s_CanHandleException(error))
188 cancellationToken.ThrowIfCancellationRequested();
195 cancellationTokenRegistration.Dispose();
197 if (WebSocketBase.LoggingEnabled)
199 Logging.Exit(Logging.WebSockets, this, Methods.ReadAsyncCore, bytesRead);
206 // return value indicates sync vs async completion
207 // false: sync completion
208 // true: async completion
209 private unsafe bool ReadAsyncFast(HttpListenerAsyncEventArgs eventArgs)
211 if (WebSocketBase.LoggingEnabled)
213 Logging.Enter(Logging.WebSockets, this, Methods.ReadAsyncFast, string.Empty);
216 eventArgs.StartOperationCommon(this);
217 eventArgs.StartOperationReceive();
220 bool completedAsynchronously = false;
223 Contract.Assert(eventArgs.Buffer != null, "'BufferList' is not supported for read operations.");
224 if (eventArgs.Count == 0 || m_InputStream.Closed)
226 eventArgs.FinishOperationSuccess(0, true);
231 int offset = eventArgs.Offset;
232 int remainingCount = eventArgs.Count;
234 if (m_InputStream.BufferedDataChunksAvailable)
236 dataRead = m_InputStream.GetChunks(eventArgs.Buffer, eventArgs.Offset, eventArgs.Count);
237 if (m_InputStream.BufferedDataChunksAvailable && dataRead == eventArgs.Count)
239 eventArgs.FinishOperationSuccess(eventArgs.Count, true);
244 Contract.Assert(!m_InputStream.BufferedDataChunksAvailable, "'m_InputStream.BufferedDataChunksAvailable' MUST BE 'FALSE' at this point.");
245 Contract.Assert(dataRead <= eventArgs.Count, "'dataRead' MUST NOT be bigger than 'eventArgs.Count'.");
249 offset += (int)dataRead;
250 remainingCount -= (int)dataRead;
251 //the http.sys team recommends that we limit the size to 128kb
252 if (remainingCount > HttpRequestStream.MaxReadSize)
254 remainingCount = HttpRequestStream.MaxReadSize;
257 eventArgs.SetBuffer(eventArgs.Buffer, offset, remainingCount);
259 else if (remainingCount > HttpRequestStream.MaxReadSize)
261 remainingCount = HttpRequestStream.MaxReadSize;
262 eventArgs.SetBuffer(eventArgs.Buffer, offset, remainingCount);
265 m_InputStream.InternalHttpContext.EnsureBoundHandle();
267 uint bytesReturned = 0;
269 UnsafeNclNativeMethods.HttpApi.HttpReceiveRequestEntityBody2(
270 m_InputStream.InternalHttpContext.RequestQueueHandle,
271 m_InputStream.InternalHttpContext.RequestId,
273 (byte*)m_WebSocket.InternalBuffer.ToIntPtr(eventArgs.Offset),
274 (uint)eventArgs.Count,
276 eventArgs.NativeOverlapped);
278 if (statusCode != UnsafeNclNativeMethods.ErrorCodes.ERROR_SUCCESS &&
279 statusCode != UnsafeNclNativeMethods.ErrorCodes.ERROR_IO_PENDING &&
280 statusCode != UnsafeNclNativeMethods.ErrorCodes.ERROR_HANDLE_EOF)
282 throw new HttpListenerException((int)statusCode);
284 else if (statusCode == UnsafeNclNativeMethods.ErrorCodes.ERROR_SUCCESS &&
285 HttpListener.SkipIOCPCallbackOnSuccess)
287 // IO operation completed synchronously. No IO completion port callback is used because
288 // it was disabled in SwitchToOpaqueMode()
289 eventArgs.FinishOperationSuccess((int)bytesReturned, true);
290 completedAsynchronously = false;
294 completedAsynchronously = true;
299 m_ReadEventArgs.FinishOperationFailure(e, true);
300 m_OutputStream.SetClosedFlag();
301 m_OutputStream.InternalHttpContext.Abort();
307 if (WebSocketBase.LoggingEnabled)
309 Logging.Exit(Logging.WebSockets, this, Methods.ReadAsyncFast, completedAsynchronously);
313 return completedAsynchronously;
316 public override int ReadByte()
318 return m_InputStream.ReadByte();
321 public bool SupportsMultipleWrite
329 public override IAsyncResult BeginRead(byte[] buffer,
332 AsyncCallback callback,
335 return m_InputStream.BeginRead(buffer, offset, count, callback, state);
338 public override int EndRead(IAsyncResult asyncResult)
340 return m_InputStream.EndRead(asyncResult);
343 public Task MultipleWriteAsync(IList<ArraySegment<byte>> sendBuffers, CancellationToken cancellationToken)
345 Contract.Assert(m_InOpaqueMode, "The stream MUST be in opaque mode at this point.");
346 Contract.Assert(sendBuffers != null, "'sendBuffers' MUST NOT be NULL.");
347 Contract.Assert(sendBuffers.Count == 1 || sendBuffers.Count == 2,
348 "'sendBuffers.Count' MUST be either '1' or '2'.");
350 if (sendBuffers.Count == 1)
352 ArraySegment<byte> buffer = sendBuffers[0];
353 return WriteAsync(buffer.Array, buffer.Offset, buffer.Count, cancellationToken);
356 return MultipleWriteAsyncCore(sendBuffers, cancellationToken);
359 private async Task MultipleWriteAsyncCore(IList<ArraySegment<byte>> sendBuffers, CancellationToken cancellationToken)
361 Contract.Assert(sendBuffers != null, "'sendBuffers' MUST NOT be NULL.");
362 Contract.Assert(sendBuffers.Count == 2, "'sendBuffers.Count' MUST be '2' at this point.");
364 if (WebSocketBase.LoggingEnabled)
366 Logging.Enter(Logging.WebSockets, this, Methods.MultipleWriteAsyncCore, string.Empty);
369 CancellationTokenRegistration cancellationTokenRegistration = new CancellationTokenRegistration();
373 if (cancellationToken.CanBeCanceled)
375 cancellationTokenRegistration = cancellationToken.Register(s_OnCancel, this, false);
378 // When using fast path only one outstanding read is permitted. By switching into opaque mode
379 // via IWebSocketStream.SwitchToOpaqueMode (see more detailed comments in interface definition)
380 // caller takes responsibility for enforcing this constraint.
381 Contract.Assert(Interlocked.Increment(ref m_OutstandingOperations.m_Writes) == 1,
382 "Only one outstanding write allowed at any given time.");
384 m_WriteTaskCompletionSource = new TaskCompletionSource<object>();
385 m_WriteEventArgs.SetBuffer(null, 0, 0);
386 m_WriteEventArgs.BufferList = sendBuffers;
387 if (WriteAsyncFast(m_WriteEventArgs))
389 await m_WriteTaskCompletionSource.Task.SuppressContextFlow();
392 catch (Exception error)
394 if (s_CanHandleException(error))
396 cancellationToken.ThrowIfCancellationRequested();
403 cancellationTokenRegistration.Dispose();
405 if (WebSocketBase.LoggingEnabled)
407 Logging.Exit(Logging.WebSockets, this, Methods.MultipleWriteAsyncCore, string.Empty);
412 public override void Write(byte[] buffer, int offset, int count)
414 m_OutputStream.Write(buffer, offset, count);
417 public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
419 WebSocketHelpers.ValidateBuffer(buffer, offset, count);
421 return WriteAsyncCore(buffer, offset, count, cancellationToken);
424 private async Task WriteAsyncCore(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
426 if (WebSocketBase.LoggingEnabled)
428 Logging.Enter(Logging.WebSockets, this, Methods.WriteAsyncCore,
429 WebSocketHelpers.GetTraceMsgForParameters(offset, count, cancellationToken));
432 CancellationTokenRegistration cancellationTokenRegistration = new CancellationTokenRegistration();
436 if (cancellationToken.CanBeCanceled)
438 cancellationTokenRegistration = cancellationToken.Register(s_OnCancel, this, false);
443 await m_OutputStream.WriteAsync(buffer, offset, count, cancellationToken).SuppressContextFlow();
448 // When using fast path only one outstanding read is permitted. By switching into opaque mode
449 // via IWebSocketStream.SwitchToOpaqueMode (see more detailed comments in interface definition)
450 // caller takes responsibility for enforcing this constraint.
451 Contract.Assert(Interlocked.Increment(ref m_OutstandingOperations.m_Writes) == 1,
452 "Only one outstanding write allowed at any given time.");
454 m_WriteTaskCompletionSource = new TaskCompletionSource<object>();
455 m_WriteEventArgs.BufferList = null;
456 m_WriteEventArgs.SetBuffer(buffer, offset, count);
457 if (WriteAsyncFast(m_WriteEventArgs))
459 await m_WriteTaskCompletionSource.Task.SuppressContextFlow();
463 catch (Exception error)
465 if (s_CanHandleException(error))
467 cancellationToken.ThrowIfCancellationRequested();
474 cancellationTokenRegistration.Dispose();
476 if (WebSocketBase.LoggingEnabled)
478 Logging.Exit(Logging.WebSockets, this, Methods.WriteAsyncCore, string.Empty);
483 // return value indicates sync vs async completion
484 // false: sync completion
485 // true: async completion
486 private bool WriteAsyncFast(HttpListenerAsyncEventArgs eventArgs)
488 if (WebSocketBase.LoggingEnabled)
490 Logging.Enter(Logging.WebSockets, this, Methods.WriteAsyncFast, string.Empty);
493 UnsafeNclNativeMethods.HttpApi.HTTP_FLAGS flags = UnsafeNclNativeMethods.HttpApi.HTTP_FLAGS.NONE;
495 eventArgs.StartOperationCommon(this);
496 eventArgs.StartOperationSend();
499 bool completedAsynchronously = false;
502 if (m_OutputStream.Closed ||
503 (eventArgs.Buffer != null && eventArgs.Count == 0))
505 eventArgs.FinishOperationSuccess(eventArgs.Count, true);
509 if (eventArgs.ShouldCloseOutput)
511 flags |= UnsafeNclNativeMethods.HttpApi.HTTP_FLAGS.HTTP_SEND_RESPONSE_FLAG_DISCONNECT;
515 flags |= UnsafeNclNativeMethods.HttpApi.HTTP_FLAGS.HTTP_SEND_RESPONSE_FLAG_MORE_DATA;
516 // When using HTTP_SEND_RESPONSE_FLAG_BUFFER_DATA HTTP.SYS will copy the payload to
517 // kernel memory (Non-Paged Pool). Http.Sys will buffer up to
518 // Math.Min(16 MB, current TCP window size)
519 flags |= UnsafeNclNativeMethods.HttpApi.HTTP_FLAGS.HTTP_SEND_RESPONSE_FLAG_BUFFER_DATA;
522 m_OutputStream.InternalHttpContext.EnsureBoundHandle();
525 UnsafeNclNativeMethods.HttpApi.HttpSendResponseEntityBody2(
526 m_OutputStream.InternalHttpContext.RequestQueueHandle,
527 m_OutputStream.InternalHttpContext.RequestId,
529 eventArgs.EntityChunkCount,
530 eventArgs.EntityChunks,
534 eventArgs.NativeOverlapped,
537 if (statusCode != UnsafeNclNativeMethods.ErrorCodes.ERROR_SUCCESS &&
538 statusCode != UnsafeNclNativeMethods.ErrorCodes.ERROR_IO_PENDING)
540 throw new HttpListenerException((int)statusCode);
542 else if (statusCode == UnsafeNclNativeMethods.ErrorCodes.ERROR_SUCCESS &&
543 HttpListener.SkipIOCPCallbackOnSuccess)
545 // IO operation completed synchronously - callback won't be called to signal completion.
546 eventArgs.FinishOperationSuccess((int)bytesSent, true);
547 completedAsynchronously = false;
551 completedAsynchronously = true;
556 m_WriteEventArgs.FinishOperationFailure(e, true);
557 m_OutputStream.SetClosedFlag();
558 m_OutputStream.InternalHttpContext.Abort();
564 if (WebSocketBase.LoggingEnabled)
566 Logging.Exit(Logging.WebSockets, this, Methods.WriteAsyncFast, completedAsynchronously);
570 return completedAsynchronously;
573 public override void WriteByte(byte value)
575 m_OutputStream.WriteByte(value);
578 public override IAsyncResult BeginWrite(byte[] buffer,
581 AsyncCallback callback,
584 return m_OutputStream.BeginWrite(buffer, offset, count, callback, state);
587 public override void EndWrite(IAsyncResult asyncResult)
589 m_OutputStream.EndWrite(asyncResult);
592 public override void Flush()
594 m_OutputStream.Flush();
597 public override Task FlushAsync(CancellationToken cancellationToken)
599 return m_OutputStream.FlushAsync(cancellationToken);
602 public override long Seek(long offset, SeekOrigin origin)
604 throw new NotSupportedException(SR.GetString(SR.net_noseek));
607 public override void SetLength(long value)
609 throw new NotSupportedException(SR.GetString(SR.net_noseek));
612 public async Task CloseNetworkConnectionAsync(CancellationToken cancellationToken)
614 // need to yield here to make sure that we don't get any exception synchronously
617 if (WebSocketBase.LoggingEnabled)
619 Logging.Enter(Logging.WebSockets, this, Methods.CloseNetworkConnectionAsync, string.Empty);
622 CancellationTokenRegistration cancellationTokenRegistration = new CancellationTokenRegistration();
626 if (cancellationToken.CanBeCanceled)
628 cancellationTokenRegistration = cancellationToken.Register(s_OnCancel, this, false);
631 // When using fast path only one outstanding read is permitted. By switching into opaque mode
632 // via IWebSocketStream.SwitchToOpaqueMode (see more detailed comments in interface definition)
633 // caller takes responsibility for enforcing this constraint.
634 Contract.Assert(Interlocked.Increment(ref m_OutstandingOperations.m_Writes) == 1,
635 "Only one outstanding write allowed at any given time.");
637 m_WriteTaskCompletionSource = new TaskCompletionSource<object>();
638 m_WriteEventArgs.SetShouldCloseOutput();
639 if (WriteAsyncFast(m_WriteEventArgs))
641 await m_WriteTaskCompletionSource.Task.SuppressContextFlow();
644 catch (Exception error)
646 if (!s_CanHandleException(error))
651 // throw OperationCancelledException when canceled by the caller
652 // otherwise swallow the exception
653 cancellationToken.ThrowIfCancellationRequested();
657 cancellationTokenRegistration.Dispose();
659 if (WebSocketBase.LoggingEnabled)
661 Logging.Exit(Logging.WebSockets, this, Methods.CloseNetworkConnectionAsync, string.Empty);
666 protected override void Dispose(bool disposing)
668 if (disposing && Interlocked.Exchange(ref m_CleanedUp, 1) == 0)
670 if (m_ReadTaskCompletionSource != null)
672 m_ReadTaskCompletionSource.TrySetCanceled();
675 if (m_WriteTaskCompletionSource != null)
677 m_WriteTaskCompletionSource.TrySetCanceled();
680 if (m_ReadEventArgs != null)
682 m_ReadEventArgs.Dispose();
685 if (m_WriteEventArgs != null)
687 m_WriteEventArgs.Dispose();
692 m_InputStream.Close();
696 m_OutputStream.Close();
706 private static bool CanHandleException(Exception error)
708 return error is HttpListenerException ||
709 error is ObjectDisposedException ||
710 error is IOException;
713 private static void OnCancel(object state)
715 Contract.Assert(state != null, "'state' MUST NOT be NULL.");
716 WebSocketHttpListenerDuplexStream thisPtr = state as WebSocketHttpListenerDuplexStream;
717 Contract.Assert(thisPtr != null, "'thisPtr' MUST NOT be NULL.");
719 if (WebSocketBase.LoggingEnabled)
721 Logging.Enter(Logging.WebSockets, state, Methods.OnCancel, string.Empty);
726 thisPtr.m_OutputStream.SetClosedFlag();
727 thisPtr.m_Context.Abort();
731 TaskCompletionSource<int> readTaskCompletionSourceSnapshot = thisPtr.m_ReadTaskCompletionSource;
733 if (readTaskCompletionSourceSnapshot != null)
735 readTaskCompletionSourceSnapshot.TrySetCanceled();
738 TaskCompletionSource<object> writeTaskCompletionSourceSnapshot = thisPtr.m_WriteTaskCompletionSource;
740 if (writeTaskCompletionSourceSnapshot != null)
742 writeTaskCompletionSourceSnapshot.TrySetCanceled();
745 if (WebSocketBase.LoggingEnabled)
747 Logging.Exit(Logging.WebSockets, state, Methods.OnCancel, string.Empty);
751 public void SwitchToOpaqueMode(WebSocketBase webSocket)
753 Contract.Assert(webSocket != null, "'webSocket' MUST NOT be NULL.");
754 Contract.Assert(m_OutputStream != null, "'m_OutputStream' MUST NOT be NULL.");
755 Contract.Assert(m_OutputStream.InternalHttpContext != null,
756 "'m_OutputStream.InternalHttpContext' MUST NOT be NULL.");
757 Contract.Assert(m_OutputStream.InternalHttpContext.Response != null,
758 "'m_OutputStream.InternalHttpContext.Response' MUST NOT be NULL.");
759 Contract.Assert(m_OutputStream.InternalHttpContext.Response.SentHeaders,
760 "Headers MUST have been sent at this point.");
761 Contract.Assert(!m_InOpaqueMode, "SwitchToOpaqueMode MUST NOT be called multiple times.");
765 throw new InvalidOperationException();
768 m_WebSocket = webSocket;
769 m_InOpaqueMode = true;
770 m_ReadEventArgs = new HttpListenerAsyncEventArgs(webSocket, this);
771 m_ReadEventArgs.Completed += s_OnReadCompleted;
772 m_WriteEventArgs = new HttpListenerAsyncEventArgs(webSocket, this);
773 m_WriteEventArgs.Completed += s_OnWriteCompleted;
775 if (WebSocketBase.LoggingEnabled)
777 Logging.Associate(Logging.WebSockets, this, webSocket);
781 private static void OnWriteCompleted(object sender, HttpListenerAsyncEventArgs eventArgs)
783 Contract.Assert(eventArgs != null, "'eventArgs' MUST NOT be NULL.");
784 WebSocketHttpListenerDuplexStream thisPtr = eventArgs.CurrentStream;
785 Contract.Assert(thisPtr != null, "'thisPtr' MUST NOT be NULL.");
787 Contract.Assert(Interlocked.Decrement(ref thisPtr.m_OutstandingOperations.m_Writes) >= 0,
788 "'thisPtr.m_OutstandingOperations.m_Writes' MUST NOT be negative.");
791 if (WebSocketBase.LoggingEnabled)
793 Logging.Enter(Logging.WebSockets, thisPtr, Methods.OnWriteCompleted, string.Empty);
796 if (eventArgs.Exception != null)
798 thisPtr.m_WriteTaskCompletionSource.TrySetException(eventArgs.Exception);
802 thisPtr.m_WriteTaskCompletionSource.TrySetResult(null);
805 if (WebSocketBase.LoggingEnabled)
807 Logging.Exit(Logging.WebSockets, thisPtr, Methods.OnWriteCompleted, string.Empty);
811 private static void OnReadCompleted(object sender, HttpListenerAsyncEventArgs eventArgs)
813 Contract.Assert(eventArgs != null, "'eventArgs' MUST NOT be NULL.");
814 WebSocketHttpListenerDuplexStream thisPtr = eventArgs.CurrentStream;
815 Contract.Assert(thisPtr != null, "'thisPtr' MUST NOT be NULL.");
817 Contract.Assert(Interlocked.Decrement(ref thisPtr.m_OutstandingOperations.m_Reads) >= 0,
818 "'thisPtr.m_OutstandingOperations.m_Reads' MUST NOT be negative.");
821 if (WebSocketBase.LoggingEnabled)
823 Logging.Enter(Logging.WebSockets, thisPtr, Methods.OnReadCompleted, string.Empty);
826 if (eventArgs.Exception != null)
828 thisPtr.m_ReadTaskCompletionSource.TrySetException(eventArgs.Exception);
832 thisPtr.m_ReadTaskCompletionSource.TrySetResult(eventArgs.BytesTransferred);
835 if (WebSocketBase.LoggingEnabled)
837 Logging.Exit(Logging.WebSockets, thisPtr, Methods.OnReadCompleted, string.Empty);
841 internal class HttpListenerAsyncEventArgs : EventArgs, IDisposable
843 private const int Free = 0;
844 private const int InProgress = 1;
845 private const int Disposed = 2;
846 private int m_Operating;
848 private bool m_DisposeCalled;
849 private SafeNativeOverlapped m_PtrNativeOverlapped;
850 private Overlapped m_Overlapped;
851 private event EventHandler<HttpListenerAsyncEventArgs> m_Completed;
852 private byte[] m_Buffer;
853 private IList<ArraySegment<byte>> m_BufferList;
855 private int m_Offset;
856 private int m_BytesTransferred;
857 private HttpListenerAsyncOperation m_CompletedOperation;
858 private UnsafeNclNativeMethods.HttpApi.HTTP_DATA_CHUNK[] m_DataChunks;
859 private GCHandle m_DataChunksGCHandle;
860 private ushort m_DataChunkCount;
861 private Exception m_Exception;
862 private bool m_ShouldCloseOutput;
863 private readonly WebSocketBase m_WebSocket;
864 private readonly WebSocketHttpListenerDuplexStream m_CurrentStream;
866 public HttpListenerAsyncEventArgs(WebSocketBase webSocket, WebSocketHttpListenerDuplexStream stream)
869 m_WebSocket = webSocket;
870 m_CurrentStream = stream;
871 InitializeOverlapped();
874 public int BytesTransferred
876 get { return m_BytesTransferred; }
881 get { return m_Buffer; }
884 // BufferList property.
885 // Mutually exclusive with Buffer.
886 // Setting this property with an existing non-null Buffer will cause an assert.
887 public IList<ArraySegment<byte>> BufferList
889 get { return m_BufferList; }
892 Contract.Assert(!m_ShouldCloseOutput, "'m_ShouldCloseOutput' MUST be 'false' at this point.");
893 Contract.Assert(value == null || m_Buffer == null,
894 "Either 'm_Buffer' or 'm_BufferList' MUST be NULL.");
895 Contract.Assert(m_Operating == Free,
896 "This property can only be modified if no IO operation is outstanding.");
897 Contract.Assert(value == null || value.Count == 2,
898 "This list can only be 'NULL' or MUST have exactly '2' items.");
899 m_BufferList = value;
903 public bool ShouldCloseOutput
905 get { return m_ShouldCloseOutput; }
910 get { return m_Offset; }
915 get { return m_Count; }
918 public Exception Exception
920 get { return m_Exception; }
923 public ushort EntityChunkCount
927 if (m_DataChunks == null)
932 return m_DataChunkCount;
936 public SafeNativeOverlapped NativeOverlapped
938 get { return m_PtrNativeOverlapped; }
941 public IntPtr EntityChunks
945 if (m_DataChunks == null)
950 return Marshal.UnsafeAddrOfPinnedArrayElement(m_DataChunks, 0);
954 public WebSocketHttpListenerDuplexStream CurrentStream
956 get { return m_CurrentStream; }
959 public event EventHandler<HttpListenerAsyncEventArgs> Completed
963 m_Completed += value;
967 m_Completed -= value;
971 protected virtual void OnCompleted(HttpListenerAsyncEventArgs e)
973 EventHandler<HttpListenerAsyncEventArgs> handler = m_Completed;
976 handler(e.m_CurrentStream, e);
980 public void SetShouldCloseOutput()
984 m_ShouldCloseOutput = true;
987 public void Dispose()
989 // Remember that Dispose was called.
990 m_DisposeCalled = true;
992 // Check if this object is in-use for an async socket operation.
993 if (Interlocked.CompareExchange(ref m_Operating, Disposed, Free) != Free)
995 // Either already disposed or will be disposed when current operation completes.
999 // OK to dispose now.
1000 // Free native overlapped data.
1001 FreeOverlapped(false);
1003 // Don't bother finalizing later.
1004 GC.SuppressFinalize(this);
1008 ~HttpListenerAsyncEventArgs()
1010 FreeOverlapped(true);
1013 private unsafe void InitializeOverlapped()
1015 m_Overlapped = new Overlapped();
1016 m_PtrNativeOverlapped = new SafeNativeOverlapped(m_Overlapped.UnsafePack(CompletionPortCallback, null));
1019 // Method to clean up any existing Overlapped object and related state variables.
1020 private void FreeOverlapped(bool checkForShutdown)
1022 if (!checkForShutdown || !NclUtilities.HasShutdownStarted)
1024 // Free the overlapped object
1025 if (m_PtrNativeOverlapped != null && !m_PtrNativeOverlapped.IsInvalid)
1027 m_PtrNativeOverlapped.Dispose();
1030 if (m_DataChunksGCHandle.IsAllocated)
1032 m_DataChunksGCHandle.Free();
1037 // Method called to prepare for a native async http.sys call.
1038 // This method performs the tasks common to all http.sys operations.
1039 internal void StartOperationCommon(WebSocketHttpListenerDuplexStream currentStream)
1041 // Change status to "in-use".
1042 if(Interlocked.CompareExchange(ref m_Operating, InProgress, Free) != Free)
1044 // If it was already "in-use" check if Dispose was called.
1045 if (m_DisposeCalled)
1047 // Dispose was called - throw ObjectDisposed.
1048 throw new ObjectDisposedException(GetType().FullName);
1051 Contract.Assert(false, "Only one outstanding async operation is allowed per HttpListenerAsyncEventArgs instance.");
1052 // Only one at a time.
1053 throw new InvalidOperationException();
1056 // HttpSendResponseEntityBody can return ERROR_INVALID_PARAMETER if the InternalHigh field of the overlapped
1057 // is not IntPtr.Zero, so we have to reset this field because we are reusing the Overlapped.
1058 // When using the IAsyncResult based approach of HttpListenerResponseStream the Overlapped is reinitialized
1059 // for each operation by the CLR when returned from the OverlappedDataCache.
1060 NativeOverlapped.ReinitializeNativeOverlapped();
1062 m_BytesTransferred = 0;
1065 internal void StartOperationReceive()
1067 // Remember the operation type.
1068 m_CompletedOperation = HttpListenerAsyncOperation.Receive;
1071 internal void StartOperationSend()
1075 // Remember the operation type.
1076 m_CompletedOperation = HttpListenerAsyncOperation.Send;
1079 public void SetBuffer(byte[] buffer, int offset, int count)
1081 Contract.Assert(!m_ShouldCloseOutput, "'m_ShouldCloseOutput' MUST be 'false' at this point.");
1082 Contract.Assert(buffer == null || m_BufferList == null, "Either 'm_Buffer' or 'm_BufferList' MUST be NULL.");
1088 private unsafe void UpdateDataChunk()
1090 if (m_DataChunks == null)
1092 m_DataChunks = new UnsafeNclNativeMethods.HttpApi.HTTP_DATA_CHUNK[2];
1093 m_DataChunksGCHandle = GCHandle.Alloc(m_DataChunks, GCHandleType.Pinned);
1094 m_DataChunks[0] = new UnsafeNclNativeMethods.HttpApi.HTTP_DATA_CHUNK();
1095 m_DataChunks[0].DataChunkType = UnsafeNclNativeMethods.HttpApi.HTTP_DATA_CHUNK_TYPE.HttpDataChunkFromMemory;
1096 m_DataChunks[1] = new UnsafeNclNativeMethods.HttpApi.HTTP_DATA_CHUNK();
1097 m_DataChunks[1].DataChunkType = UnsafeNclNativeMethods.HttpApi.HTTP_DATA_CHUNK_TYPE.HttpDataChunkFromMemory;
1100 Contract.Assert(m_Buffer == null || m_BufferList == null, "Either 'm_Buffer' or 'm_BufferList' MUST be NULL.");
1101 Contract.Assert(m_ShouldCloseOutput || m_Buffer != null || m_BufferList != null, "Either 'm_Buffer' or 'm_BufferList' MUST NOT be NULL.");
1103 // The underlying byte[] m_Buffer or each m_BufferList[].Array are pinned already
1104 if (m_Buffer != null)
1106 UpdateDataChunk(0, m_Buffer, m_Offset, m_Count);
1107 UpdateDataChunk(1, null, 0, 0);
1108 m_DataChunkCount = 1;
1110 else if (m_BufferList != null)
1112 Contract.Assert(m_BufferList != null && m_BufferList.Count == 2,
1113 "'m_BufferList' MUST NOT be NULL and have exactly '2' items at this point.");
1114 UpdateDataChunk(0, m_BufferList[0].Array, m_BufferList[0].Offset, m_BufferList[0].Count);
1115 UpdateDataChunk(1, m_BufferList[1].Array, m_BufferList[1].Offset, m_BufferList[1].Count);
1116 m_DataChunkCount = 2;
1120 Contract.Assert(m_ShouldCloseOutput, "'m_ShouldCloseOutput' MUST be 'true' at this point.");
1121 m_DataChunks = null;
1125 private unsafe void UpdateDataChunk(int index, byte[] buffer, int offset, int count)
1129 m_DataChunks[index].pBuffer = null;
1130 m_DataChunks[index].BufferLength = 0;
1134 if (m_WebSocket.InternalBuffer.IsInternalBuffer(buffer, offset, count))
1136 m_DataChunks[index].pBuffer = (byte*)(m_WebSocket.InternalBuffer.ToIntPtr(offset));
1140 m_DataChunks[index].pBuffer =
1141 (byte*)m_WebSocket.InternalBuffer.ConvertPinnedSendPayloadToNative(buffer, offset, count);
1144 m_DataChunks[index].BufferLength = (uint)count;
1147 // Method to mark this object as no longer "in-use".
1148 // Will also execute a Dispose deferred because I/O was in progress.
1149 internal void Complete()
1151 // Mark as not in-use
1154 // Check for deferred Dispose().
1155 // The deferred Dispose is not guaranteed if Dispose is called while an operation is in progress.
1156 // The m_DisposeCalled variable is not managed in a thread-safe manner on purpose for performance.
1157 if (m_DisposeCalled)
1163 // Method to update internal state after sync or async completion.
1164 private void SetResults(Exception exception, int bytesTransferred)
1166 m_Exception = exception;
1167 m_BytesTransferred = bytesTransferred;
1170 internal void FinishOperationFailure(Exception exception, bool syncCompletion)
1172 SetResults(exception, 0);
1174 if (WebSocketBase.LoggingEnabled)
1176 Logging.PrintError(Logging.WebSockets, m_CurrentStream,
1177 m_CompletedOperation == HttpListenerAsyncOperation.Receive ? Methods.ReadAsyncFast : Methods.WriteAsyncFast,
1178 exception.ToString());
1185 internal void FinishOperationSuccess(int bytesTransferred, bool syncCompletion)
1187 SetResults(null, bytesTransferred);
1189 if (WebSocketBase.LoggingEnabled)
1191 if (m_Buffer != null)
1193 Logging.Dump(Logging.WebSockets, m_CurrentStream,
1194 m_CompletedOperation == HttpListenerAsyncOperation.Receive ? Methods.ReadAsyncFast : Methods.WriteAsyncFast,
1195 m_Buffer, m_Offset, bytesTransferred);
1197 else if (m_BufferList != null)
1199 Contract.Assert(m_CompletedOperation == HttpListenerAsyncOperation.Send,
1200 "'BufferList' is only supported for send operations.");
1202 foreach (ArraySegment<byte> buffer in BufferList)
1204 Logging.Dump(Logging.WebSockets, this, Methods.WriteAsyncFast, buffer.Array, buffer.Offset, buffer.Count);
1209 Logging.PrintLine(Logging.WebSockets, TraceEventType.Verbose, 0,
1210 string.Format(CultureInfo.InvariantCulture, "Output channel closed for {0}#{1}",
1211 m_CurrentStream.GetType().Name, ValidationHelper.HashString(m_CurrentStream)));
1215 if (m_ShouldCloseOutput)
1217 m_CurrentStream.m_OutputStream.SetClosedFlag();
1220 // Complete the operation and raise completion event.
1225 private unsafe void CompletionPortCallback(uint errorCode, uint numBytes, NativeOverlapped* nativeOverlapped)
1227 if (errorCode == UnsafeNclNativeMethods.ErrorCodes.ERROR_SUCCESS ||
1228 errorCode == UnsafeNclNativeMethods.ErrorCodes.ERROR_HANDLE_EOF)
1230 FinishOperationSuccess((int)numBytes, false);
1234 FinishOperationFailure(new HttpListenerException((int)errorCode), false);
1238 public enum HttpListenerAsyncOperation
1246 private static class Methods
1248 public const string CloseNetworkConnectionAsync = "CloseNetworkConnectionAsync";
1249 public const string OnCancel = "OnCancel";
1250 public const string OnReadCompleted = "OnReadCompleted";
1251 public const string OnWriteCompleted = "OnWriteCompleted";
1252 public const string ReadAsyncFast = "ReadAsyncFast";
1253 public const string ReadAsyncCore = "ReadAsyncCore";
1254 public const string WriteAsyncFast = "WriteAsyncFast";
1255 public const string WriteAsyncCore = "WriteAsyncCore";
1256 public const string MultipleWriteAsyncCore = "MultipleWriteAsyncCore";