// Update Reference Sources to .NET Framework 4.6.1
// [mono.git] / mcs / class / referencesource / System.ServiceModel / System / ServiceModel / Channels / ConnectionDemuxer.cs
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation.  All rights reserved.
//------------------------------------------------------------

namespace System.ServiceModel.Channels
{
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Globalization;
    using System.IO;
    using System.Runtime;
    using System.Runtime.Diagnostics;
    using System.ServiceModel;
    using System.ServiceModel.Activation;
    using System.ServiceModel.Diagnostics;
    using System.ServiceModel.Diagnostics.Application;
    using System.ServiceModel.Dispatcher;
    using System.Threading;

    /// <summary>
    /// Takes connections from a ConnectionAcceptor (or connections handed back through
    /// ReuseConnection), reads each connection's framing mode, and routes it to either the
    /// singleton or the session (duplex) preamble reader. Readers that are between owners are
    /// tracked in connectionReaders so that Dispose can tear them down.
    /// </summary>
    sealed class ConnectionDemuxer : IDisposable
    {
        // Shared by all instances; created lazily in OnSingletonPreambleKnown.
        static AsyncCallback onSingletonPreambleComplete;
        readonly ConnectionAcceptor acceptor;

        // we use this list to track readers that don't have a clear owner (so they don't get GC'ed)
        // All Add/Remove calls are made under ThisLock after checking isDisposed.
        readonly List<InitialServerConnectionReader> connectionReaders;

        // Set once, under ThisLock, by Dispose.
        bool isDisposed;

        // The delegates below (except onConnectionClosed) are created lazily on first use.
        ConnectionModeCallback onConnectionModeKnown;
        ConnectionModeCallback onCachedConnectionModeKnown;
        readonly ConnectionClosedCallback onConnectionClosed;
        ServerSessionPreambleCallback onSessionPreambleKnown;
        ServerSingletonPreambleCallback onSingletonPreambleKnown;
        Action<object> reuseConnectionCallback;

        // Callbacks supplied by the owner through the constructor.
        readonly ServerSessionPreambleDemuxCallback serverSessionPreambleCallback;
        readonly SingletonPreambleDemuxCallback singletonPreambleCallback;
        readonly TransportSettingsCallback transportSettingsCallback;

        Action pooledConnectionDequeuedCallback;

        // Assigned by StartDemuxing and passed to the preamble readers' StartReading.
        Action<Uri> viaDelegate;

        // Timeout passed to StartReading for newly accepted connections.
        readonly TimeSpan channelInitializationTimeout;

        // Timeout passed to StartReading for reused (pooled) connections.
        readonly TimeSpan idleTimeout;

        // Upper bound for pooledConnectionCount; see ReuseConnectionCallback.
        readonly int maxPooledConnections;

        // Number of reused connections currently being read; guarded by ThisLock.
        int pooledConnectionCount;

