//------------------------------------------------------------
// Copyright (c) Microsoft Corporation.  All rights reserved.
//------------------------------------------------------------

namespace System.ServiceModel.Channels
{
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Globalization;
    using System.IO;
    using System.Runtime;
    using System.Runtime.Diagnostics;
    using System.ServiceModel;
    using System.ServiceModel.Activation;
    using System.ServiceModel.Diagnostics;
    using System.ServiceModel.Diagnostics.Application;
    using System.ServiceModel.Dispatcher;
    using System.Threading;

    // Accepts connections from an IConnectionListener (through a ConnectionAcceptor), reads the
    // framing mode of each connection, and hands the connection to either the duplex (session)
    // or the singleton preamble reader. Connections handed back through ReuseConnection go
    // through the same mode-detection path, subject to the maxPooledConnections quota.
    sealed class ConnectionDemuxer : IDisposable
    {
        // Lazily created in OnSingletonPreambleKnown; shared by all demuxer instances.
        static AsyncCallback onSingletonPreambleComplete;

        ConnectionAcceptor acceptor;

        // we use this list to track readers that don't have a clear owner (so they don't get GC'ed)
        List<InitialServerConnectionReader> connectionReaders;
        bool isDisposed;

        // The delegates below are created lazily on first use, except onConnectionClosed,
        // which is created in the constructor.
        ConnectionModeCallback onConnectionModeKnown;
        ConnectionModeCallback onCachedConnectionModeKnown;
        ConnectionClosedCallback onConnectionClosed;
        ServerSessionPreambleCallback onSessionPreambleKnown;
        ServerSingletonPreambleCallback onSingletonPreambleKnown;
        Action<object> reuseConnectionCallback;
        ServerSessionPreambleDemuxCallback serverSessionPreambleCallback;
        SingletonPreambleDemuxCallback singletonPreambleCallback;
        TransportSettingsCallback transportSettingsCallback;
        Action pooledConnectionDequeuedCallback;

        // NOTE(review): passed through unchanged to the preamble readers' StartReading;
        // confirm the delegate type matches that signature.
        Action viaDelegate;

        TimeSpan channelInitializationTimeout;
        TimeSpan idleTimeout;
        int maxPooledConnections;

        // Number of reused connections currently counted against maxPooledConnections.
        // Read and written only under ThisLock.
        int pooledConnectionCount;

        // Creates the demuxer and its ConnectionAcceptor. Accepting does not start until
        // StartDemuxing is called.
        //   listener, maxAccepts, maxPendingConnections, errorCallback - forwarded to the ConnectionAcceptor.
        //   channelInitializationTimeout - read timeout used for freshly accepted connections.
        //   idleTimeout                  - read timeout used for reused (pooled) connections.
        //   maxPooledConnections         - quota enforced in ReuseConnectionCallback.
        //   transportSettingsCallback    - forwarded to the preamble readers.
        //   singletonPreambleCallback / serverSessionPreambleCallback - invoked once the
        //                                  corresponding preamble has been read.
        public ConnectionDemuxer(IConnectionListener listener, int maxAccepts, int maxPendingConnections,
            TimeSpan channelInitializationTimeout, TimeSpan idleTimeout, int maxPooledConnections,
            TransportSettingsCallback transportSettingsCallback,
            SingletonPreambleDemuxCallback singletonPreambleCallback,
            ServerSessionPreambleDemuxCallback serverSessionPreambleCallback,
            ErrorCallback errorCallback)
        {
            this.connectionReaders = new List<InitialServerConnectionReader>();
            this.acceptor =
                new ConnectionAcceptor(listener, maxAccepts, maxPendingConnections, OnConnectionAvailable, errorCallback);
            this.channelInitializationTimeout = channelInitializationTimeout;
            this.idleTimeout = idleTimeout;
            this.maxPooledConnections = maxPooledConnections;
            this.onConnectionClosed = new ConnectionClosedCallback(OnConnectionClosed);
            this.transportSettingsCallback = transportSettingsCallback;
            this.singletonPreambleCallback = singletonPreambleCallback;
            this.serverSessionPreambleCallback = serverSessionPreambleCallback;
        }

