1 //------------------------------------------------------------
2 // Copyright (c) Microsoft Corporation. All rights reserved.
3 //------------------------------------------------------------
5 namespace System.ServiceModel.Channels
7 using System.Collections.Generic;
8 using System.Diagnostics;
9 using System.Globalization;
12 using System.Runtime.Diagnostics;
13 using System.ServiceModel;
14 using System.ServiceModel.Activation;
15 using System.ServiceModel.Diagnostics;
16 using System.ServiceModel.Diagnostics.Application;
17 using System.ServiceModel.Dispatcher;
18 using System.Threading;
// ConnectionDemuxer takes connections from a ConnectionAcceptor, reads each connection's
// FramingMode, and routes it to the duplex (session) or singleton preamble path.
20 sealed class ConnectionDemuxer : IDisposable
// Shared completion callback for the singleton preamble; created on first use in OnSingletonPreambleKnown.
22 static AsyncCallback onSingletonPreambleComplete;
// Source of incoming connections; StartDemuxing(Action<Uri>) calls StartAccepting() on it.
23 ConnectionAcceptor acceptor;
25 // we use this list to track readers that don't have a clear owner (so they don't get GC'ed)
26 List<InitialServerConnectionReader> connectionReaders;
// Cached delegates. onConnectionClosed is created in the constructor; the other "on..." delegates
// and reuseConnectionCallback / pooledConnectionDequeuedCallback are created at their first use.
29 ConnectionModeCallback onConnectionModeKnown;
30 ConnectionModeCallback onCachedConnectionModeKnown;
31 ConnectionClosedCallback onConnectionClosed;
32 ServerSessionPreambleCallback onSessionPreambleKnown;
33 ServerSingletonPreambleCallback onSingletonPreambleKnown;
34 Action<object> reuseConnectionCallback;
// Callbacks supplied by the caller through the constructor.
35 ServerSessionPreambleDemuxCallback serverSessionPreambleCallback;
36 SingletonPreambleDemuxCallback singletonPreambleCallback;
37 TransportSettingsCallback transportSettingsCallback;
38 Action pooledConnectionDequeuedCallback;
// Set by StartDemuxing(Action<Uri>) and passed to the preamble readers' StartReading.
39 Action<Uri> viaDelegate;
// Read timeout used for freshly accepted (non-pooled) connections.
40 TimeSpan channelInitializationTimeout;
// Quota and current count of pooled connections being read; the count is incremented in
// ReuseConnectionCallback and decremented in PooledConnectionDequeuedCallback.
42 int maxPooledConnections;
43 int pooledConnectionCount;
// Creates the demuxer: sets up the reader tracking list, builds the ConnectionAcceptor over
// `listener`, and stores the timeouts, pool quota and demux callbacks for later use.
// maxAccepts, maxPendingConnections and errorCallback are passed straight to the acceptor.
45 public ConnectionDemuxer(IConnectionListener listener, int maxAccepts, int maxPendingConnections,
46 TimeSpan channelInitializationTimeout, TimeSpan idleTimeout, int maxPooledConnections,
47 TransportSettingsCallback transportSettingsCallback,
48 SingletonPreambleDemuxCallback singletonPreambleCallback,
49 ServerSessionPreambleDemuxCallback serverSessionPreambleCallback, ErrorCallback errorCallback)
51 this.connectionReaders = new List<InitialServerConnectionReader>();
// Accepted connections are delivered to OnConnectionAvailable.
// NOTE(review): the assignment target of this expression is not visible in this listing
// (presumably this.acceptor) — confirm.
53 new ConnectionAcceptor(listener, maxAccepts, maxPendingConnections, OnConnectionAvailable, errorCallback);
54 this.channelInitializationTimeout = channelInitializationTimeout;
// NOTE(review): the idleTimeout field declaration is not visible in this listing.
55 this.idleTimeout = idleTimeout;
56 this.maxPooledConnections = maxPooledConnections;
// Shared close notification handed to every reader this class creates.
57 this.onConnectionClosed = new ConnectionClosedCallback(OnConnectionClosed);
58 this.transportSettingsCallback = transportSettingsCallback;
59 this.singletonPreambleCallback = singletonPreambleCallback;
60 this.serverSessionPreambleCallback = serverSessionPreambleCallback;
// Disposes every reader still tracked in connectionReaders, then empties the list.
// NOTE(review): the enclosing method header and any guarding/locking are not visible in this
// listing; presumably this is the body of Dispose() — confirm.
78 for (int i = 0; i < connectionReaders.Count; i++)
80 connectionReaders[i].Dispose();
82 connectionReaders.Clear();
// Creates a ConnectionModeReader for `connection` and adds it to connectionReaders.
// Two variants are built: one wired to onCachedConnectionModeKnown and one wired to
// onConnectionModeKnown; each delegate is created on first use.
// NOTE(review): the condition selecting the variant (presumably `isCached`) and the return
// statements are not visible in this listing. Callers test the result for null.
87 ConnectionModeReader SetupModeReader(IConnection connection, bool isCached)
89 ConnectionModeReader modeReader;
92 if (onCachedConnectionModeKnown == null)
94 onCachedConnectionModeKnown = new ConnectionModeCallback(OnCachedConnectionModeKnown);
97 modeReader = new ConnectionModeReader(connection, onCachedConnectionModeKnown, onConnectionClosed);
101 if (onConnectionModeKnown == null)
103 onConnectionModeKnown = new ConnectionModeCallback(OnConnectionModeKnown);
106 modeReader = new ConnectionModeReader(connection, onConnectionModeKnown, onConnectionClosed);
// NOTE(review): the condition guarding this Dispose is elided; presumably the demuxer is
// already disposed and the reader is discarded instead of tracked — confirm.
113 modeReader.Dispose();
// Track the reader so it stays reachable until its mode is known or it closes.
117 connectionReaders.Add(modeReader);
// Returns an existing connection to the demuxer so it can be read again.
// connection:   the connection to reuse.
// closeTimeout: carried in the ReuseConnectionState and later passed to CloseFromPool.
122 public void ReuseConnection(IConnection connection, TimeSpan closeTimeout)
// Exceptions on a reused connection are traced at Information level.
124 connection.ExceptionEventType = TraceEventType.Information;
125 ConnectionModeReader modeReader = SetupModeReader(connection, true);
127 if (modeReader != null)
129 if (reuseConnectionCallback == null)
131 reuseConnectionCallback = new Action<object>(ReuseConnectionCallback);
// The quota check and read start run in ReuseConnectionCallback, scheduled via ActionItem.
134 ActionItem.Schedule(reuseConnectionCallback, new ReuseConnectionState(modeReader, closeTimeout));
// Scheduled continuation of ReuseConnection. `state` is a ReuseConnectionState.
// Compares pooledConnectionCount with maxPooledConnections; depending on the outcome the reader
// is either closed via CloseFromPool (with a quota-reached trace) or started with idleTimeout.
// NOTE(review): the branch bodies, `else` lines, any locking and the places where `closeReader`
// is set to true are elided in this listing, so the exact pairing of statements to branches
// below should be confirmed against the full file.
138 void ReuseConnectionCallback(object state)
140 ReuseConnectionState connectionState = (ReuseConnectionState)state;
141 bool closeReader = false;
144 if (this.pooledConnectionCount >= this.maxPooledConnections)
// presumably on the under-quota path — TODO confirm
150 this.pooledConnectionCount++;
// Quota-reached diagnostics: a Warning trace record plus the TD event, when enabled.
156 if (DiagnosticUtility.ShouldTraceWarning)
158 TraceUtility.TraceEvent(TraceEventType.Warning,
159 TraceCode.ServerMaxPooledConnectionsQuotaReached,
160 SR.GetString(SR.TraceCodeServerMaxPooledConnectionsQuotaReached, maxPooledConnections),
161 new StringTraceRecord("MaxOutboundConnectionsPerEndpoint", maxPooledConnections.ToString(CultureInfo.InvariantCulture)),
165 if (TD.ServerMaxPooledConnectionsQuotaReachedIsEnabled())
167 TD.ServerMaxPooledConnectionsQuotaReached();
// Close the reader using the timeout supplied to ReuseConnection.
170 connectionState.ModeReader.CloseFromPool(connectionState.CloseTimeout);
174 if (this.pooledConnectionDequeuedCallback == null)
176 this.pooledConnectionDequeuedCallback = new Action(PooledConnectionDequeuedCallback);
// Pooled connections are read with idleTimeout; PooledConnectionDequeuedCallback is passed
// as the reader's dequeued callback.
178 connectionState.ModeReader.StartReading(this.idleTimeout, this.pooledConnectionDequeuedCallback);
// Dequeued callback for pooled connections: decrements pooledConnectionCount and asserts
// that it did not go negative.
// NOTE(review): any synchronization around the decrement is not visible in this listing.
182 void PooledConnectionDequeuedCallback()
186 this.pooledConnectionCount--;
187 Fx.Assert(this.pooledConnectionCount >= 0, "Connection Throttle should never be negative");
// Callback given to the ConnectionAcceptor for each accepted connection.
// Sets up a non-cached mode reader and starts reading with channelInitializationTimeout.
191 void OnConnectionAvailable(IConnection connection, Action connectionDequeuedCallback)
193 ConnectionModeReader modeReader = SetupModeReader(connection, false);
195 if (modeReader != null)
197 // StartReading() will never throw non-fatal exceptions;
198 // it propagates all exceptions into the onConnectionModeKnown callback,
199 // which is where we need our robust handling
200 modeReader.StartReading(this.channelInitializationTimeout, connectionDequeuedCallback);
// NOTE(review): presumably the `else` path (no reader was created), where the dequeued
// callback is invoked directly — the `else` line is elided in this listing.
204 connectionDequeuedCallback();
// Mode-known callback for reused (cached) connections; forwards with isCached = true.
208 void OnCachedConnectionModeKnown(ConnectionModeReader modeReader)
210 OnConnectionModeKnownCore(modeReader, true);
// Mode-known callback for freshly accepted connections; forwards with isCached = false.
213 void OnConnectionModeKnown(ConnectionModeReader modeReader)
215 OnConnectionModeKnownCore(modeReader, false);
// Shared handler invoked once a ConnectionModeReader has finished: stops tracking the reader,
// obtains the FramingMode, and dispatches to OnDuplexConnection or OnSingletonConnection.
// CommunicationException and TimeoutException from GetConnectionMode are traced as handled;
// an unsupported mode is reported as a ProtocolException carrying the UnsupportedModeFault string.
// NOTE(review): the try/catch/finally and switch scaffolding, the use of `isCached`, and the
// updates to `closeReader` are elided in this listing.
218 void OnConnectionModeKnownCore(ConnectionModeReader modeReader, bool isCached)
225 this.connectionReaders.Remove(modeReader);
228 bool closeReader = true;
231 FramingMode framingMode;
234 framingMode = modeReader.GetConnectionMode();
236 catch (CommunicationException exception)
// Trace at the level configured on the connection (see ExceptionEventType).
238 TraceEventType eventType = modeReader.Connection.ExceptionEventType;
239 DiagnosticUtility.TraceHandledException(exception, eventType);
242 catch (TimeoutException exception)
// Wrap the timeout in a ChannelInitializationTimeout message and surface it via ThrowAndCatch.
// NOTE(review): the condition guarding this wrapping is elided — confirm.
246 exception = new TimeoutException(SR.GetString(SR.ChannelInitializationTimeout, this.channelInitializationTimeout), exception);
247 System.ServiceModel.Dispatcher.ErrorBehavior.ThrowAndCatch(exception);
250 if (TD.ChannelInitializationTimeoutIsEnabled())
252 TD.ChannelInitializationTimeout(SR.GetString(SR.ChannelInitializationTimeout, this.channelInitializationTimeout));
255 TraceEventType eventType = modeReader.Connection.ExceptionEventType;
256 DiagnosticUtility.TraceHandledException(exception, eventType);
// Hand the connection, buffered data position and remaining timeout to the mode-specific path.
262 case FramingMode.Duplex:
263 OnDuplexConnection(modeReader.Connection, modeReader.ConnectionDequeuedCallback,
264 modeReader.StreamPosition, modeReader.BufferOffset, modeReader.BufferSize,
265 modeReader.GetRemainingTimeout());
268 case FramingMode.Singleton:
269 OnSingletonConnection(modeReader.Connection, modeReader.ConnectionDequeuedCallback,
270 modeReader.StreamPosition, modeReader.BufferOffset, modeReader.BufferSize,
271 modeReader.GetRemainingTimeout());
// Any other framing mode (presumably the switch's default case) is unsupported.
276 Exception inner = new InvalidDataException(SR.GetString(
277 SR.FramingModeNotSupported, framingMode));
278 Exception exception = new ProtocolException(inner.Message, inner);
279 FramingEncodingString.AddFaultString(exception, FramingEncodingString.UnsupportedModeFault);
280 System.ServiceModel.Dispatcher.ErrorBehavior.ThrowAndCatch(exception);
// NOTE(review): `e` comes from a catch clause that is elided in this listing.
293 if (!ExceptionHandler.HandleTransportExceptionHelper(e))
298 // containment -- the reader is aborted, no need for additional containment
// NOTE(review): presumably guarded by `closeReader` in a finally block — confirm.
304 modeReader.Dispose();
// Close notification shared by all readers: stops tracking the closed reader.
// NOTE(review): any locking around the list is not visible in this listing.
309 void OnConnectionClosed(InitialServerConnectionReader connectionReader)
316 connectionReaders.Remove(connectionReader);
// Starts the singleton preamble path for a connection whose framing mode is Singleton.
// Creates a ServerSingletonPreambleConnectionReader over the connection and the already
// buffered data (streamPosition/offset/size), tracks it, and starts reading with `timeout`.
320 void OnSingletonConnection(IConnection connection, Action connectionDequeuedCallback,
321 long streamPosition, int offset, int size, TimeSpan timeout)
// Create the preamble-known delegate on first use.
323 if (onSingletonPreambleKnown == null)
325 onSingletonPreambleKnown = OnSingletonPreambleKnown;
327 ServerSingletonPreambleConnectionReader singletonPreambleReader =
328 new ServerSingletonPreambleConnectionReader(connection, connectionDequeuedCallback, streamPosition, offset, size,
329 transportSettingsCallback, onConnectionClosed, onSingletonPreambleKnown);
// NOTE(review): the condition guarding this Dispose is elided; presumably the demuxer is
// already disposed — confirm.
335 singletonPreambleReader.Dispose();
339 connectionReaders.Add(singletonPreambleReader);
341 singletonPreambleReader.StartReading(viaDelegate, timeout);
// Invoked when a singleton preamble has been read. Stops tracking the reader, resolves the
// target listener through singletonPreambleCallback, and begins the asynchronous completion
// of the preamble, finishing inline when it completes synchronously.
344 void OnSingletonPreambleKnown(ServerSingletonPreambleConnectionReader serverSingletonPreambleReader)
353 connectionReaders.Remove(serverSingletonPreambleReader);
// The static completion callback is created once, on first use, wrapped by Fx.ThunkCallback.
356 if (onSingletonPreambleComplete == null)
358 onSingletonPreambleComplete = Fx.ThunkCallback(new AsyncCallback(OnSingletonPreambleComplete));
361 ISingletonChannelListener singletonChannelListener = singletonPreambleCallback(serverSingletonPreambleReader);
362 Fx.Assert(singletonChannelListener != null,
363 "singletonPreambleCallback must return a listener or send a Fault/throw");
365 // transfer ownership of the connection from the preamble reader to the message handler
// `this` is passed as async state so the static callback can recover the demuxer.
367 IAsyncResult result = BeginCompleteSingletonPreamble(serverSingletonPreambleReader, singletonChannelListener, onSingletonPreambleComplete, this);
// Synchronous completions are ended here; OnSingletonPreambleComplete checks the same flag.
369 if (result.CompletedSynchronously)
371 EndCompleteSingletonPreamble(result);
// Begin half of the async pair: creates the
// CompleteSingletonPreambleAndDispatchRequestAsyncResult, whose constructor starts the work.
375 IAsyncResult BeginCompleteSingletonPreamble(
376 ServerSingletonPreambleConnectionReader serverSingletonPreambleReader,
377 ISingletonChannelListener singletonChannelListener,
378 AsyncCallback callback, object state)
380 return new CompleteSingletonPreambleAndDispatchRequestAsyncResult(serverSingletonPreambleReader, singletonChannelListener, this, callback, state);
// End half of the async pair: delegates to the async result's static End.
383 void EndCompleteSingletonPreamble(IAsyncResult result)
385 CompleteSingletonPreambleAndDispatchRequestAsyncResult.End(result);
// Static AsyncCallback for BeginCompleteSingletonPreamble. Recovers the demuxer from
// AsyncState, ends the operation, and traces any exception from End as a handled Warning.
// NOTE(review): the body of the CompletedSynchronously check (presumably an early return, since
// OnSingletonPreambleKnown ends synchronous completions itself) and the try/catch lines are
// elided in this listing.
388 static void OnSingletonPreambleComplete(IAsyncResult result)
390 if (result.CompletedSynchronously)
395 ConnectionDemuxer thisPtr = (ConnectionDemuxer)result.AsyncState;
399 thisPtr.EndCompleteSingletonPreamble(result);
408 //should never actually hit this code - the async result will handle all exceptions, trace them, then abort the reader
409 DiagnosticUtility.TraceHandledException(ex, TraceEventType.Warning);
// Invoked when a session (duplex) preamble has been read: stops tracking the reader, emits the
// preamble-understood trace, and hands the reader plus this demuxer to serverSessionPreambleCallback.
413 void OnSessionPreambleKnown(ServerSessionPreambleConnectionReader serverSessionPreambleReader)
422 connectionReaders.Remove(serverSessionPreambleReader);
425 TraceOnSessionPreambleKnown(serverSessionPreambleReader);
427 serverSessionPreambleCallback(serverSessionPreambleReader, this);
// Emits the SessionPreambleUnderstood event, when enabled, with the reader's Via as a string
// (String.Empty when Via is null).
430 static void TraceOnSessionPreambleKnown(ServerSessionPreambleConnectionReader serverSessionPreambleReader)
432 if (TD.SessionPreambleUnderstoodIsEnabled())
434 TD.SessionPreambleUnderstood((serverSessionPreambleReader.Via != null) ? serverSessionPreambleReader.Via.ToString() : String.Empty);
// Starts the session preamble path for a connection whose framing mode is Duplex.
// Creates a ServerSessionPreambleConnectionReader over the connection and the already buffered
// data (streamPosition/offset/size), tracks it, and starts reading with `timeout`.
438 void OnDuplexConnection(IConnection connection, Action connectionDequeuedCallback,
439 long streamPosition, int offset, int size, TimeSpan timeout)
// Create the preamble-known delegate on first use.
441 if (onSessionPreambleKnown == null)
443 onSessionPreambleKnown = OnSessionPreambleKnown;
445 ServerSessionPreambleConnectionReader sessionPreambleReader = new ServerSessionPreambleConnectionReader(
446 connection, connectionDequeuedCallback, streamPosition, offset, size,
447 transportSettingsCallback, onConnectionClosed, onSessionPreambleKnown);
// NOTE(review): the condition guarding this Dispose is elided; presumably the demuxer is
// already disposed — confirm.
452 sessionPreambleReader.Dispose();
456 connectionReaders.Add(sessionPreambleReader);
459 sessionPreambleReader.StartReading(viaDelegate, timeout);
// Parameterless overload.
// NOTE(review): its body is elided in this listing; presumably it forwards to the overload
// below with a null viaDelegate — confirm.
462 public void StartDemuxing()
// Stores the via delegate (later passed to the preamble readers) and starts accepting connections.
467 public void StartDemuxing(Action<Uri> viaDelegate)
469 this.viaDelegate = viaDelegate;
470 acceptor.StartAccepting();
// AsyncResult that completes a singleton connection's preamble, receives the request from the
// upgraded connection, and passes it to the ISingletonChannelListener.
473 class CompleteSingletonPreambleAndDispatchRequestAsyncResult : AsyncResult
475 ServerSingletonPreambleConnectionReader serverSingletonPreambleReader;
476 ISingletonChannelListener singletonChannelListener;
477 ConnectionDemuxer demuxer;
// Initialized from the listener's ReceiveTimeout; its remaining time is used for both
// the preamble completion and the request receive.
478 TimeoutHelper timeoutHelper;
// Shared callback for serverSingletonPreambleReader.BeginCompletePreamble.
480 static AsyncCallback onPreambleComplete = Fx.ThunkCallback(new AsyncCallback(OnPreambleComplete));
// Stores the collaborators and immediately starts the preamble completion.
482 public CompleteSingletonPreambleAndDispatchRequestAsyncResult(
483 ServerSingletonPreambleConnectionReader serverSingletonPreambleReader,
484 ISingletonChannelListener singletonChannelListener,
485 ConnectionDemuxer demuxer,
486 AsyncCallback callback, object state)
487 : base(callback, state)
489 this.serverSingletonPreambleReader = serverSingletonPreambleReader;
490 this.singletonChannelListener = singletonChannelListener;
491 this.demuxer = demuxer;
493 //if this throws, the calling code paths will abort the connection, so we only need to
494 //call AbortConnection if BeginCompletePreamble completes asynchronously.
// NOTE(review): the body of this `if` is elided; presumably it completes this result
// synchronously — confirm.
495 if (BeginCompletePreamble())
// Waits for/ends the async result via the AsyncResult base helper.
501 public static void End(IAsyncResult result)
503 AsyncResult.End<CompleteSingletonPreambleAndDispatchRequestAsyncResult>(result);
// Starts the reader's preamble completion. When it completes synchronously the request is
// handled inline and that result is returned.
// NOTE(review): the return for the asynchronous case is elided (presumably false).
506 bool BeginCompletePreamble()
508 this.timeoutHelper = new TimeoutHelper(this.singletonChannelListener.ReceiveTimeout);
509 IAsyncResult result = this.serverSingletonPreambleReader.BeginCompletePreamble(this.timeoutHelper.RemainingTime(), onPreambleComplete, this);
511 if (result.CompletedSynchronously)
513 return HandlePreambleComplete(result);
// Callback for the asynchronous completion of the preamble.
// NOTE(review): the body of the CompletedSynchronously check (presumably an early return), the
// try/catch lines and the condition guarding Complete(false) are elided in this listing.
519 static void OnPreambleComplete(IAsyncResult result)
521 if (result.CompletedSynchronously)
526 CompleteSingletonPreambleAndDispatchRequestAsyncResult thisPtr = (CompleteSingletonPreambleAndDispatchRequestAsyncResult)result.AsyncState;
527 bool completeSelf = false;
531 completeSelf = thisPtr.HandlePreambleComplete(result);
540 //Don't complete this AsyncResult with this non-fatal exception. The calling code can't really do anything with it,
541 //so just trace it (inside of AbortConnection), then swallow it.
543 thisPtr.AbortConnection(ex);
548 thisPtr.Complete(false);
// Ends the preamble completion to obtain the upgraded connection, wraps it in a
// ServerSingletonConnectionReader, receives the request and dispatches it to the listener.
// NOTE(review): the return statement is elided in this listing.
552 bool HandlePreambleComplete(IAsyncResult result)
554 IConnection upgradedConnection = this.serverSingletonPreambleReader.EndCompletePreamble(result);
555 ServerSingletonConnectionReader singletonReader = new ServerSingletonConnectionReader(serverSingletonPreambleReader, upgradedConnection, this.demuxer);
557 //singletonReader doesn't have async version of ReceiveRequest, so just call the sync method for now.
558 RequestContext requestContext = singletonReader.ReceiveRequest(this.timeoutHelper.RemainingTime());
559 singletonChannelListener.ReceiveRequest(requestContext, serverSingletonPreambleReader.ConnectionDequeuedCallback, true);
// Aborts the preamble reader with the given exception.
564 void AbortConnection(Exception exception)
566 //this will trace the exception and abort the connection
567 this.serverSingletonPreambleReader.Abort(exception);
// State object passed from ReuseConnection to ReuseConnectionCallback through
// ActionItem.Schedule; pairs the mode reader with the close timeout.
571 class ReuseConnectionState
573 ConnectionModeReader modeReader;
574 TimeSpan closeTimeout;
576 public ReuseConnectionState(ConnectionModeReader modeReader, TimeSpan closeTimeout)
578 this.modeReader = modeReader;
579 this.closeTimeout = closeTimeout;
// Reader created for the reused connection.
582 public ConnectionModeReader ModeReader
584 get { return this.modeReader; }
// Timeout passed to CloseFromPool.
587 public TimeSpan CloseTimeout
589 get { return this.closeTimeout; }