        /// <summary>
        /// Creates the demuxer and its ConnectionAcceptor. Accepting does not begin here;
        /// StartDemuxing calls acceptor.StartAccepting().
        /// </summary>
        /// <param name="listener">Listener passed through to the ConnectionAcceptor.</param>
        /// <param name="maxAccepts">Passed through to the ConnectionAcceptor.</param>
        /// <param name="maxPendingConnections">Passed through to the ConnectionAcceptor.</param>
        /// <param name="channelInitializationTimeout">Read timeout used for newly accepted connections.</param>
        /// <param name="idleTimeout">Read timeout used for reused connections.</param>
        /// <param name="maxPooledConnections">Quota on concurrently pooled (reused) connections.</param>
        /// <param name="transportSettingsCallback">Passed to the preamble readers.</param>
        /// <param name="singletonPreambleCallback">Invoked once a singleton preamble has been read.</param>
        /// <param name="serverSessionPreambleCallback">Invoked once a session preamble has been read.</param>
        /// <param name="errorCallback">Passed through to the ConnectionAcceptor.</param>
        public ConnectionDemuxer(IConnectionListener listener, int maxAccepts, int maxPendingConnections,
            TimeSpan channelInitializationTimeout, TimeSpan idleTimeout, int maxPooledConnections,
            TransportSettingsCallback transportSettingsCallback,
            SingletonPreambleDemuxCallback singletonPreambleCallback,
            ServerSessionPreambleDemuxCallback serverSessionPreambleCallback, ErrorCallback errorCallback)
        {
            this.connectionReaders = new List<InitialServerConnectionReader>();
            this.acceptor =
                new ConnectionAcceptor(listener, maxAccepts, maxPendingConnections, OnConnectionAvailable, errorCallback);
            this.channelInitializationTimeout = channelInitializationTimeout;
            this.idleTimeout = idleTimeout;
            this.maxPooledConnections = maxPooledConnections;
            this.onConnectionClosed = new ConnectionClosedCallback(OnConnectionClosed);
            this.transportSettingsCallback = transportSettingsCallback;
            this.singletonPreambleCallback = singletonPreambleCallback;
            this.serverSessionPreambleCallback = serverSessionPreambleCallback;
        }

        // The instance itself is the monitor for all state in this class.
        // NOTE(review): left as 'this' rather than a private gate object because code outside
        // this file may synchronize on the demuxer instance -- confirm before changing.
        object ThisLock
        {
            get { return this; }
        }

        /// <summary>
        /// Marks the demuxer as disposed (first call only), disposes every tracked reader and then
        /// the acceptor. Subsequent calls return immediately.
        /// </summary>
        public void Dispose()
        {
            lock (ThisLock)
            {
                if (isDisposed)
                {
                    return;
                }

                isDisposed = true;
            }

            // The list is walked outside the lock. Every other Add/Remove in this class first
            // checks isDisposed under ThisLock, so this class no longer mutates the list once
            // the flag above has been set.
            for (int i = 0; i < connectionReaders.Count; i++)
            {
                connectionReaders[i].Dispose();
            }
            connectionReaders.Clear();

            acceptor.Dispose();
        }

        /// <summary>
        /// Creates a ConnectionModeReader for <paramref name="connection"/> and registers it in
        /// connectionReaders.
        /// </summary>
        /// <param name="connection">The connection whose framing mode is to be read.</param>
        /// <param name="isCached">
        /// True to wire the reader to OnCachedConnectionModeKnown (reused connection), false to wire
        /// it to OnConnectionModeKnown (newly accepted connection).
        /// </param>
        /// <returns>
        /// The registered reader, or null if the demuxer is already disposed (in which case the
        /// newly created reader has been disposed).
        /// </returns>
        ConnectionModeReader SetupModeReader(IConnection connection, bool isCached)
        {
            ConnectionModeReader modeReader;
            if (isCached)
            {
                if (onCachedConnectionModeKnown == null)
                {
                    onCachedConnectionModeKnown = new ConnectionModeCallback(OnCachedConnectionModeKnown);
                }

                modeReader = new ConnectionModeReader(connection, onCachedConnectionModeKnown, onConnectionClosed);
            }
            else
            {
                if (onConnectionModeKnown == null)
                {
                    onConnectionModeKnown = new ConnectionModeCallback(OnConnectionModeKnown);
                }

                modeReader = new ConnectionModeReader(connection, onConnectionModeKnown, onConnectionClosed);
            }

            lock (ThisLock)
            {
                if (isDisposed)
                {
                    modeReader.Dispose();
                    return null;
                }

                connectionReaders.Add(modeReader);
                return modeReader;
            }
        }