        // Monitor used to guard isDisposed, connectionReaders and pooledConnectionCount.
        object ThisLock
        {
            get { return this; }
        }

        // Marks the demuxer disposed, disposes every tracked reader, then disposes the acceptor.
        // Calling it more than once is a no-op after the first call.
        public void Dispose()
        {
            lock (ThisLock)
            {
                if (isDisposed)
                {
                    return;
                }

                isDisposed = true;
            }

            // Every other method in this class that adds to or removes from connectionReaders
            // checks isDisposed under ThisLock first, so the list is no longer mutated by them here.
            for (int i = 0; i < connectionReaders.Count; i++)
            {
                connectionReaders[i].Dispose();
            }

            connectionReaders.Clear();
            acceptor.Dispose();
        }

        // Creates a ConnectionModeReader for the connection and registers it in connectionReaders.
        // isCached selects which mode-known callback the reader is given.
        // Returns null (after disposing the new reader) when the demuxer is already disposed.
        ConnectionModeReader SetupModeReader(IConnection connection, bool isCached)
        {
            ConnectionModeReader modeReader;
            if (isCached)
            {
                if (onCachedConnectionModeKnown == null)
                {
                    onCachedConnectionModeKnown = new ConnectionModeCallback(OnCachedConnectionModeKnown);
                }

                modeReader = new ConnectionModeReader(connection, onCachedConnectionModeKnown, onConnectionClosed);
            }
            else
            {
                if (onConnectionModeKnown == null)
                {
                    onConnectionModeKnown = new ConnectionModeCallback(OnConnectionModeKnown);
                }

                modeReader = new ConnectionModeReader(connection, onConnectionModeKnown, onConnectionClosed);
            }

            lock (ThisLock)
            {
                if (isDisposed)
                {
                    modeReader.Dispose();
                    return null;
                }

                connectionReaders.Add(modeReader);
                return modeReader;
            }
        }

        // Hands a connection back to the demuxer for reuse. The connection's exception trace
        // level is lowered to Information, and the quota check / read start is scheduled
        // through ActionItem rather than run inline on the caller.
        public void ReuseConnection(IConnection connection, TimeSpan closeTimeout)
        {
            connection.ExceptionEventType = TraceEventType.Information;
            ConnectionModeReader modeReader = SetupModeReader(connection, true);

            if (modeReader != null)
            {
                if (reuseConnectionCallback == null)
                {
                    reuseConnectionCallback = new Action<object>(ReuseConnectionCallback);
                }

                ActionItem.Schedule(reuseConnectionCallback, new ReuseConnectionState(modeReader, closeTimeout));
            }
        }

        // Scheduled by ReuseConnection; state is a ReuseConnectionState.
        // When pooledConnectionCount has reached maxPooledConnections the reader is closed via
        // CloseFromPool (with tracing); otherwise the count is incremented and the reader starts
        // reading with the idle timeout.
        void ReuseConnectionCallback(object state)
        {
            ReuseConnectionState connectionState = (ReuseConnectionState)state;
            bool closeReader = false;
            lock (ThisLock)
            {
                if (this.pooledConnectionCount >= this.maxPooledConnections)
                {
                    closeReader = true;
                }
                else
                {
                    this.pooledConnectionCount++;
                }
            }

            if (closeReader)
            {
                if (DiagnosticUtility.ShouldTraceWarning)
                {
                    TraceUtility.TraceEvent(TraceEventType.Warning,
                        TraceCode.ServerMaxPooledConnectionsQuotaReached,
                        SR.GetString(SR.TraceCodeServerMaxPooledConnectionsQuotaReached, maxPooledConnections),
                        new StringTraceRecord("MaxOutboundConnectionsPerEndpoint", maxPooledConnections.ToString(CultureInfo.InvariantCulture)),
                        this, null);
                }

                if (TD.ServerMaxPooledConnectionsQuotaReachedIsEnabled())
                {
                    TD.ServerMaxPooledConnectionsQuotaReached();
                }

                connectionState.ModeReader.CloseFromPool(connectionState.CloseTimeout);
            }
            else
            {
                if (this.pooledConnectionDequeuedCallback == null)
                {
                    this.pooledConnectionDequeuedCallback = new Action(PooledConnectionDequeuedCallback);
                }

                connectionState.ModeReader.StartReading(this.idleTimeout, this.pooledConnectionDequeuedCallback);
            }
        }

