//------------------------------------------------------------ // Copyright (c) Microsoft Corporation. All rights reserved. //------------------------------------------------------------ namespace System.ServiceModel.Channels { using System.Runtime; using System.Security.Authentication.ExtendedProtection; using System.ServiceModel; using System.ServiceModel.Security; using System.Threading; class StreamedFramingRequestChannel : RequestChannel { IConnectionInitiator connectionInitiator; ConnectionPool connectionPool; MessageEncoder messageEncoder; IConnectionOrientedTransportFactorySettings settings; byte[] startBytes; StreamUpgradeProvider upgrade; ChannelBinding channelBindingToken; public StreamedFramingRequestChannel(ChannelManagerBase factory, IConnectionOrientedTransportChannelFactorySettings settings, EndpointAddress remoteAddresss, Uri via, IConnectionInitiator connectionInitiator, ConnectionPool connectionPool) : base(factory, remoteAddresss, via, settings.ManualAddressing) { this.settings = settings; this.connectionInitiator = connectionInitiator; this.connectionPool = connectionPool; this.messageEncoder = settings.MessageEncoderFactory.Encoder; this.upgrade = settings.Upgrade; } byte[] Preamble { get { return this.startBytes; } } protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state) { return new CompletedAsyncResult(callback, state); } protected override void OnEndOpen(IAsyncResult result) { CompletedAsyncResult.End(result); } protected override void OnOpen(TimeSpan timeout) { } protected override void OnOpened() { // setup our preamble which we'll use for all connections we establish EncodedVia encodedVia = new EncodedVia(this.Via.AbsoluteUri); EncodedContentType encodedContentType = EncodedContentType.Create(settings.MessageEncoderFactory.Encoder.ContentType); int startSize = ClientSingletonEncoder.ModeBytes.Length + ClientSingletonEncoder.CalcStartSize(encodedVia, encodedContentType); int preambleEndOffset = 0; if (this.upgrade == null) { preambleEndOffset = startSize; startSize += ClientDuplexEncoder.PreambleEndBytes.Length; } this.startBytes = DiagnosticUtility.Utility.AllocateByteArray(startSize); Buffer.BlockCopy(ClientSingletonEncoder.ModeBytes, 0, startBytes, 0, ClientSingletonEncoder.ModeBytes.Length); ClientSingletonEncoder.EncodeStart(this.startBytes, ClientSingletonEncoder.ModeBytes.Length, encodedVia, encodedContentType); if (preambleEndOffset > 0) { Buffer.BlockCopy(ClientSingletonEncoder.PreambleEndBytes, 0, startBytes, preambleEndOffset, ClientSingletonEncoder.PreambleEndBytes.Length); } // and then transition to the Opened state base.OnOpened(); } protected override IAsyncRequest CreateAsyncRequest(Message message, AsyncCallback callback, object state) { return new StreamedFramingAsyncRequest(this, callback, state); } protected override IRequest CreateRequest(Message message) { return new StreamedFramingRequest(this); } IConnection SendPreamble(IConnection connection, ref TimeoutHelper timeoutHelper, ClientFramingDecoder decoder, out SecurityMessageProperty remoteSecurity) { connection.Write(Preamble, 0, Preamble.Length, true, timeoutHelper.RemainingTime()); if (upgrade != null) { IStreamUpgradeChannelBindingProvider channelBindingProvider = upgrade.GetProperty(); StreamUpgradeInitiator upgradeInitiator = upgrade.CreateUpgradeInitiator(this.RemoteAddress, this.Via); if (!ConnectionUpgradeHelper.InitiateUpgrade(upgradeInitiator, ref connection, decoder, this, ref timeoutHelper)) { ConnectionUpgradeHelper.DecodeFramingFault(decoder, connection, Via, messageEncoder.ContentType, ref timeoutHelper); } if (channelBindingProvider != null && channelBindingProvider.IsChannelBindingSupportEnabled) { this.channelBindingToken = channelBindingProvider.GetChannelBinding(upgradeInitiator, ChannelBindingKind.Endpoint); } remoteSecurity = StreamSecurityUpgradeInitiator.GetRemoteSecurity(upgradeInitiator); connection.Write(ClientSingletonEncoder.PreambleEndBytes, 0, ClientSingletonEncoder.PreambleEndBytes.Length, true, timeoutHelper.RemainingTime()); } else { remoteSecurity = null; } // read ACK byte[] ackBuffer = new byte[1]; int ackBytesRead = connection.Read(ackBuffer, 0, ackBuffer.Length, timeoutHelper.RemainingTime()); if (!ConnectionUpgradeHelper.ValidatePreambleResponse(ackBuffer, ackBytesRead, decoder, this.Via)) { ConnectionUpgradeHelper.DecodeFramingFault(decoder, connection, Via, messageEncoder.ContentType, ref timeoutHelper); } return connection; } protected override void OnClose(TimeSpan timeout) { base.WaitForPendingRequests(timeout); } protected override void OnClosed() { base.OnClosed(); // clean up the CBT after transitioning to the closed state ChannelBindingUtility.Dispose(ref this.channelBindingToken); } protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state) { return base.BeginWaitForPendingRequests(timeout, callback, state); } protected override void OnEndClose(IAsyncResult result) { base.EndWaitForPendingRequests(result); } internal class StreamedConnectionPoolHelper : ConnectionPoolHelper { StreamedFramingRequestChannel channel; ClientSingletonDecoder decoder; SecurityMessageProperty remoteSecurity; public StreamedConnectionPoolHelper(StreamedFramingRequestChannel channel) : base(channel.connectionPool, channel.connectionInitiator, channel.Via) { this.channel = channel; } public ClientSingletonDecoder Decoder { get { return this.decoder; } } public SecurityMessageProperty RemoteSecurity { get { return this.remoteSecurity; } } protected override TimeoutException CreateNewConnectionTimeoutException(TimeSpan timeout, TimeoutException innerException) { return new TimeoutException(SR.GetString(SR.RequestTimedOutEstablishingTransportSession, timeout, channel.Via.AbsoluteUri), innerException); } protected override IConnection AcceptPooledConnection(IConnection connection, ref TimeoutHelper timeoutHelper) { this.decoder = new ClientSingletonDecoder(0); return channel.SendPreamble(connection, ref timeoutHelper, this.decoder, out this.remoteSecurity); } protected override IAsyncResult BeginAcceptPooledConnection(IConnection connection, ref TimeoutHelper timeoutHelper, AsyncCallback callback, object state) { this.decoder = new ClientSingletonDecoder(0); return new SendPreambleAsyncResult(channel, connection, ref timeoutHelper, decoder, callback, state); } protected override IConnection EndAcceptPooledConnection(IAsyncResult result) { return SendPreambleAsyncResult.End(result, out this.remoteSecurity); } class SendPreambleAsyncResult : AsyncResult { StreamedFramingRequestChannel channel; IConnection connection; ClientFramingDecoder decoder; StreamUpgradeInitiator upgradeInitiator; SecurityMessageProperty remoteSecurity; TimeoutHelper timeoutHelper; static WaitCallback onWritePreamble = Fx.ThunkCallback(new WaitCallback(OnWritePreamble)); static WaitCallback onWritePreambleEnd; static WaitCallback onReadPreambleAck = new WaitCallback(OnReadPreambleAck); static AsyncCallback onUpgrade; static AsyncCallback onFailedUpgrade; IStreamUpgradeChannelBindingProvider channelBindingProvider; public SendPreambleAsyncResult(StreamedFramingRequestChannel channel, IConnection connection, ref TimeoutHelper timeoutHelper, ClientFramingDecoder decoder, AsyncCallback callback, object state) : base(callback, state) { this.channel = channel; this.connection = connection; this.timeoutHelper = timeoutHelper; this.decoder = decoder; AsyncCompletionResult writePreambleResult = connection.BeginWrite(channel.Preamble, 0, channel.Preamble.Length, true, timeoutHelper.RemainingTime(), onWritePreamble, this); if (writePreambleResult == AsyncCompletionResult.Queued) { return; } if (HandleWritePreamble()) { base.Complete(true); } } public static IConnection End(IAsyncResult result, out SecurityMessageProperty remoteSecurity) { SendPreambleAsyncResult thisPtr = AsyncResult.End(result); remoteSecurity = thisPtr.remoteSecurity; return thisPtr.connection; } bool HandleWritePreamble() { connection.EndWrite(); if (channel.upgrade == null) { return ReadPreambleAck(); } else { this.channelBindingProvider = channel.upgrade.GetProperty(); this.upgradeInitiator = channel.upgrade.CreateUpgradeInitiator(channel.RemoteAddress, channel.Via); if (onUpgrade == null) { onUpgrade = Fx.ThunkCallback(new AsyncCallback(OnUpgrade)); } IAsyncResult initiateUpgradeResult = ConnectionUpgradeHelper.BeginInitiateUpgrade(channel.settings, channel.RemoteAddress, connection, decoder, this.upgradeInitiator, channel.messageEncoder.ContentType, null, this.timeoutHelper, onUpgrade, this); if (!initiateUpgradeResult.CompletedSynchronously) { return false; } return HandleUpgrade(initiateUpgradeResult); } } bool HandleUpgrade(IAsyncResult result) { connection = ConnectionUpgradeHelper.EndInitiateUpgrade(result); if (this.channelBindingProvider != null && this.channelBindingProvider.IsChannelBindingSupportEnabled) { this.channel.channelBindingToken = this.channelBindingProvider.GetChannelBinding(this.upgradeInitiator, ChannelBindingKind.Endpoint); } this.remoteSecurity = StreamSecurityUpgradeInitiator.GetRemoteSecurity(this.upgradeInitiator); this.upgradeInitiator = null; // we're done with the initiator if (onWritePreambleEnd == null) { onWritePreambleEnd = Fx.ThunkCallback(new WaitCallback(OnWritePreambleEnd)); } AsyncCompletionResult writePreambleResult = connection.BeginWrite( ClientSingletonEncoder.PreambleEndBytes, 0, ClientSingletonEncoder.PreambleEndBytes.Length, true, timeoutHelper.RemainingTime(), onWritePreambleEnd, this); if (writePreambleResult == AsyncCompletionResult.Queued) { return false; } connection.EndWrite(); return ReadPreambleAck(); } bool ReadPreambleAck() { AsyncCompletionResult readAckResult = connection.BeginRead(0, 1, timeoutHelper.RemainingTime(), onReadPreambleAck, this); if (readAckResult == AsyncCompletionResult.Queued) { return false; } return HandlePreambleAck(); } bool HandlePreambleAck() { int ackBytesRead = connection.EndRead(); if (!ConnectionUpgradeHelper.ValidatePreambleResponse( connection.AsyncReadBuffer, ackBytesRead, decoder, channel.Via)) { if (onFailedUpgrade == null) { onFailedUpgrade = Fx.ThunkCallback(new AsyncCallback(OnFailedUpgrade)); } IAsyncResult decodeFaultResult = ConnectionUpgradeHelper.BeginDecodeFramingFault(decoder, connection, channel.Via, channel.messageEncoder.ContentType, ref timeoutHelper, onFailedUpgrade, this); if (!decodeFaultResult.CompletedSynchronously) { return false; } ConnectionUpgradeHelper.EndDecodeFramingFault(decodeFaultResult); return true; } return true; } static void OnWritePreamble(object asyncState) { SendPreambleAsyncResult thisPtr = (SendPreambleAsyncResult)asyncState; Exception completionException = null; bool completeSelf; try { completeSelf = thisPtr.HandleWritePreamble(); } #pragma warning suppress 56500 // [....], transferring exception to another thread catch (Exception e) { if (Fx.IsFatal(e)) { throw; } completeSelf = true; completionException = e; } if (completeSelf) { thisPtr.Complete(false, completionException); } } static void OnWritePreambleEnd(object asyncState) { SendPreambleAsyncResult thisPtr = (SendPreambleAsyncResult)asyncState; Exception completionException = null; bool completeSelf; try { thisPtr.connection.EndWrite(); completeSelf = thisPtr.ReadPreambleAck(); } #pragma warning suppress 56500 // [....], transferring exception to another thread catch (Exception e) { if (Fx.IsFatal(e)) { throw; } completeSelf = true; completionException = e; } if (completeSelf) { thisPtr.Complete(false, completionException); } } static void OnReadPreambleAck(object state) { SendPreambleAsyncResult thisPtr = (SendPreambleAsyncResult)state; Exception completionException = null; bool completeSelf; try { completeSelf = thisPtr.HandlePreambleAck(); } #pragma warning suppress 56500 // [....], transferring exception to another thread catch (Exception e) { if (Fx.IsFatal(e)) { throw; } completeSelf = true; completionException = e; } if (completeSelf) { thisPtr.Complete(false, completionException); } } static void OnUpgrade(IAsyncResult result) { if (result.CompletedSynchronously) { return; } SendPreambleAsyncResult thisPtr = (SendPreambleAsyncResult)result.AsyncState; Exception completionException = null; bool completeSelf; try { completeSelf = thisPtr.HandleUpgrade(result); } #pragma warning suppress 56500 // [....], transferring exception to another thread catch (Exception e) { if (Fx.IsFatal(e)) { throw; } completeSelf = true; completionException = e; } if (completeSelf) { thisPtr.Complete(false, completionException); } } static void OnFailedUpgrade(IAsyncResult result) { if (result.CompletedSynchronously) { return; } SendPreambleAsyncResult thisPtr = (SendPreambleAsyncResult)result.AsyncState; Exception completionException = null; try { ConnectionUpgradeHelper.EndDecodeFramingFault(result); } #pragma warning suppress 56500 // [....], transferring exception to another thread catch (Exception e) { if (Fx.IsFatal(e)) { throw; } completionException = e; } thisPtr.Complete(false, completionException); } } } class ClientSingletonConnectionReader : SingletonConnectionReader { StreamedConnectionPoolHelper connectionPoolHelper; public ClientSingletonConnectionReader(IConnection connection, StreamedConnectionPoolHelper connectionPoolHelper, IConnectionOrientedTransportFactorySettings settings) : base(connection, 0, 0, connectionPoolHelper.RemoteSecurity, settings, null) { this.connectionPoolHelper = connectionPoolHelper; } protected override long StreamPosition { get { return connectionPoolHelper.Decoder.StreamPosition; } } protected override bool DecodeBytes(byte[] buffer, ref int offset, ref int size, ref bool isAtEof) { while (size > 0) { int bytesRead = connectionPoolHelper.Decoder.Decode(buffer, offset, size); if (bytesRead > 0) { offset += bytesRead; size -= bytesRead; } switch (connectionPoolHelper.Decoder.CurrentState) { case ClientFramingDecoderState.EnvelopeStart: // we're at the envelope return true; case ClientFramingDecoderState.End: isAtEof = true; return false; } } return false; } protected override void OnClose(TimeSpan timeout) { connectionPoolHelper.Close(timeout); } } class StreamedFramingRequest : IRequest { StreamedFramingRequestChannel channel; StreamedConnectionPoolHelper connectionPoolHelper; IConnection connection; public StreamedFramingRequest(StreamedFramingRequestChannel channel) { this.channel = channel; this.connectionPoolHelper = new StreamedConnectionPoolHelper(channel); } public void SendRequest(Message message, TimeSpan timeout) { TimeoutHelper timeoutHelper = new TimeoutHelper(timeout); try { this.connection = connectionPoolHelper.EstablishConnection(timeoutHelper.RemainingTime()); ChannelBindingUtility.TryAddToMessage(this.channel.channelBindingToken, message, false); bool success = false; try { StreamingConnectionHelper.WriteMessage(message, this.connection, true, channel.settings, ref timeoutHelper); success = true; } finally { if (!success) { connectionPoolHelper.Abort(); } } } catch (TimeoutException exception) { throw DiagnosticUtility.ExceptionUtility.ThrowHelperError( new TimeoutException(SR.GetString(SR.TimeoutOnRequest, timeout), exception)); } } public Message WaitForReply(TimeSpan timeout) { ClientSingletonConnectionReader connectionReader = new ClientSingletonConnectionReader( connection, connectionPoolHelper, channel.settings); connectionReader.DoneSending(TimeSpan.Zero); // we still need to receive Message message = connectionReader.Receive(timeout); if (message != null) { ChannelBindingUtility.TryAddToMessage(this.channel.channelBindingToken, message, false); } return message; } void Cleanup() { this.connectionPoolHelper.Abort(); } public void Abort(RequestChannel requestChannel) { Cleanup(); } public void Fault(RequestChannel requestChannel) { Cleanup(); } public void OnReleaseRequest() { } } class StreamedFramingAsyncRequest : AsyncResult, IAsyncRequest { StreamedFramingRequestChannel channel; IConnection connection; StreamedConnectionPoolHelper connectionPoolHelper; Message message; Message replyMessage; TimeoutHelper timeoutHelper; static AsyncCallback onEstablishConnection = Fx.ThunkCallback(new AsyncCallback(OnEstablishConnection)); static AsyncCallback onWriteMessage = Fx.ThunkCallback(new AsyncCallback(OnWriteMessage)); static AsyncCallback onReceiveReply = Fx.ThunkCallback(new AsyncCallback(OnReceiveReply)); ClientSingletonConnectionReader connectionReader; public StreamedFramingAsyncRequest(StreamedFramingRequestChannel channel, AsyncCallback callback, object state) : base(callback, state) { this.channel = channel; this.connectionPoolHelper = new StreamedConnectionPoolHelper(channel); } public void BeginSendRequest(Message message, TimeSpan timeout) { this.timeoutHelper = new TimeoutHelper(timeout); this.message = message; bool completeSelf = false; bool success = false; try { try { IAsyncResult result = connectionPoolHelper.BeginEstablishConnection(timeoutHelper.RemainingTime(), onEstablishConnection, this); if (result.CompletedSynchronously) { completeSelf = HandleEstablishConnection(result); } } catch (TimeoutException exception) { throw DiagnosticUtility.ExceptionUtility.ThrowHelperError( new TimeoutException(SR.GetString(SR.TimeoutOnRequest, timeout), exception)); } success = true; } finally { if (!success) { Cleanup(); } } if (completeSelf) { base.Complete(true); } } bool HandleEstablishConnection(IAsyncResult result) { this.connection = connectionPoolHelper.EndEstablishConnection(result); ChannelBindingUtility.TryAddToMessage(this.channel.channelBindingToken, this.message, false); IAsyncResult writeResult = StreamingConnectionHelper.BeginWriteMessage(this.message, this.connection, true, this.channel.settings, ref timeoutHelper, onWriteMessage, this); if (!writeResult.CompletedSynchronously) { return false; } return HandleWriteMessage(writeResult); } public Message End() { try { AsyncResult.End(this); } catch (TimeoutException exception) { throw DiagnosticUtility.ExceptionUtility.ThrowHelperError( new TimeoutException(SR.GetString(SR.TimeoutOnRequest, this.timeoutHelper.OriginalTimeout), exception)); } return replyMessage; } public void Abort(RequestChannel requestChannel) { Cleanup(); } public void Fault(RequestChannel requestChannel) { Cleanup(); } void Cleanup() { connectionPoolHelper.Abort(); } bool HandleWriteMessage(IAsyncResult result) { // write out the streamed message StreamingConnectionHelper.EndWriteMessage(result); connectionReader = new ClientSingletonConnectionReader(connection, connectionPoolHelper, channel.settings); connectionReader.DoneSending(TimeSpan.Zero); // we still need to receive IAsyncResult receiveResult = connectionReader.BeginReceive(timeoutHelper.RemainingTime(), onReceiveReply, this); if (!receiveResult.CompletedSynchronously) { return false; } return CompleteReceiveReply(receiveResult); } bool CompleteReceiveReply(IAsyncResult result) { this.replyMessage = connectionReader.EndReceive(result); if (this.replyMessage != null) { ChannelBindingUtility.TryAddToMessage(this.channel.channelBindingToken, this.replyMessage, false); } return true; } static void OnEstablishConnection(IAsyncResult result) { if (result.CompletedSynchronously) { return; } StreamedFramingAsyncRequest thisPtr = (StreamedFramingAsyncRequest)result.AsyncState; Exception completionException = null; bool completeSelf; bool throwing = true; try { completeSelf = thisPtr.HandleEstablishConnection(result); throwing = false; } #pragma warning suppress 56500 // [....], transferring exception to another thread catch (Exception e) { if (Fx.IsFatal(e)) { throw; } completeSelf = true; completionException = e; } finally { if (throwing) { thisPtr.Cleanup(); } } if (completeSelf) { thisPtr.Complete(false, completionException); } } static void OnWriteMessage(IAsyncResult result) { if (result.CompletedSynchronously) { return; } StreamedFramingAsyncRequest thisPtr = (StreamedFramingAsyncRequest)result.AsyncState; Exception completionException = null; bool completeSelf; bool throwing = true; try { completeSelf = thisPtr.HandleWriteMessage(result); throwing = false; } #pragma warning suppress 56500 // [....], transferring exception to another thread catch (Exception e) { if (Fx.IsFatal(e)) { throw; } completeSelf = true; completionException = e; } finally { if (throwing) { thisPtr.Cleanup(); } } if (completeSelf) { thisPtr.Complete(false, completionException); } } static void OnReceiveReply(IAsyncResult result) { StreamedFramingAsyncRequest thisPtr = (StreamedFramingAsyncRequest)result.AsyncState; Exception completionException = null; bool completeSelf; bool throwing = true; try { completeSelf = thisPtr.CompleteReceiveReply(result); throwing = false; } #pragma warning suppress 56500 // [....], transferring exception to another thread catch (Exception e) { if (Fx.IsFatal(e)) { throw; } completeSelf = true; completionException = e; } finally { if (throwing) { thisPtr.Cleanup(); } } if (completeSelf) { thisPtr.Complete(false, completionException); } } public void OnReleaseRequest() { } } } }