        /// <summary>
        /// Hands an existing connection back to the demuxer so that its next framing mode can be
        /// read. The actual work is scheduled through ActionItem.Schedule and performed by
        /// ReuseConnectionCallback. Nothing is scheduled if the demuxer is already disposed.
        /// </summary>
        /// <param name="connection">The connection to reuse.</param>
        /// <param name="closeTimeout">Timeout used if the connection has to be closed because the pool quota is reached.</param>
        public void ReuseConnection(IConnection connection, TimeSpan closeTimeout)
        {
            connection.ExceptionEventType = TraceEventType.Information;
            ConnectionModeReader modeReader = SetupModeReader(connection, true);

            if (modeReader != null)
            {
                if (reuseConnectionCallback == null)
                {
                    reuseConnectionCallback = new Action<object>(ReuseConnectionCallback);
                }

                ActionItem.Schedule(reuseConnectionCallback, new ReuseConnectionState(modeReader, closeTimeout));
            }
        }

        /// <summary>
        /// Scheduled continuation of ReuseConnection. If pooledConnectionCount has reached
        /// maxPooledConnections, the quota event is traced and the reader is closed via
        /// CloseFromPool; otherwise the count is incremented and the reader starts reading with
        /// idleTimeout.
        /// </summary>
        /// <param name="state">A ReuseConnectionState instance.</param>
        void ReuseConnectionCallback(object state)
        {
            ReuseConnectionState connectionState = (ReuseConnectionState)state;
            bool closeReader = false;
            lock (ThisLock)
            {
                if (this.pooledConnectionCount >= this.maxPooledConnections)
                {
                    closeReader = true;
                }
                else
                {
                    this.pooledConnectionCount++;
                }
            }

            if (closeReader)
            {
                if (DiagnosticUtility.ShouldTraceWarning)
                {
                    TraceUtility.TraceEvent(TraceEventType.Warning,
                        TraceCode.ServerMaxPooledConnectionsQuotaReached,
                        SR.GetString(SR.TraceCodeServerMaxPooledConnectionsQuotaReached, maxPooledConnections),
                        new StringTraceRecord("MaxOutboundConnectionsPerEndpoint", maxPooledConnections.ToString(CultureInfo.InvariantCulture)),
                        this, null);
                }

                if (TD.ServerMaxPooledConnectionsQuotaReachedIsEnabled())
                {
                    TD.ServerMaxPooledConnectionsQuotaReached();
                }

                connectionState.ModeReader.CloseFromPool(connectionState.CloseTimeout);
            }
            else
            {
                if (this.pooledConnectionDequeuedCallback == null)
                {
                    this.pooledConnectionDequeuedCallback = new Action(PooledConnectionDequeuedCallback);
                }
                connectionState.ModeReader.StartReading(this.idleTimeout, this.pooledConnectionDequeuedCallback);
            }
        }

        /// <summary>
        /// Dequeue callback given to reused-connection readers: releases the pool slot taken in
        /// ReuseConnectionCallback.
        /// </summary>
        void PooledConnectionDequeuedCallback()
        {
            lock (ThisLock)
            {
                this.pooledConnectionCount--;
                Fx.Assert(this.pooledConnectionCount >= 0, "Connection Throttle should never be negative");
            }
        }

        /// <summary>
        /// Callback given to the ConnectionAcceptor. Starts reading the framing mode of a newly
        /// accepted connection, or invokes the dequeue callback right away if the demuxer is
        /// already disposed.
        /// </summary>
        /// <param name="connection">The newly accepted connection.</param>
        /// <param name="connectionDequeuedCallback">Callback handed to the reader, or invoked directly when no reader is created.</param>
        void OnConnectionAvailable(IConnection connection, Action connectionDequeuedCallback)
        {
            ConnectionModeReader modeReader = SetupModeReader(connection, false);

            if (modeReader != null)
            {
                // StartReading() will never throw non-fatal exceptions;
                // it propagates all exceptions into the onConnectionModeKnown callback,
                // which is where we need our robust handling
                modeReader.StartReading(this.channelInitializationTimeout, connectionDequeuedCallback);
            }
            else
            {
                connectionDequeuedCallback();
            }
        }

        // Mode-known callback for reused (pooled) connections.
        void OnCachedConnectionModeKnown(ConnectionModeReader modeReader)
        {
            OnConnectionModeKnownCore(modeReader, true);
        }