        // Dequeued callback given to reused-connection readers: releases one slot of the
        // pooled-connection quota.
        void PooledConnectionDequeuedCallback()
        {
            lock (ThisLock)
            {
                this.pooledConnectionCount--;
                Fx.Assert(this.pooledConnectionCount >= 0, "Connection Throttle should never be negative");
            }
        }

        // Callback handed to the ConnectionAcceptor for each accepted connection.
        // If the demuxer is already disposed, the dequeued callback is invoked immediately.
        void OnConnectionAvailable(IConnection connection, Action connectionDequeuedCallback)
        {
            ConnectionModeReader modeReader = SetupModeReader(connection, false);

            if (modeReader != null)
            {
                // StartReading() will never throw non-fatal exceptions; 
                // it propagates all exceptions into the onConnectionModeKnown callback, 
                // which is where we need our robust handling
                modeReader.StartReading(this.channelInitializationTimeout, connectionDequeuedCallback);
            }
            else
            {
                connectionDequeuedCallback();
            }
        }

        // Mode-known callback for reused connections.
        void OnCachedConnectionModeKnown(ConnectionModeReader modeReader)
        {
            OnConnectionModeKnownCore(modeReader, true);
        }

        // Mode-known callback for freshly accepted connections.
        void OnConnectionModeKnown(ConnectionModeReader modeReader)
        {
            OnConnectionModeKnownCore(modeReader, false);
        }

        // Removes the mode reader from the tracked list, obtains the framing mode and dispatches
        // to OnDuplexConnection or OnSingletonConnection. The reader is disposed on every path
        // except a successful dispatch (closeReader is cleared only after the switch completes).
        void OnConnectionModeKnownCore(ConnectionModeReader modeReader, bool isCached)
        {
            lock (ThisLock)
            {
                if (isDisposed)
                {
                    return;
                }

                this.connectionReaders.Remove(modeReader);
            }

            bool closeReader = true;
            try
            {
                FramingMode framingMode;
                try
                {
                    framingMode = modeReader.GetConnectionMode();
                }
                catch (CommunicationException exception)
                {
                    TraceEventType eventType = modeReader.Connection.ExceptionEventType;
                    DiagnosticUtility.TraceHandledException(exception, eventType);
                    return;
                }
                catch (TimeoutException exception)
                {
                    if (!isCached)
                    {
                        // For a new connection, wrap the timeout in one that names the
                        // channel initialization timeout before it is traced below.
                        exception = new TimeoutException(SR.GetString(SR.ChannelInitializationTimeout, this.channelInitializationTimeout), exception);
                        System.ServiceModel.Dispatcher.ErrorBehavior.ThrowAndCatch(exception);
                    }

                    if (TD.ChannelInitializationTimeoutIsEnabled())
                    {
                        TD.ChannelInitializationTimeout(SR.GetString(SR.ChannelInitializationTimeout, this.channelInitializationTimeout));
                    }

                    TraceEventType eventType = modeReader.Connection.ExceptionEventType;
                    DiagnosticUtility.TraceHandledException(exception, eventType);
                    return;
                }

                switch (framingMode)
                {
                    case FramingMode.Duplex:
                        OnDuplexConnection(modeReader.Connection, modeReader.ConnectionDequeuedCallback,
                            modeReader.StreamPosition, modeReader.BufferOffset, modeReader.BufferSize,
                            modeReader.GetRemainingTimeout());
                        break;

                    case FramingMode.Singleton:
                        OnSingletonConnection(modeReader.Connection, modeReader.ConnectionDequeuedCallback,
                            modeReader.StreamPosition, modeReader.BufferOffset, modeReader.BufferSize,
                            modeReader.GetRemainingTimeout());
                        break;

                    default:
                        {
                            Exception inner = new InvalidDataException(SR.GetString(
                                SR.FramingModeNotSupported, framingMode));
                            Exception exception = new ProtocolException(inner.Message, inner);
                            FramingEncodingString.AddFaultString(exception, FramingEncodingString.UnsupportedModeFault);
                            System.ServiceModel.Dispatcher.ErrorBehavior.ThrowAndCatch(exception);
                            return;
                        }
                }

                closeReader = false;
            }
            catch (Exception e)
            {
                if (Fx.IsFatal(e))
                {
                    throw;
                }

                if (!ExceptionHandler.HandleTransportExceptionHelper(e))
                {
                    throw;
                }

                // containment -- the reader is aborted, no need for additional containment
            }
            finally
            {
                if (closeReader)
                {
                    modeReader.Dispose();
                }
            }
        }

