9fd21d9ce24c54e10690818ce0e73327e3451448
[mono.git] / mcs / class / referencesource / System / net / System / Net / WebSockets / WebSocketHttpListenerDuplexStream.cs
1 //------------------------------------------------------------------------------
2 // <copyright file="WebSocketHttpListenerDuplexStream.cs" company="Microsoft">
3 //     Copyright (c) Microsoft Corporation.  All rights reserved.
4 // </copyright>
5 //------------------------------------------------------------------------------
6
7 namespace System.Net.WebSockets
8 {
9     using System.Collections.Concurrent;
10     using System.Collections.Generic;
11     using System.ComponentModel;
12     using System.Diagnostics;
13     using System.Diagnostics.Contracts;
14     using System.Globalization;
15     using System.IO;
16     using System.Net;
17     using System.Runtime.InteropServices;
18     using System.Security;
19     using System.Threading;
20     using System.Threading.Tasks;
21
22     internal class WebSocketHttpListenerDuplexStream : Stream, WebSocketBase.IWebSocketStream
23     {
24         private static readonly EventHandler<HttpListenerAsyncEventArgs> s_OnReadCompleted =
25             new EventHandler<HttpListenerAsyncEventArgs>(OnReadCompleted);
26         private static readonly EventHandler<HttpListenerAsyncEventArgs> s_OnWriteCompleted =
27             new EventHandler<HttpListenerAsyncEventArgs>(OnWriteCompleted);
28         private static readonly Func<Exception, bool> s_CanHandleException = new Func<Exception, bool>(CanHandleException);
29         private static readonly Action<object> s_OnCancel = new Action<object>(OnCancel);
30         private readonly HttpRequestStream m_InputStream;
31         private readonly HttpResponseStream m_OutputStream;
32         private HttpListenerContext m_Context;
33         private bool m_InOpaqueMode;
34         private WebSocketBase m_WebSocket;
35         private HttpListenerAsyncEventArgs m_WriteEventArgs;
36         private HttpListenerAsyncEventArgs m_ReadEventArgs;
37         private TaskCompletionSource<object> m_WriteTaskCompletionSource;
38         private TaskCompletionSource<int> m_ReadTaskCompletionSource;
39         private int m_CleanedUp;
40
41 #if DEBUG
42         private class OutstandingOperations
43         {
44             internal int m_Reads;
45             internal int m_Writes;
46         }
47
48         private readonly OutstandingOperations m_OutstandingOperations = new OutstandingOperations();
49 #endif //DEBUG
50
51         public WebSocketHttpListenerDuplexStream(HttpRequestStream inputStream,
52             HttpResponseStream outputStream,
53             HttpListenerContext context)
54         {
55             Contract.Assert(inputStream != null, "'inputStream' MUST NOT be NULL.");
56             Contract.Assert(outputStream != null, "'outputStream' MUST NOT be NULL.");
57             Contract.Assert(context != null, "'context' MUST NOT be NULL.");
58             Contract.Assert(inputStream.CanRead, "'inputStream' MUST support read operations.");
59             Contract.Assert(outputStream.CanWrite, "'outputStream' MUST support write operations.");
60
61             m_InputStream = inputStream;
62             m_OutputStream = outputStream;
63             m_Context = context;
64
65             if (WebSocketBase.LoggingEnabled)
66             {
67                 Logging.Associate(Logging.WebSockets, inputStream, this);
68                 Logging.Associate(Logging.WebSockets, outputStream, this);
69             }
70         }
71
72         public override bool CanRead
73         {
74             get
75             {
76                 return m_InputStream.CanRead;
77             }
78         }
79
80         public override bool CanSeek
81         {
82             get
83             {
84                 return false;
85             }
86         }
87
88         public override bool CanTimeout
89         {
90             get
91             {
92                 return m_InputStream.CanTimeout && m_OutputStream.CanTimeout;
93             }
94         }
95
96         public override bool CanWrite
97         {
98             get
99             {
100                 return m_OutputStream.CanWrite;
101             }
102         }
103
104         public override long Length
105         {
106             get
107             {
108                 throw new NotSupportedException(SR.GetString(SR.net_noseek));
109             }
110         }
111
112         public override long Position
113         {
114             get
115             {
116                 throw new NotSupportedException(SR.GetString(SR.net_noseek));
117             }
118             set
119             {
120                 throw new NotSupportedException(SR.GetString(SR.net_noseek));
121             }
122         }
123
124         public override int Read(byte[] buffer, int offset, int count)
125         {
126             return m_InputStream.Read(buffer, offset, count);
127         }
128
129         public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
130         {
131             WebSocketHelpers.ValidateBuffer(buffer, offset, count);
132
133             return ReadAsyncCore(buffer, offset, count, cancellationToken);
134         }
135
136         private async Task<int> ReadAsyncCore(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
137         {
138             if (WebSocketBase.LoggingEnabled)
139             {
140                 Logging.Enter(Logging.WebSockets, this, Methods.ReadAsyncCore, 
141                     WebSocketHelpers.GetTraceMsgForParameters(offset, count, cancellationToken));
142             }
143
144             CancellationTokenRegistration cancellationTokenRegistration = new CancellationTokenRegistration();
145
146             int bytesRead = 0;
147             try
148             {
149                 if (cancellationToken.CanBeCanceled)
150                 {
151                     cancellationTokenRegistration = cancellationToken.Register(s_OnCancel, this, false);
152                 }
153
154                 if (!m_InOpaqueMode)
155                 {
156                     bytesRead = await m_InputStream.ReadAsync(buffer, offset, count, cancellationToken).SuppressContextFlow<int>();
157                 }
158                 else
159                 {
160 #if DEBUG
161                     // When using fast path only one outstanding read is permitted. By switching into opaque mode
162                     // via IWebSocketStream.SwitchToOpaqueMode (see more detailed comments in interface definition)
163                     // caller takes responsibility for enforcing this constraint.
164                     Contract.Assert(Interlocked.Increment(ref m_OutstandingOperations.m_Reads) == 1,
165                         "Only one outstanding read allowed at any given time.");
166 #endif
167                     m_ReadTaskCompletionSource = new TaskCompletionSource<int>();
168                     m_ReadEventArgs.SetBuffer(buffer, offset, count);
169                     if (!ReadAsyncFast(m_ReadEventArgs))
170                     {
171                         if (m_ReadEventArgs.Exception != null)
172                         {
173                             throw m_ReadEventArgs.Exception;
174                         }
175
176                         bytesRead = m_ReadEventArgs.BytesTransferred;
177                     }
178                     else
179                     {
180                         bytesRead = await m_ReadTaskCompletionSource.Task.SuppressContextFlow<int>();
181                     }
182                 }
183             }
184             catch (Exception error)
185             {
186                 if (s_CanHandleException(error))
187                 {
188                     cancellationToken.ThrowIfCancellationRequested();
189                 }
190
191                 throw;
192             }
193             finally
194             {
195                 cancellationTokenRegistration.Dispose();
196
197                 if (WebSocketBase.LoggingEnabled)
198                 {
199                     Logging.Exit(Logging.WebSockets, this, Methods.ReadAsyncCore, bytesRead);
200                 }
201             }
202
203             return bytesRead;
204         }
205
206         // return value indicates sync vs async completion
207         // false: sync completion
208         // true: async completion
209         private unsafe bool ReadAsyncFast(HttpListenerAsyncEventArgs eventArgs)
210         {
211             if (WebSocketBase.LoggingEnabled)
212             {
213                 Logging.Enter(Logging.WebSockets, this, Methods.ReadAsyncFast, string.Empty);
214             }
215
216             eventArgs.StartOperationCommon(this);
217             eventArgs.StartOperationReceive();
218
219             uint statusCode = 0;
220             bool completedAsynchronously = false;
221             try
222             {
223                 Contract.Assert(eventArgs.Buffer != null, "'BufferList' is not supported for read operations.");
224                 if (eventArgs.Count == 0 || m_InputStream.Closed)
225                 {
226                     eventArgs.FinishOperationSuccess(0, true);
227                     return false;
228                 }
229
230                 uint dataRead = 0;
231                 int offset = eventArgs.Offset;
232                 int remainingCount = eventArgs.Count;
233
234                 if (m_InputStream.BufferedDataChunksAvailable)
235                 {
236                     dataRead = m_InputStream.GetChunks(eventArgs.Buffer, eventArgs.Offset, eventArgs.Count);
237                     if (m_InputStream.BufferedDataChunksAvailable && dataRead == eventArgs.Count)
238                     {
239                         eventArgs.FinishOperationSuccess(eventArgs.Count, true);
240                         return false;
241                     }
242                 }
243
244                 Contract.Assert(!m_InputStream.BufferedDataChunksAvailable, "'m_InputStream.BufferedDataChunksAvailable' MUST BE 'FALSE' at this point.");
245                 Contract.Assert(dataRead <= eventArgs.Count, "'dataRead' MUST NOT be bigger than 'eventArgs.Count'.");
246
247                 if (dataRead != 0)
248                 {
249                     offset += (int)dataRead;
250                     remainingCount -= (int)dataRead;
251                     //the http.sys team recommends that we limit the size to 128kb
252                     if (remainingCount > HttpRequestStream.MaxReadSize)
253                     {
254                         remainingCount = HttpRequestStream.MaxReadSize;
255                     }
256
257                     eventArgs.SetBuffer(eventArgs.Buffer, offset, remainingCount);
258                 }
259                 else if (remainingCount > HttpRequestStream.MaxReadSize)
260                 {
261                     remainingCount = HttpRequestStream.MaxReadSize;
262                     eventArgs.SetBuffer(eventArgs.Buffer, offset, remainingCount);
263                 }
264
265                 m_InputStream.InternalHttpContext.EnsureBoundHandle();
266                 uint flags = 0;
267                 uint bytesReturned = 0;
268                 statusCode =
269                     UnsafeNclNativeMethods.HttpApi.HttpReceiveRequestEntityBody2(
270                         m_InputStream.InternalHttpContext.RequestQueueHandle,
271                         m_InputStream.InternalHttpContext.RequestId,
272                         flags,
273                         (byte*)m_WebSocket.InternalBuffer.ToIntPtr(eventArgs.Offset),
274                         (uint)eventArgs.Count,
275                         out bytesReturned,
276                         eventArgs.NativeOverlapped);
277
278                 if (statusCode != UnsafeNclNativeMethods.ErrorCodes.ERROR_SUCCESS &&
279                     statusCode != UnsafeNclNativeMethods.ErrorCodes.ERROR_IO_PENDING &&
280                     statusCode != UnsafeNclNativeMethods.ErrorCodes.ERROR_HANDLE_EOF)
281                 {
282                     throw new HttpListenerException((int)statusCode);
283                 }
284                 else if (statusCode == UnsafeNclNativeMethods.ErrorCodes.ERROR_SUCCESS &&
285                     HttpListener.SkipIOCPCallbackOnSuccess)
286                 {
287                     // IO operation completed synchronously. No IO completion port callback is used because 
288                     // it was disabled in SwitchToOpaqueMode()
289                     eventArgs.FinishOperationSuccess((int)bytesReturned, true);
290                     completedAsynchronously = false;
291                 }
292                 else
293                 {
294                     completedAsynchronously = true;
295                 }
296             }
297             catch (Exception e)
298             {
299                 m_ReadEventArgs.FinishOperationFailure(e, true);
300                 m_OutputStream.SetClosedFlag();
301                 m_OutputStream.InternalHttpContext.Abort();
302
303                 throw;
304             }
305             finally
306             {
307                 if (WebSocketBase.LoggingEnabled)
308                 {
309                     Logging.Exit(Logging.WebSockets, this, Methods.ReadAsyncFast, completedAsynchronously);
310                 }
311             }
312
313             return completedAsynchronously;
314         }
315
316         public override int ReadByte()
317         {
318             return m_InputStream.ReadByte();
319         }
320
321         public bool SupportsMultipleWrite
322         {
323             get
324             {
325                 return true;
326             }
327         }
328
329         public override IAsyncResult BeginRead(byte[] buffer,
330             int offset,
331             int count,
332             AsyncCallback callback,
333             object state)
334         {
335             return m_InputStream.BeginRead(buffer, offset, count, callback, state);
336         }
337
338         public override int EndRead(IAsyncResult asyncResult)
339         {
340             return m_InputStream.EndRead(asyncResult);
341         }
342         
343         public Task MultipleWriteAsync(IList<ArraySegment<byte>> sendBuffers, CancellationToken cancellationToken)
344         {
345             Contract.Assert(m_InOpaqueMode, "The stream MUST be in opaque mode at this point.");
346             Contract.Assert(sendBuffers != null, "'sendBuffers' MUST NOT be NULL.");
347             Contract.Assert(sendBuffers.Count == 1 || sendBuffers.Count == 2,
348                 "'sendBuffers.Count' MUST be either '1' or '2'.");
349
350             if (sendBuffers.Count == 1)
351             {
352                 ArraySegment<byte> buffer = sendBuffers[0];
353                 return WriteAsync(buffer.Array, buffer.Offset, buffer.Count, cancellationToken);
354             }
355
356             return MultipleWriteAsyncCore(sendBuffers, cancellationToken);
357         }
358
359         private async Task MultipleWriteAsyncCore(IList<ArraySegment<byte>> sendBuffers, CancellationToken cancellationToken)
360         {
361             Contract.Assert(sendBuffers != null, "'sendBuffers' MUST NOT be NULL.");
362             Contract.Assert(sendBuffers.Count == 2, "'sendBuffers.Count' MUST be '2' at this point.");
363
364             if (WebSocketBase.LoggingEnabled)
365             {
366                 Logging.Enter(Logging.WebSockets, this, Methods.MultipleWriteAsyncCore, string.Empty);
367             }
368
369             CancellationTokenRegistration cancellationTokenRegistration = new CancellationTokenRegistration();
370
371             try
372             {
373                 if (cancellationToken.CanBeCanceled)
374                 {
375                     cancellationTokenRegistration = cancellationToken.Register(s_OnCancel, this, false);
376                 }
377 #if DEBUG
378                 // When using fast path only one outstanding read is permitted. By switching into opaque mode
379                 // via IWebSocketStream.SwitchToOpaqueMode (see more detailed comments in interface definition)
380                 // caller takes responsibility for enforcing this constraint.
381                 Contract.Assert(Interlocked.Increment(ref m_OutstandingOperations.m_Writes) == 1,
382                     "Only one outstanding write allowed at any given time.");
383 #endif
384                 m_WriteTaskCompletionSource = new TaskCompletionSource<object>();
385                 m_WriteEventArgs.SetBuffer(null, 0, 0);
386                 m_WriteEventArgs.BufferList = sendBuffers;
387                 if (WriteAsyncFast(m_WriteEventArgs))
388                 {
389                     await m_WriteTaskCompletionSource.Task.SuppressContextFlow();
390                 }
391             }
392             catch (Exception error)
393             {
394                 if (s_CanHandleException(error))
395                 {
396                     cancellationToken.ThrowIfCancellationRequested();
397                 }
398
399                 throw;
400             }
401             finally
402             {
403                 cancellationTokenRegistration.Dispose();
404
405                 if (WebSocketBase.LoggingEnabled)
406                 {
407                     Logging.Exit(Logging.WebSockets, this, Methods.MultipleWriteAsyncCore, string.Empty);
408                 }
409             }
410         }
411
412         public override void Write(byte[] buffer, int offset, int count)
413         {
414             m_OutputStream.Write(buffer, offset, count);
415         }
416
417         public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
418         {
419             WebSocketHelpers.ValidateBuffer(buffer, offset, count);
420
421             return WriteAsyncCore(buffer, offset, count, cancellationToken);
422         }
423
424         private async Task WriteAsyncCore(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
425         {
426             if (WebSocketBase.LoggingEnabled)
427             {
428                 Logging.Enter(Logging.WebSockets, this, Methods.WriteAsyncCore,
429                     WebSocketHelpers.GetTraceMsgForParameters(offset, count, cancellationToken));
430             }
431
432             CancellationTokenRegistration cancellationTokenRegistration = new CancellationTokenRegistration();
433
434             try
435             {
436                 if (cancellationToken.CanBeCanceled)
437                 {
438                     cancellationTokenRegistration = cancellationToken.Register(s_OnCancel, this, false);
439                 }
440
441                 if (!m_InOpaqueMode)
442                 {
443                     await m_OutputStream.WriteAsync(buffer, offset, count, cancellationToken).SuppressContextFlow();
444                 }
445                 else
446                 {
447 #if DEBUG
448                     // When using fast path only one outstanding read is permitted. By switching into opaque mode
449                     // via IWebSocketStream.SwitchToOpaqueMode (see more detailed comments in interface definition)
450                     // caller takes responsibility for enforcing this constraint.
451                     Contract.Assert(Interlocked.Increment(ref m_OutstandingOperations.m_Writes) == 1,
452                         "Only one outstanding write allowed at any given time.");
453 #endif
454                     m_WriteTaskCompletionSource = new TaskCompletionSource<object>();
455                     m_WriteEventArgs.BufferList = null;
456                     m_WriteEventArgs.SetBuffer(buffer, offset, count);
457                     if (WriteAsyncFast(m_WriteEventArgs))
458                     {
459                         await m_WriteTaskCompletionSource.Task.SuppressContextFlow();
460                     }
461                 }
462             }
463             catch (Exception error)
464             {
465                 if (s_CanHandleException(error))
466                 {
467                     cancellationToken.ThrowIfCancellationRequested();
468                 }
469
470                 throw;
471             }
472             finally
473             {
474                 cancellationTokenRegistration.Dispose();
475
476                 if (WebSocketBase.LoggingEnabled)
477                 {
478                     Logging.Exit(Logging.WebSockets, this, Methods.WriteAsyncCore, string.Empty);
479                 }
480             }
481         }
482
483         // return value indicates sync vs async completion
484         // false: sync completion
485         // true: async completion
486         private bool WriteAsyncFast(HttpListenerAsyncEventArgs eventArgs)
487         {
488             if (WebSocketBase.LoggingEnabled)
489             {
490                 Logging.Enter(Logging.WebSockets, this, Methods.WriteAsyncFast, string.Empty);
491             }
492
493             UnsafeNclNativeMethods.HttpApi.HTTP_FLAGS flags = UnsafeNclNativeMethods.HttpApi.HTTP_FLAGS.NONE;
494
495             eventArgs.StartOperationCommon(this);
496             eventArgs.StartOperationSend();
497
498             uint statusCode;
499             bool completedAsynchronously = false;
500             try
501             {
502                 if (m_OutputStream.Closed || 
503                     (eventArgs.Buffer != null && eventArgs.Count == 0))
504                 {
505                     eventArgs.FinishOperationSuccess(eventArgs.Count, true);
506                     return false;
507                 }
508
509                 if (eventArgs.ShouldCloseOutput)
510                 {
511                     flags |= UnsafeNclNativeMethods.HttpApi.HTTP_FLAGS.HTTP_SEND_RESPONSE_FLAG_DISCONNECT;
512                 }
513                 else
514                 {
515                     flags |= UnsafeNclNativeMethods.HttpApi.HTTP_FLAGS.HTTP_SEND_RESPONSE_FLAG_MORE_DATA;
516                     // When using HTTP_SEND_RESPONSE_FLAG_BUFFER_DATA HTTP.SYS will copy the payload to
517                     // kernel memory (Non-Paged Pool). Http.Sys will buffer up to
518                     // Math.Min(16 MB, current TCP window size)
519                     flags |= UnsafeNclNativeMethods.HttpApi.HTTP_FLAGS.HTTP_SEND_RESPONSE_FLAG_BUFFER_DATA;
520                 }
521
522                 m_OutputStream.InternalHttpContext.EnsureBoundHandle();
523                 uint bytesSent;
524                 statusCode =
525                     UnsafeNclNativeMethods.HttpApi.HttpSendResponseEntityBody2(
526                         m_OutputStream.InternalHttpContext.RequestQueueHandle,
527                         m_OutputStream.InternalHttpContext.RequestId,
528                         (uint)flags,
529                         eventArgs.EntityChunkCount,
530                         eventArgs.EntityChunks,
531                         out bytesSent,
532                         SafeLocalFree.Zero,
533                         0,
534                         eventArgs.NativeOverlapped,
535                         IntPtr.Zero);
536
537                 if (statusCode != UnsafeNclNativeMethods.ErrorCodes.ERROR_SUCCESS &&
538                     statusCode != UnsafeNclNativeMethods.ErrorCodes.ERROR_IO_PENDING)
539                 {
540                     throw new HttpListenerException((int)statusCode);
541                 }
542                 else if (statusCode == UnsafeNclNativeMethods.ErrorCodes.ERROR_SUCCESS &&
543                     HttpListener.SkipIOCPCallbackOnSuccess)
544                 {
545                     // IO operation completed synchronously - callback won't be called to signal completion.
546                     eventArgs.FinishOperationSuccess((int)bytesSent, true);
547                     completedAsynchronously = false;
548                 }
549                 else
550                 {
551                     completedAsynchronously = true;
552                 }
553             }
554             catch (Exception e)
555             {
556                 m_WriteEventArgs.FinishOperationFailure(e, true);
557                 m_OutputStream.SetClosedFlag();
558                 m_OutputStream.InternalHttpContext.Abort();
559
560                 throw;
561             }
562             finally
563             {
564                 if (WebSocketBase.LoggingEnabled)
565                 {
566                     Logging.Exit(Logging.WebSockets, this, Methods.WriteAsyncFast, completedAsynchronously);
567                 }
568             }
569
570             return completedAsynchronously;
571         }
572
573         public override void WriteByte(byte value)
574         {
575             m_OutputStream.WriteByte(value);
576         }
577
578         public override IAsyncResult BeginWrite(byte[] buffer,
579             int offset,
580             int count,
581             AsyncCallback callback,
582             object state)
583         {
584             return m_OutputStream.BeginWrite(buffer, offset, count, callback, state);
585         }
586
587         public override void EndWrite(IAsyncResult asyncResult)
588         {
589             m_OutputStream.EndWrite(asyncResult);
590         }
591
592         public override void Flush()
593         {
594             m_OutputStream.Flush();
595         }
596
597         public override Task FlushAsync(CancellationToken cancellationToken)
598         {
599             return m_OutputStream.FlushAsync(cancellationToken);
600         }
601
602         public override long Seek(long offset, SeekOrigin origin)
603         {
604             throw new NotSupportedException(SR.GetString(SR.net_noseek));
605         }
606
607         public override void SetLength(long value)
608         {
609             throw new NotSupportedException(SR.GetString(SR.net_noseek));
610         }
611
612         public async Task CloseNetworkConnectionAsync(CancellationToken cancellationToken)
613         {
614             // need to yield here to make sure that we don't get any exception synchronously
615             await Task.Yield();
616
617             if (WebSocketBase.LoggingEnabled)
618             {
619                 Logging.Enter(Logging.WebSockets, this, Methods.CloseNetworkConnectionAsync, string.Empty);
620             }
621
622             CancellationTokenRegistration cancellationTokenRegistration = new CancellationTokenRegistration();
623
624             try
625             {
626                 if (cancellationToken.CanBeCanceled)
627                 {
628                     cancellationTokenRegistration = cancellationToken.Register(s_OnCancel, this, false);
629                 }
630 #if DEBUG
631                 // When using fast path only one outstanding read is permitted. By switching into opaque mode
632                 // via IWebSocketStream.SwitchToOpaqueMode (see more detailed comments in interface definition)
633                 // caller takes responsibility for enforcing this constraint.
634                 Contract.Assert(Interlocked.Increment(ref m_OutstandingOperations.m_Writes) == 1,
635                     "Only one outstanding write allowed at any given time.");
636 #endif
637                 m_WriteTaskCompletionSource = new TaskCompletionSource<object>();
638                 m_WriteEventArgs.SetShouldCloseOutput();
639                 if (WriteAsyncFast(m_WriteEventArgs))
640                 {
641                     await m_WriteTaskCompletionSource.Task.SuppressContextFlow();
642                 }
643             }
644             catch (Exception error)
645             {
646                 if (!s_CanHandleException(error))
647                 {
648                     throw;
649                 }
650
651                 // throw OperationCancelledException when canceled by the caller
652                 // otherwise swallow the exception
653                 cancellationToken.ThrowIfCancellationRequested();
654             }
655             finally
656             {
657                 cancellationTokenRegistration.Dispose();
658
659                 if (WebSocketBase.LoggingEnabled)
660                 {
661                     Logging.Exit(Logging.WebSockets, this, Methods.CloseNetworkConnectionAsync, string.Empty);
662                 }
663             }
664         }
665
666         protected override void Dispose(bool disposing)
667         {
668             if (disposing && Interlocked.Exchange(ref m_CleanedUp, 1) == 0)
669             {
670                 if (m_ReadTaskCompletionSource != null)
671                 {
672                     m_ReadTaskCompletionSource.TrySetCanceled();
673                 }
674
675                 if (m_WriteTaskCompletionSource != null)
676                 {
677                     m_WriteTaskCompletionSource.TrySetCanceled();
678                 }
679
680                 if (m_ReadEventArgs != null)
681                 {
682                     m_ReadEventArgs.Dispose();
683                 }
684
685                 if (m_WriteEventArgs != null)
686                 {
687                     m_WriteEventArgs.Dispose();
688                 }
689
690                 try
691                 {
692                     m_InputStream.Close();
693                 }
694                 finally
695                 {
696                     m_OutputStream.Close();
697                 }
698             }
699         }
700
701         public void Abort()
702         {
703             OnCancel(this);
704         }
705
706         private static bool CanHandleException(Exception error)
707         {
708             return error is HttpListenerException ||
709                 error is ObjectDisposedException ||
710                 error is IOException;
711         }
712
713         private static void OnCancel(object state)
714         {
715             Contract.Assert(state != null, "'state' MUST NOT be NULL.");
716             WebSocketHttpListenerDuplexStream thisPtr = state as WebSocketHttpListenerDuplexStream;
717             Contract.Assert(thisPtr != null, "'thisPtr' MUST NOT be NULL.");
718
719             if (WebSocketBase.LoggingEnabled)
720             {
721                 Logging.Enter(Logging.WebSockets, state, Methods.OnCancel, string.Empty);
722             }
723
724             try
725             {
726                 thisPtr.m_OutputStream.SetClosedFlag();
727                 thisPtr.m_Context.Abort();
728             }
729             catch { }
730
731             TaskCompletionSource<int> readTaskCompletionSourceSnapshot = thisPtr.m_ReadTaskCompletionSource;
732
733             if (readTaskCompletionSourceSnapshot != null)
734             {
735                 readTaskCompletionSourceSnapshot.TrySetCanceled();
736             }
737
738             TaskCompletionSource<object> writeTaskCompletionSourceSnapshot = thisPtr.m_WriteTaskCompletionSource;
739
740             if (writeTaskCompletionSourceSnapshot != null)
741             {
742                 writeTaskCompletionSourceSnapshot.TrySetCanceled();
743             }
744
745             if (WebSocketBase.LoggingEnabled)
746             {
747                 Logging.Exit(Logging.WebSockets, state, Methods.OnCancel, string.Empty);
748             }
749         }
750
751         public void SwitchToOpaqueMode(WebSocketBase webSocket)
752         {
753             Contract.Assert(webSocket != null, "'webSocket' MUST NOT be NULL.");
754             Contract.Assert(m_OutputStream != null, "'m_OutputStream' MUST NOT be NULL.");
755             Contract.Assert(m_OutputStream.InternalHttpContext != null,
756                 "'m_OutputStream.InternalHttpContext' MUST NOT be NULL.");
757             Contract.Assert(m_OutputStream.InternalHttpContext.Response != null,
758                 "'m_OutputStream.InternalHttpContext.Response' MUST NOT be NULL.");
759             Contract.Assert(m_OutputStream.InternalHttpContext.Response.SentHeaders,
760                 "Headers MUST have been sent at this point.");
761             Contract.Assert(!m_InOpaqueMode, "SwitchToOpaqueMode MUST NOT be called multiple times.");
762
763             if (m_InOpaqueMode)
764             {
765                 throw new InvalidOperationException();
766             }
767
768             m_WebSocket = webSocket;
769             m_InOpaqueMode = true;
770             m_ReadEventArgs = new HttpListenerAsyncEventArgs(webSocket, this);
771             m_ReadEventArgs.Completed += s_OnReadCompleted;
772             m_WriteEventArgs = new HttpListenerAsyncEventArgs(webSocket, this);
773             m_WriteEventArgs.Completed += s_OnWriteCompleted;
774
775             if (WebSocketBase.LoggingEnabled)
776             {
777                 Logging.Associate(Logging.WebSockets, this, webSocket);
778             }
779         }
780
781         private static void OnWriteCompleted(object sender, HttpListenerAsyncEventArgs eventArgs)
782         {
783             Contract.Assert(eventArgs != null, "'eventArgs' MUST NOT be NULL.");
784             WebSocketHttpListenerDuplexStream thisPtr = eventArgs.CurrentStream;
785             Contract.Assert(thisPtr != null, "'thisPtr' MUST NOT be NULL.");
786 #if DEBUG
787             Contract.Assert(Interlocked.Decrement(ref thisPtr.m_OutstandingOperations.m_Writes) >= 0,
788                 "'thisPtr.m_OutstandingOperations.m_Writes' MUST NOT be negative.");
789 #endif
790
791             if (WebSocketBase.LoggingEnabled)
792             {
793                 Logging.Enter(Logging.WebSockets, thisPtr, Methods.OnWriteCompleted, string.Empty);
794             }
795
796             if (eventArgs.Exception != null)
797             {
798                 thisPtr.m_WriteTaskCompletionSource.TrySetException(eventArgs.Exception);
799             }
800             else
801             {
802                 thisPtr.m_WriteTaskCompletionSource.TrySetResult(null);
803             }
804
805             if (WebSocketBase.LoggingEnabled)
806             {
807                 Logging.Exit(Logging.WebSockets, thisPtr, Methods.OnWriteCompleted, string.Empty);
808             }
809         }
810
811         private static void OnReadCompleted(object sender, HttpListenerAsyncEventArgs eventArgs)
812         {
813             Contract.Assert(eventArgs != null, "'eventArgs' MUST NOT be NULL.");
814             WebSocketHttpListenerDuplexStream thisPtr = eventArgs.CurrentStream;
815             Contract.Assert(thisPtr != null, "'thisPtr' MUST NOT be NULL.");
816 #if DEBUG
817             Contract.Assert(Interlocked.Decrement(ref thisPtr.m_OutstandingOperations.m_Reads) >= 0,
818                 "'thisPtr.m_OutstandingOperations.m_Reads' MUST NOT be negative.");
819 #endif
820
821             if (WebSocketBase.LoggingEnabled)
822             {
823                 Logging.Enter(Logging.WebSockets, thisPtr, Methods.OnReadCompleted, string.Empty);
824             }
825
826             if (eventArgs.Exception != null)
827             {
828                 thisPtr.m_ReadTaskCompletionSource.TrySetException(eventArgs.Exception);
829             }
830             else
831             {
832                 thisPtr.m_ReadTaskCompletionSource.TrySetResult(eventArgs.BytesTransferred);
833             }
834
835             if (WebSocketBase.LoggingEnabled)
836             {
837                 Logging.Exit(Logging.WebSockets, thisPtr, Methods.OnReadCompleted, string.Empty);
838             }
839         }
840
841         internal class HttpListenerAsyncEventArgs : EventArgs, IDisposable
842         {
843             private const int Free = 0;
844             private const int InProgress = 1;
845             private const int Disposed = 2;
846             private int m_Operating;
847
848             private bool m_DisposeCalled;
849             private SafeNativeOverlapped m_PtrNativeOverlapped;
850             private Overlapped m_Overlapped;
851             private event EventHandler<HttpListenerAsyncEventArgs> m_Completed;
852             private byte[] m_Buffer;
853             private IList<ArraySegment<byte>> m_BufferList;
854             private int m_Count;
855             private int m_Offset;
856             private int m_BytesTransferred;
857             private HttpListenerAsyncOperation m_CompletedOperation;
858             private UnsafeNclNativeMethods.HttpApi.HTTP_DATA_CHUNK[] m_DataChunks;
859             private GCHandle m_DataChunksGCHandle;
860             private ushort m_DataChunkCount;
861             private Exception m_Exception;
862             private bool m_ShouldCloseOutput;
863             private readonly WebSocketBase m_WebSocket;
864             private readonly WebSocketHttpListenerDuplexStream m_CurrentStream;
865
866             public HttpListenerAsyncEventArgs(WebSocketBase webSocket, WebSocketHttpListenerDuplexStream stream)
867                 : base()
868             {
869                 m_WebSocket = webSocket;
870                 m_CurrentStream = stream;
871                 InitializeOverlapped();
872             }
873
874             public int BytesTransferred
875             {
876                 get { return m_BytesTransferred; }
877             }
878
879             public byte[] Buffer
880             {
881                 get { return m_Buffer; }
882             }
883
884             // BufferList property.
885             // Mutually exclusive with Buffer.
886             // Setting this property with an existing non-null Buffer will cause an assert.    
887             public IList<ArraySegment<byte>> BufferList
888             {
889                 get { return m_BufferList; }
890                 set
891                 {
892                     Contract.Assert(!m_ShouldCloseOutput, "'m_ShouldCloseOutput' MUST be 'false' at this point.");
893                     Contract.Assert(value == null || m_Buffer == null, 
894                         "Either 'm_Buffer' or 'm_BufferList' MUST be NULL.");
895                     Contract.Assert(m_Operating == Free, 
896                         "This property can only be modified if no IO operation is outstanding.");
897                     Contract.Assert(value == null || value.Count == 2, 
898                         "This list can only be 'NULL' or MUST have exactly '2' items.");
899                     m_BufferList = value;
900                 }
901             }
902
903             public bool ShouldCloseOutput
904             {
905                 get { return m_ShouldCloseOutput; }
906             }
907
908             public int Offset
909             {
910                 get { return m_Offset; }
911             }
912
913             public int Count
914             {
915                 get { return m_Count; }
916             }
917
918             public Exception Exception
919             {
920                 get { return m_Exception; }
921             }
922
923             public ushort EntityChunkCount
924             {
925                 get
926                 {
927                     if (m_DataChunks == null)
928                     {
929                         return 0;
930                     }
931
932                     return m_DataChunkCount;
933                 }
934             }
935
936             public SafeNativeOverlapped NativeOverlapped
937             {
938                 get { return m_PtrNativeOverlapped; }
939             }
940
941             public IntPtr EntityChunks
942             {
943                 get
944                 {
945                     if (m_DataChunks == null)
946                     {
947                         return IntPtr.Zero;
948                     }
949
950                     return Marshal.UnsafeAddrOfPinnedArrayElement(m_DataChunks, 0);
951                 }
952             }
953
954             public WebSocketHttpListenerDuplexStream CurrentStream
955             {
956                 get { return m_CurrentStream; }
957             }
958
959             public event EventHandler<HttpListenerAsyncEventArgs> Completed
960             {
961                 add
962                 {
963                     m_Completed += value;
964                 }
965                 remove
966                 {
967                     m_Completed -= value;
968                 }
969             }
970
971             protected virtual void OnCompleted(HttpListenerAsyncEventArgs e)
972             {
973                 EventHandler<HttpListenerAsyncEventArgs> handler = m_Completed;
974                 if (handler != null)
975                 {
976                     handler(e.m_CurrentStream, e);
977                 }
978             }
979
980             public void SetShouldCloseOutput()
981             {
982                 m_BufferList = null;
983                 m_Buffer = null;
984                 m_ShouldCloseOutput = true;
985             }
986
987             public void Dispose()
988             {
989                 // Remember that Dispose was called.
990                 m_DisposeCalled = true;
991
992                 // Check if this object is in-use for an async socket operation.
993                 if (Interlocked.CompareExchange(ref m_Operating, Disposed, Free) != Free)
994                 {
995                     // Either already disposed or will be disposed when current operation completes.
996                     return;
997                 }
998
999                 // OK to dispose now.
1000                 // Free native overlapped data.
1001                 FreeOverlapped(false);
1002
1003                 // Don't bother finalizing later.
1004                 GC.SuppressFinalize(this);
1005             }
1006
1007             // Finalizer
1008             ~HttpListenerAsyncEventArgs()
1009             {
1010                 FreeOverlapped(true);
1011             }
1012
1013             private unsafe void InitializeOverlapped()
1014             {
1015                 m_Overlapped = new Overlapped();
1016                 m_PtrNativeOverlapped = new SafeNativeOverlapped(m_Overlapped.UnsafePack(CompletionPortCallback, null));
1017             }
1018
1019             // Method to clean up any existing Overlapped object and related state variables.
1020             private void FreeOverlapped(bool checkForShutdown)
1021             {
1022                 if (!checkForShutdown || !NclUtilities.HasShutdownStarted)
1023                 {
1024                     // Free the overlapped object
1025                     if (m_PtrNativeOverlapped != null && !m_PtrNativeOverlapped.IsInvalid)
1026                     {
1027                         m_PtrNativeOverlapped.Dispose();
1028                     }
1029
1030                     if (m_DataChunksGCHandle.IsAllocated)
1031                     {
1032                         m_DataChunksGCHandle.Free();
1033                     }
1034                 }
1035             }
1036
1037             // Method called to prepare for a native async http.sys call.
1038             // This method performs the tasks common to all http.sys operations.
1039             internal void StartOperationCommon(WebSocketHttpListenerDuplexStream currentStream)
1040             {
1041                 // Change status to "in-use".
1042                 if(Interlocked.CompareExchange(ref m_Operating, InProgress, Free) != Free)
1043                 {
1044                     // If it was already "in-use" check if Dispose was called.
1045                     if (m_DisposeCalled)
1046                     {
1047                         // Dispose was called - throw ObjectDisposed.
1048                         throw new ObjectDisposedException(GetType().FullName);
1049                     }
1050
1051                     Contract.Assert(false, "Only one outstanding async operation is allowed per HttpListenerAsyncEventArgs instance.");
1052                     // Only one at a time.
1053                     throw new InvalidOperationException();
1054                 }
1055
1056                 // HttpSendResponseEntityBody can return ERROR_INVALID_PARAMETER if the InternalHigh field of the overlapped
1057                 // is not IntPtr.Zero, so we have to reset this field because we are reusing the Overlapped.
1058                 // When using the IAsyncResult based approach of HttpListenerResponseStream the Overlapped is reinitialized
1059                 // for each operation by the CLR when returned from the OverlappedDataCache.
1060                 NativeOverlapped.ReinitializeNativeOverlapped();
1061                 m_Exception = null;
1062                 m_BytesTransferred = 0;
1063             }
1064
1065             internal void StartOperationReceive()
1066             {
1067                 // Remember the operation type.
1068                 m_CompletedOperation = HttpListenerAsyncOperation.Receive;
1069             }
1070
1071             internal void StartOperationSend()
1072             {
1073                 UpdateDataChunk();
1074
1075                 // Remember the operation type.
1076                 m_CompletedOperation = HttpListenerAsyncOperation.Send;
1077             }
1078
1079             public void SetBuffer(byte[] buffer, int offset, int count)
1080             {
1081                 Contract.Assert(!m_ShouldCloseOutput, "'m_ShouldCloseOutput' MUST be 'false' at this point.");
1082                 Contract.Assert(buffer == null || m_BufferList == null, "Either 'm_Buffer' or 'm_BufferList' MUST be NULL.");
1083                 m_Buffer = buffer;
1084                 m_Offset = offset;
1085                 m_Count = count;
1086             }
1087
1088             private unsafe void UpdateDataChunk()
1089             {
1090                 if (m_DataChunks == null)
1091                 {
1092                     m_DataChunks = new UnsafeNclNativeMethods.HttpApi.HTTP_DATA_CHUNK[2];
1093                     m_DataChunksGCHandle = GCHandle.Alloc(m_DataChunks, GCHandleType.Pinned);
1094                     m_DataChunks[0] = new UnsafeNclNativeMethods.HttpApi.HTTP_DATA_CHUNK();
1095                     m_DataChunks[0].DataChunkType = UnsafeNclNativeMethods.HttpApi.HTTP_DATA_CHUNK_TYPE.HttpDataChunkFromMemory;
1096                     m_DataChunks[1] = new UnsafeNclNativeMethods.HttpApi.HTTP_DATA_CHUNK();
1097                     m_DataChunks[1].DataChunkType = UnsafeNclNativeMethods.HttpApi.HTTP_DATA_CHUNK_TYPE.HttpDataChunkFromMemory;
1098                 }
1099
1100                 Contract.Assert(m_Buffer == null || m_BufferList == null, "Either 'm_Buffer' or 'm_BufferList' MUST be NULL.");
1101                 Contract.Assert(m_ShouldCloseOutput || m_Buffer != null || m_BufferList != null, "Either 'm_Buffer' or 'm_BufferList' MUST NOT be NULL.");
1102                 
1103                 // The underlying byte[] m_Buffer or each m_BufferList[].Array are pinned already 
1104                 if (m_Buffer != null)
1105                 {
1106                     UpdateDataChunk(0, m_Buffer, m_Offset, m_Count);
1107                     UpdateDataChunk(1, null, 0, 0);
1108                     m_DataChunkCount = 1;
1109                 }
1110                 else if (m_BufferList != null)
1111                 {
1112                     Contract.Assert(m_BufferList != null && m_BufferList.Count == 2,
1113                         "'m_BufferList' MUST NOT be NULL and have exactly '2' items at this point.");
1114                     UpdateDataChunk(0, m_BufferList[0].Array, m_BufferList[0].Offset, m_BufferList[0].Count);
1115                     UpdateDataChunk(1, m_BufferList[1].Array, m_BufferList[1].Offset, m_BufferList[1].Count);
1116                     m_DataChunkCount = 2;
1117                 }
1118                 else
1119                 {
1120                     Contract.Assert(m_ShouldCloseOutput, "'m_ShouldCloseOutput' MUST be 'true' at this point.");
1121                     m_DataChunks = null;
1122                 }
1123             }
1124
1125             private unsafe void UpdateDataChunk(int index, byte[] buffer, int offset, int count)
1126             {
1127                 if (buffer == null)
1128                 {
1129                     m_DataChunks[index].pBuffer = null;
1130                     m_DataChunks[index].BufferLength = 0;
1131                     return;
1132                 }
1133
1134                 if (m_WebSocket.InternalBuffer.IsInternalBuffer(buffer, offset, count))
1135                 {
1136                     m_DataChunks[index].pBuffer = (byte*)(m_WebSocket.InternalBuffer.ToIntPtr(offset));
1137                 }
1138                 else
1139                 {
1140                     m_DataChunks[index].pBuffer = 
1141                         (byte*)m_WebSocket.InternalBuffer.ConvertPinnedSendPayloadToNative(buffer, offset, count);
1142                 }
1143
1144                 m_DataChunks[index].BufferLength = (uint)count;
1145             }
1146
1147             // Method to mark this object as no longer "in-use".
1148             // Will also execute a Dispose deferred because I/O was in progress.  
1149             internal void Complete()
1150             {
1151                 // Mark as not in-use            
1152                 m_Operating = Free;
1153
1154                 // Check for deferred Dispose().
1155                 // The deferred Dispose is not guaranteed if Dispose is called while an operation is in progress. 
1156                 // The m_DisposeCalled variable is not managed in a thread-safe manner on purpose for performance.
1157                 if (m_DisposeCalled)
1158                 {
1159                     Dispose();
1160                 }
1161             }
1162
1163             // Method to update internal state after sync or async completion.
1164             private void SetResults(Exception exception, int bytesTransferred)
1165             {
1166                 m_Exception = exception;
1167                 m_BytesTransferred = bytesTransferred;
1168             }
1169
1170             internal void FinishOperationFailure(Exception exception, bool syncCompletion)
1171             {
1172                 SetResults(exception, 0);
1173
1174                 if (WebSocketBase.LoggingEnabled)
1175                 {
1176                     Logging.PrintError(Logging.WebSockets, m_CurrentStream, 
1177                         m_CompletedOperation == HttpListenerAsyncOperation.Receive ? Methods.ReadAsyncFast : Methods.WriteAsyncFast,
1178                         exception.ToString());
1179                 }
1180
1181                 Complete();
1182                 OnCompleted(this);
1183             }
1184
1185             internal void FinishOperationSuccess(int bytesTransferred, bool syncCompletion)
1186             {
1187                 SetResults(null, bytesTransferred);
1188
1189                 if (WebSocketBase.LoggingEnabled)
1190                 {
1191                     if (m_Buffer != null)
1192                     {
1193                         Logging.Dump(Logging.WebSockets, m_CurrentStream,
1194                             m_CompletedOperation == HttpListenerAsyncOperation.Receive ? Methods.ReadAsyncFast : Methods.WriteAsyncFast,
1195                             m_Buffer, m_Offset, bytesTransferred);
1196                     }
1197                     else if (m_BufferList != null)
1198                     {
1199                         Contract.Assert(m_CompletedOperation == HttpListenerAsyncOperation.Send,
1200                             "'BufferList' is only supported for send operations.");
1201
1202                         foreach (ArraySegment<byte> buffer in BufferList)
1203                         {
1204                             Logging.Dump(Logging.WebSockets, this, Methods.WriteAsyncFast, buffer.Array, buffer.Offset, buffer.Count);
1205                         }
1206                     }
1207                     else
1208                     {
1209                         Logging.PrintLine(Logging.WebSockets, TraceEventType.Verbose, 0,
1210                             string.Format(CultureInfo.InvariantCulture, "Output channel closed for {0}#{1}",
1211                             m_CurrentStream.GetType().Name, ValidationHelper.HashString(m_CurrentStream)));
1212                     }
1213                 }
1214
1215                 if (m_ShouldCloseOutput)
1216                 {
1217                     m_CurrentStream.m_OutputStream.SetClosedFlag();
1218                 }
1219                 
1220                 // Complete the operation and raise completion event.
1221                 Complete();
1222                 OnCompleted(this);
1223             }
1224
1225             private unsafe void CompletionPortCallback(uint errorCode, uint numBytes, NativeOverlapped* nativeOverlapped)
1226             {
1227                 if (errorCode == UnsafeNclNativeMethods.ErrorCodes.ERROR_SUCCESS ||
1228                     errorCode == UnsafeNclNativeMethods.ErrorCodes.ERROR_HANDLE_EOF)
1229                 {
1230                     FinishOperationSuccess((int)numBytes, false);
1231                 }
1232                 else
1233                 {
1234                     FinishOperationFailure(new HttpListenerException((int)errorCode), false);
1235                 }
1236             }
1237
1238             public enum HttpListenerAsyncOperation
1239             {
1240                 None,
1241                 Receive,
1242                 Send
1243             }
1244         }
1245
1246         private static class Methods
1247         {
1248             public const string CloseNetworkConnectionAsync = "CloseNetworkConnectionAsync";
1249             public const string OnCancel = "OnCancel";
1250             public const string OnReadCompleted = "OnReadCompleted";
1251             public const string OnWriteCompleted = "OnWriteCompleted";
1252             public const string ReadAsyncFast = "ReadAsyncFast";
1253             public const string ReadAsyncCore = "ReadAsyncCore";
1254             public const string WriteAsyncFast = "WriteAsyncFast";
1255             public const string WriteAsyncCore = "WriteAsyncCore";
1256             public const string MultipleWriteAsyncCore = "MultipleWriteAsyncCore";
1257         }
1258     }
1259 }