        // Mode-known callback for newly accepted connections.
        void OnConnectionModeKnown(ConnectionModeReader modeReader)
        {
            OnConnectionModeKnownCore(modeReader, false);
        }

        /// <summary>
        /// Untracks the mode reader, obtains the framing mode and dispatches the connection to
        /// OnDuplexConnection or OnSingletonConnection. The reader is disposed unless the hand-off
        /// completed without an exception.
        /// </summary>
        /// <param name="modeReader">The reader that finished reading the mode.</param>
        /// <param name="isCached">
        /// True for a reused connection. For non-cached connections a TimeoutException is wrapped
        /// in a ChannelInitializationTimeout message and routed through ErrorBehavior.ThrowAndCatch
        /// before being traced.
        /// </param>
        void OnConnectionModeKnownCore(ConnectionModeReader modeReader, bool isCached)
        {
            lock (ThisLock)
            {
                if (isDisposed)
                {
                    return;
                }

                this.connectionReaders.Remove(modeReader);
            }

            // Stays true on every early return and exception path so that the finally block
            // disposes the reader; cleared only after a successful hand-off.
            bool closeReader = true;
            try
            {
                FramingMode framingMode;
                try
                {
                    framingMode = modeReader.GetConnectionMode();
                }
                catch (CommunicationException exception)
                {
                    TraceEventType eventType = modeReader.Connection.ExceptionEventType;
                    DiagnosticUtility.TraceHandledException(exception, eventType);
                    return;
                }
                catch (TimeoutException exception)
                {
                    if (!isCached)
                    {
                        exception = new TimeoutException(SR.GetString(SR.ChannelInitializationTimeout, this.channelInitializationTimeout), exception);
                        System.ServiceModel.Dispatcher.ErrorBehavior.ThrowAndCatch(exception);
                    }

                    if (TD.ChannelInitializationTimeoutIsEnabled())
                    {
                        TD.ChannelInitializationTimeout(SR.GetString(SR.ChannelInitializationTimeout, this.channelInitializationTimeout));
                    }

                    TraceEventType eventType = modeReader.Connection.ExceptionEventType;
                    DiagnosticUtility.TraceHandledException(exception, eventType);
                    return;
                }

                switch (framingMode)
                {
                    case FramingMode.Duplex:
                        OnDuplexConnection(modeReader.Connection, modeReader.ConnectionDequeuedCallback,
                            modeReader.StreamPosition, modeReader.BufferOffset, modeReader.BufferSize,
                            modeReader.GetRemainingTimeout());
                        break;

                    case FramingMode.Singleton:
                        OnSingletonConnection(modeReader.Connection, modeReader.ConnectionDequeuedCallback,
                            modeReader.StreamPosition, modeReader.BufferOffset, modeReader.BufferSize,
                            modeReader.GetRemainingTimeout());
                        break;

                    default:
                        {
                            Exception inner = new InvalidDataException(SR.GetString(
                                SR.FramingModeNotSupported, framingMode));
                            Exception exception = new ProtocolException(inner.Message, inner);
                            FramingEncodingString.AddFaultString(exception, FramingEncodingString.UnsupportedModeFault);
                            System.ServiceModel.Dispatcher.ErrorBehavior.ThrowAndCatch(exception);
                            return;
                        }
                }

                closeReader = false;
            }
            catch (Exception e)
            {
                if (Fx.IsFatal(e))
                {
                    throw;
                }
                if (!ExceptionHandler.HandleTransportExceptionHelper(e))
                {
                    throw;
                }

                // containment -- the reader is aborted, no need for additional containment
            }
            finally
            {
                if (closeReader)
                {
                    modeReader.Dispose();
                }
            }
        }

        /// <summary>
        /// Closed callback given to every reader created here: stops tracking the reader unless
        /// the demuxer is already disposed.
        /// </summary>
        void OnConnectionClosed(InitialServerConnectionReader connectionReader)
        {
            lock (ThisLock)
            {
                if (isDisposed)
                {
                    return;
                }

                connectionReaders.Remove(connectionReader);
            }
        }