        // Closed callback given to every reader: stops tracking the reader unless the demuxer
        // has already been disposed.
        void OnConnectionClosed(InitialServerConnectionReader connectionReader)
        {
            lock (ThisLock)
            {
                if (isDisposed)
                {
                    return;
                }

                connectionReaders.Remove(connectionReader);
            }
        }

        // Creates, tracks and starts a singleton preamble reader for the connection.
        // If the demuxer is already disposed the new reader is disposed instead.
        void OnSingletonConnection(IConnection connection, Action connectionDequeuedCallback,
            long streamPosition, int offset, int size, TimeSpan timeout)
        {
            if (onSingletonPreambleKnown == null)
            {
                onSingletonPreambleKnown = OnSingletonPreambleKnown;
            }

            ServerSingletonPreambleConnectionReader singletonPreambleReader =
                new ServerSingletonPreambleConnectionReader(connection, connectionDequeuedCallback, streamPosition,
                offset, size, transportSettingsCallback, onConnectionClosed, onSingletonPreambleKnown);

            lock (ThisLock)
            {
                if (isDisposed)
                {
                    singletonPreambleReader.Dispose();
                    return;
                }

                connectionReaders.Add(singletonPreambleReader);
            }

            singletonPreambleReader.StartReading(viaDelegate, timeout);
        }

        // Invoked by the singleton preamble reader once the preamble is known. Stops tracking
        // the reader, asks singletonPreambleCallback for the listener, and begins completing the
        // preamble and dispatching the request.
        void OnSingletonPreambleKnown(ServerSingletonPreambleConnectionReader serverSingletonPreambleReader)
        {
            lock (ThisLock)
            {
                if (isDisposed)
                {
                    return;
                }

                connectionReaders.Remove(serverSingletonPreambleReader);
            }

            if (onSingletonPreambleComplete == null)
            {
                onSingletonPreambleComplete = Fx.ThunkCallback(new AsyncCallback(OnSingletonPreambleComplete));
            }

            ISingletonChannelListener singletonChannelListener = singletonPreambleCallback(serverSingletonPreambleReader);
            Fx.Assert(singletonChannelListener != null,
                "singletonPreambleCallback must return a listener or send a Fault/throw");

            // transfer ownership of the connection from the preamble reader to the message handler

            IAsyncResult result = BeginCompleteSingletonPreamble(serverSingletonPreambleReader, singletonChannelListener, onSingletonPreambleComplete, this);

            if (result.CompletedSynchronously)
            {
                EndCompleteSingletonPreamble(result);
            }
        }

        // Starts the async operation that completes the singleton preamble and dispatches the request.
        IAsyncResult BeginCompleteSingletonPreamble(
            ServerSingletonPreambleConnectionReader serverSingletonPreambleReader,
            ISingletonChannelListener singletonChannelListener,
            AsyncCallback callback, object state)
        {
            return new CompleteSingletonPreambleAndDispatchRequestAsyncResult(serverSingletonPreambleReader, singletonChannelListener, this, callback, state);
        }