        /// <summary>
        /// Creates and tracks a ServerSingletonPreambleConnectionReader for the connection and
        /// starts reading the preamble. If the demuxer is already disposed, the new reader is
        /// disposed instead.
        /// </summary>
        void OnSingletonConnection(IConnection connection, Action connectionDequeuedCallback,
            long streamPosition, int offset, int size, TimeSpan timeout)
        {
            if (onSingletonPreambleKnown == null)
            {
                onSingletonPreambleKnown = OnSingletonPreambleKnown;
            }
            ServerSingletonPreambleConnectionReader singletonPreambleReader =
                new ServerSingletonPreambleConnectionReader(connection, connectionDequeuedCallback, streamPosition, offset, size,
                transportSettingsCallback, onConnectionClosed, onSingletonPreambleKnown);

            lock (ThisLock)
            {
                if (isDisposed)
                {
                    singletonPreambleReader.Dispose();
                    return;
                }

                connectionReaders.Add(singletonPreambleReader);
            }
            singletonPreambleReader.StartReading(viaDelegate, timeout);
        }

        /// <summary>
        /// Preamble-known callback for singleton connections: untracks the reader, asks
        /// singletonPreambleCallback for the channel listener and starts the asynchronous
        /// preamble completion and request dispatch.
        /// </summary>
        void OnSingletonPreambleKnown(ServerSingletonPreambleConnectionReader serverSingletonPreambleReader)
        {
            lock (ThisLock)
            {
                if (isDisposed)
                {
                    return;
                }

                connectionReaders.Remove(serverSingletonPreambleReader);
            }

            if (onSingletonPreambleComplete == null)
            {
                onSingletonPreambleComplete = Fx.ThunkCallback(new AsyncCallback(OnSingletonPreambleComplete));
            }

            ISingletonChannelListener singletonChannelListener = singletonPreambleCallback(serverSingletonPreambleReader);
            Fx.Assert(singletonChannelListener != null,
                "singletonPreambleCallback must return a listener or send a Fault/throw");

            // transfer ownership of the connection from the preamble reader to the message handler

            IAsyncResult result = BeginCompleteSingletonPreamble(serverSingletonPreambleReader, singletonChannelListener, onSingletonPreambleComplete, this);

            // Asynchronous completions are ended by OnSingletonPreambleComplete instead.
            if (result.CompletedSynchronously)
            {
                EndCompleteSingletonPreamble(result);
            }
        }

        // Begin half of the preamble completion; all work happens in the async result's constructor.
        IAsyncResult BeginCompleteSingletonPreamble(
            ServerSingletonPreambleConnectionReader serverSingletonPreambleReader,
            ISingletonChannelListener singletonChannelListener,
            AsyncCallback callback, object state)
        {
            return new CompleteSingletonPreambleAndDispatchRequestAsyncResult(serverSingletonPreambleReader, singletonChannelListener, this, callback, state);
        }

        // End half of the preamble completion.
        void EndCompleteSingletonPreamble(IAsyncResult result)
        {
            CompleteSingletonPreambleAndDispatchRequestAsyncResult.End(result);
        }

        /// <summary>
        /// AsyncCallback for BeginCompleteSingletonPreamble. Synchronous completions are ignored
        /// here because OnSingletonPreambleKnown ends them itself.
        /// </summary>
        static void OnSingletonPreambleComplete(IAsyncResult result)
        {
            if (result.CompletedSynchronously)
            {
                return;
            }

            ConnectionDemuxer thisPtr = (ConnectionDemuxer)result.AsyncState;

            try
            {
                thisPtr.EndCompleteSingletonPreamble(result);
            }
            catch (Exception ex)
            {
                if (Fx.IsFatal(ex))
                {
                    throw;
                }

                //should never actually hit this code - the async result will handle all exceptions, trace them, then abort the reader
                DiagnosticUtility.TraceHandledException(ex, TraceEventType.Warning);
            }
        }

        /// <summary>
        /// Preamble-known callback for session connections: untracks the reader, emits the
        /// SessionPreambleUnderstood event if enabled, and passes the reader to
        /// serverSessionPreambleCallback.
        /// </summary>
        void OnSessionPreambleKnown(ServerSessionPreambleConnectionReader serverSessionPreambleReader)
        {
            lock (ThisLock)
            {
                if (isDisposed)
                {
                    return;
                }

                connectionReaders.Remove(serverSessionPreambleReader);
            }

            TraceOnSessionPreambleKnown(serverSessionPreambleReader);

            serverSessionPreambleCallback(serverSessionPreambleReader, this);
        }

        // Emits TD.SessionPreambleUnderstood with the reader's Via (empty string when Via is null).
        static void TraceOnSessionPreambleKnown(ServerSessionPreambleConnectionReader serverSessionPreambleReader)
        {
            if (TD.SessionPreambleUnderstoodIsEnabled())
            {
                TD.SessionPreambleUnderstood((serverSessionPreambleReader.Via != null) ? serverSessionPreambleReader.Via.ToString() : String.Empty);
            }
        }

        /// <summary>
        /// Creates and tracks a ServerSessionPreambleConnectionReader for the connection and
        /// starts reading the preamble. If the demuxer is already disposed, the new reader is
        /// disposed instead.
        /// </summary>
        void OnDuplexConnection(IConnection connection, Action connectionDequeuedCallback,
            long streamPosition, int offset, int size, TimeSpan timeout)
        {
            if (onSessionPreambleKnown == null)
            {
                onSessionPreambleKnown = OnSessionPreambleKnown;
            }
            ServerSessionPreambleConnectionReader sessionPreambleReader = new ServerSessionPreambleConnectionReader(
                connection, connectionDequeuedCallback, streamPosition, offset, size,
                transportSettingsCallback, onConnectionClosed, onSessionPreambleKnown);
            lock (ThisLock)
            {
                if (isDisposed)
                {
                    sessionPreambleReader.Dispose();
                    return;
                }

                connectionReaders.Add(sessionPreambleReader);
            }

            sessionPreambleReader.StartReading(viaDelegate, timeout);
        }

        /// <summary>
        /// Starts accepting connections without a via delegate.
        /// </summary>
        public void StartDemuxing()
        {
            StartDemuxing(null);
        }

        /// <summary>
        /// Stores <paramref name="viaDelegate"/> (handed to the preamble readers' StartReading)
        /// and starts the acceptor.
        /// </summary>
        /// <param name="viaDelegate">May be null.</param>
        public void StartDemuxing(Action<Uri> viaDelegate)
        {
            this.viaDelegate = viaDelegate;
            acceptor.StartAccepting();
        }

        /// <summary>
        /// Async result that completes the singleton preamble on the reader, receives the request
        /// and passes it to the singleton channel listener.
        /// </summary>
        class CompleteSingletonPreambleAndDispatchRequestAsyncResult : AsyncResult
        {
            readonly ServerSingletonPreambleConnectionReader serverSingletonPreambleReader;
            readonly ISingletonChannelListener singletonChannelListener;
            readonly ConnectionDemuxer demuxer;

            // Not readonly: assigned in BeginCompletePreamble, and RemainingTime() is invoked on it.
            TimeoutHelper timeoutHelper;

            static AsyncCallback onPreambleComplete = Fx.ThunkCallback(new AsyncCallback(OnPreambleComplete));

            public CompleteSingletonPreambleAndDispatchRequestAsyncResult(
                ServerSingletonPreambleConnectionReader serverSingletonPreambleReader,
                ISingletonChannelListener singletonChannelListener,
                ConnectionDemuxer demuxer,
                AsyncCallback callback, object state)
                : base(callback, state)
            {
                this.serverSingletonPreambleReader = serverSingletonPreambleReader;
                this.singletonChannelListener = singletonChannelListener;
                this.demuxer = demuxer;

                //if this throws, the calling code paths will abort the connection, so we only need to
                //call AbortConnection if BeginCompletePreamble completes asynchronously.
                if (BeginCompletePreamble())
                {
                    Complete(true);
                }
            }