        // Ends the operation started by BeginCompleteSingletonPreamble.
        void EndCompleteSingletonPreamble(IAsyncResult result)
        {
            CompleteSingletonPreambleAndDispatchRequestAsyncResult.End(result);
        }

        // Async completion callback for BeginCompleteSingletonPreamble. The synchronous case is
        // ended by the caller of Begin, so only asynchronous completions are handled here.
        static void OnSingletonPreambleComplete(IAsyncResult result)
        {
            if (result.CompletedSynchronously)
            {
                return;
            }

            ConnectionDemuxer thisPtr = (ConnectionDemuxer)result.AsyncState;

            try
            {
                thisPtr.EndCompleteSingletonPreamble(result);
            }
            catch (Exception ex)
            {
                if (Fx.IsFatal(ex))
                {
                    throw;
                }

                //should never actually hit this code - the async result will handle all exceptions, trace them, then abort the reader
                DiagnosticUtility.TraceHandledException(ex, TraceEventType.Warning);
            }
        }

        // Invoked by the session preamble reader once the preamble is known. Stops tracking the
        // reader, emits the ETW event and hands the reader to serverSessionPreambleCallback.
        void OnSessionPreambleKnown(ServerSessionPreambleConnectionReader serverSessionPreambleReader)
        {
            lock (ThisLock)
            {
                if (isDisposed)
                {
                    return;
                }

                connectionReaders.Remove(serverSessionPreambleReader);
            }

            TraceOnSessionPreambleKnown(serverSessionPreambleReader);

            serverSessionPreambleCallback(serverSessionPreambleReader, this);
        }

        // Emits the SessionPreambleUnderstood event with the reader's Via (or an empty string
        // when Via is null), if that event is enabled.
        static void TraceOnSessionPreambleKnown(ServerSessionPreambleConnectionReader serverSessionPreambleReader)
        {
            if (TD.SessionPreambleUnderstoodIsEnabled())
            {
                TD.SessionPreambleUnderstood((serverSessionPreambleReader.Via != null) ? serverSessionPreambleReader.Via.ToString() : String.Empty);
            }
        }

        // Creates, tracks and starts a session preamble reader for the connection.
        // If the demuxer is already disposed the new reader is disposed instead.
        void OnDuplexConnection(IConnection connection, Action connectionDequeuedCallback,
            long streamPosition, int offset, int size, TimeSpan timeout)
        {
            if (onSessionPreambleKnown == null)
            {
                onSessionPreambleKnown = OnSessionPreambleKnown;
            }

            ServerSessionPreambleConnectionReader sessionPreambleReader = new ServerSessionPreambleConnectionReader(
                connection, connectionDequeuedCallback, streamPosition, offset, size,
                transportSettingsCallback, onConnectionClosed, onSessionPreambleKnown);

            lock (ThisLock)
            {
                if (isDisposed)
                {
                    sessionPreambleReader.Dispose();
                    return;
                }

                connectionReaders.Add(sessionPreambleReader);
            }

            sessionPreambleReader.StartReading(viaDelegate, timeout);
        }

        // Starts accepting connections with no via delegate.
        public void StartDemuxing()
        {
            StartDemuxing(null);
        }

        // Stores the via delegate (later passed to the preamble readers' StartReading) and
        // starts the acceptor.
        public void StartDemuxing(Action viaDelegate)
        {
            this.viaDelegate = viaDelegate;
            acceptor.StartAccepting();
        }

        // Async operation that completes the singleton preamble, receives the request
        // synchronously from the resulting connection, and passes it to the listener.
        class CompleteSingletonPreambleAndDispatchRequestAsyncResult : AsyncResult
        {
            ServerSingletonPreambleConnectionReader serverSingletonPreambleReader;
            ISingletonChannelListener singletonChannelListener;
            ConnectionDemuxer demuxer;

            // Created from the listener's ReceiveTimeout; budgets both the preamble completion
            // and the request receive.
            TimeoutHelper timeoutHelper;

            static AsyncCallback onPreambleComplete = Fx.ThunkCallback(new AsyncCallback(OnPreambleComplete));

            public CompleteSingletonPreambleAndDispatchRequestAsyncResult(
                ServerSingletonPreambleConnectionReader serverSingletonPreambleReader,
                ISingletonChannelListener singletonChannelListener,
                ConnectionDemuxer demuxer, AsyncCallback callback, object state)
                : base(callback, state)
            {
                this.serverSingletonPreambleReader = serverSingletonPreambleReader;
                this.singletonChannelListener = singletonChannelListener;
                this.demuxer = demuxer;

                //if this throws, the calling code paths will abort the connection, so we only need to 
                //call AbortConnection if BeginCompletePreamble completes asynchronously.
                if (BeginCompletePreamble())
                {
                    Complete(true);
                }
            }

            // Ends the operation; the type argument selects this AsyncResult subclass.
            public static void End(IAsyncResult result)
            {
                AsyncResult.End<CompleteSingletonPreambleAndDispatchRequestAsyncResult>(result);
            }

            // Starts completing the preamble. Returns true when the whole operation finished
            // synchronously, false when it will finish in OnPreambleComplete.
            bool BeginCompletePreamble()
            {
                this.timeoutHelper = new TimeoutHelper(this.singletonChannelListener.ReceiveTimeout);
                IAsyncResult result = this.serverSingletonPreambleReader.BeginCompletePreamble(this.timeoutHelper.RemainingTime(), onPreambleComplete, this);

                if (result.CompletedSynchronously)
                {
                    return HandlePreambleComplete(result);
                }

                return false;
            }

            // Async completion callback for the reader's BeginCompletePreamble.
            static void OnPreambleComplete(IAsyncResult result)
            {
                if (result.CompletedSynchronously)
                {
                    return;
                }

                CompleteSingletonPreambleAndDispatchRequestAsyncResult thisPtr = (CompleteSingletonPreambleAndDispatchRequestAsyncResult)result.AsyncState;
                bool completeSelf = false;
                try
                {
                    completeSelf = thisPtr.HandlePreambleComplete(result);
                }
                catch (Exception ex)
                {
                    if (Fx.IsFatal(ex))
                    {
                        throw;
                    }

                    //Don't complete this AsyncResult with this non-fatal exception.  The calling code can't really do anything with it,
                    //so just trace it (inside of AbortConnection), then swallow it.
                    completeSelf = true;
                    thisPtr.AbortConnection(ex);
                }

                if (completeSelf)
                {
                    thisPtr.Complete(false);
                }
            }

            // Ends the preamble, wraps the upgraded connection in a ServerSingletonConnectionReader,
            // receives the request and gives it to the listener. Always returns true.
            bool HandlePreambleComplete(IAsyncResult result)
            {
                IConnection upgradedConnection = this.serverSingletonPreambleReader.EndCompletePreamble(result);
                ServerSingletonConnectionReader singletonReader = new ServerSingletonConnectionReader(serverSingletonPreambleReader, upgradedConnection, this.demuxer);

                //singletonReader doesn't have async version of ReceiveRequest, so just call the sync method for now.
                RequestContext requestContext = singletonReader.ReceiveRequest(this.timeoutHelper.RemainingTime());
                singletonChannelListener.ReceiveRequest(requestContext, serverSingletonPreambleReader.ConnectionDequeuedCallback, true);

                return true;
            }

            void AbortConnection(Exception exception)
            {
                //this will trace the exception and abort the connection
                this.serverSingletonPreambleReader.Abort(exception);
            }
        }

        // State object passed from ReuseConnection to ReuseConnectionCallback.
        class ReuseConnectionState
        {
            ConnectionModeReader modeReader;
            TimeSpan closeTimeout;

            public ReuseConnectionState(ConnectionModeReader modeReader, TimeSpan closeTimeout)
            {
                this.modeReader = modeReader;
                this.closeTimeout = closeTimeout;
            }

            public ConnectionModeReader ModeReader
            {
                get { return this.modeReader; }
            }

            public TimeSpan CloseTimeout
            {
                get { return this.closeTimeout; }
            }
        }
    }
}