            public static void End(IAsyncResult result)
            {
                AsyncResult.End<CompleteSingletonPreambleAndDispatchRequestAsyncResult>(result);
            }

            /// <summary>
            /// Starts the timeout from the listener's ReceiveTimeout and begins completing the
            /// preamble.
            /// </summary>
            /// <returns>True if everything finished synchronously; false if OnPreambleComplete will finish the work.</returns>
            bool BeginCompletePreamble()
            {
                this.timeoutHelper = new TimeoutHelper(this.singletonChannelListener.ReceiveTimeout);
                IAsyncResult result = this.serverSingletonPreambleReader.BeginCompletePreamble(this.timeoutHelper.RemainingTime(), onPreambleComplete, this);

                if (result.CompletedSynchronously)
                {
                    return HandlePreambleComplete(result);
                }

                return false;
            }

            /// <summary>
            /// AsyncCallback for the reader's BeginCompletePreamble. Synchronous completions are
            /// handled in BeginCompletePreamble and ignored here.
            /// </summary>
            static void OnPreambleComplete(IAsyncResult result)
            {
                if (result.CompletedSynchronously)
                {
                    return;
                }

                CompleteSingletonPreambleAndDispatchRequestAsyncResult thisPtr = (CompleteSingletonPreambleAndDispatchRequestAsyncResult)result.AsyncState;
                bool completeSelf = false;

                try
                {
                    completeSelf = thisPtr.HandlePreambleComplete(result);
                }
                catch (Exception ex)
                {
                    if (Fx.IsFatal(ex))
                    {
                        throw;
                    }

                    //Don't complete this AsyncResult with this non-fatal exception.  The calling code can't really do anything with it,
                    //so just trace it (inside of AbortConnection), then swallow it.
                    completeSelf = true;
                    thisPtr.AbortConnection(ex);
                }

                if (completeSelf)
                {
                    thisPtr.Complete(false);
                }
            }

            /// <summary>
            /// Ends the preamble completion, receives the request over the resulting connection
            /// and hands it to the singleton channel listener.
            /// </summary>
            /// <returns>Always true.</returns>
            bool HandlePreambleComplete(IAsyncResult result)
            {
                IConnection upgradedConnection = this.serverSingletonPreambleReader.EndCompletePreamble(result);
                ServerSingletonConnectionReader singletonReader = new ServerSingletonConnectionReader(serverSingletonPreambleReader, upgradedConnection, this.demuxer);

                //singletonReader doesn't have async version of ReceiveRequest, so just call the sync method for now.
                RequestContext requestContext = singletonReader.ReceiveRequest(this.timeoutHelper.RemainingTime());
                singletonChannelListener.ReceiveRequest(requestContext, serverSingletonPreambleReader.ConnectionDequeuedCallback, true);

                return true;
            }

            void AbortConnection(Exception exception)
            {
                //this will trace the exception and abort the connection
                this.serverSingletonPreambleReader.Abort(exception);
            }
        }

        /// <summary>
        /// State object passed from ReuseConnection to ReuseConnectionCallback.
        /// </summary>
        class ReuseConnectionState
        {
            readonly ConnectionModeReader modeReader;
            readonly TimeSpan closeTimeout;

            public ReuseConnectionState(ConnectionModeReader modeReader, TimeSpan closeTimeout)
            {
                this.modeReader = modeReader;
                this.closeTimeout = closeTimeout;
            }

            public ConnectionModeReader ModeReader
            {
                get { return this.modeReader; }
            }

            public TimeSpan CloseTimeout
            {
                get { return this.closeTimeout; }
            }
        }
    }
}