1 //------------------------------------------------------------
2 // Copyright (c) Microsoft Corporation. All rights reserved.
3 //------------------------------------------------------------
5 namespace System.ServiceModel.Channels
8 using System.Security.Authentication.ExtendedProtection;
9 using System.ServiceModel;
10 using System.ServiceModel.Security;
11 using System.Threading;
// Request channel whose requests each obtain a connection through a
// StreamedConnectionPoolHelper (built from this channel's connection pool and
// initiator), send a preamble on it, and then write the request message.
// NOTE(review): this listing appears to be missing lines (braces and some
// declarations, e.g. the `startBytes` field that the getter and OnOpened use) —
// confirm against the full file.
13 class StreamedFramingRequestChannel : RequestChannel
// Collaborators and settings captured by the constructor.
15 IConnectionInitiator connectionInitiator;
16 ConnectionPool connectionPool;
17 MessageEncoder messageEncoder;
18 IConnectionOrientedTransportFactorySettings settings;
// Stream upgrade provider taken from settings.Upgrade; the code checks it against null.
20 StreamUpgradeProvider upgrade;
// Channel binding token captured during the preamble upgrade and disposed in OnClosed.
21 ChannelBinding channelBindingToken;
// Creates the channel. The base RequestChannel receives the factory, the remote
// address, the via and settings.ManualAddressing; the remaining arguments are stored
// on the instance. The message encoder and the stream upgrade provider are read once
// from `settings` and cached in fields.
23 public StreamedFramingRequestChannel(ChannelManagerBase factory, IConnectionOrientedTransportChannelFactorySettings settings,
24 EndpointAddress remoteAddresss, Uri via, IConnectionInitiator connectionInitiator, ConnectionPool connectionPool)
25 : base(factory, remoteAddresss, via, settings.ManualAddressing)
27 this.settings = settings;
28 this.connectionInitiator = connectionInitiator;
29 this.connectionPool = connectionPool;
31 this.messageEncoder = settings.MessageEncoderFactory.Encoder;
32 this.upgrade = settings.Upgrade;
// Returns the startBytes buffer that OnOpened allocates and fills.
// NOTE(review): the enclosing property declaration is not visible in this listing;
// presumably this is the `Preamble` property read by SendPreamble — confirm.
37 get { return this.startBytes; }
// Asynchronous open: returns an already-completed result built from the callback and
// state. The timeout argument is not used by the visible code.
40 protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
42 return new CompletedAsyncResult(callback, state);
// Ends the asynchronous open by ending the CompletedAsyncResult that OnBeginOpen returns.
45 protected override void OnEndOpen(IAsyncResult result)
47 CompletedAsyncResult.End(result);
// Synchronous open.
// NOTE(review): no body statements are visible in this listing; presumably this is a
// no-op, matching OnBeginOpen which completes immediately — confirm.
50 protected override void OnOpen(TimeSpan timeout)
// Builds the startBytes buffer once, when the channel has been opened.
// Layout written by the code below: ClientSingletonEncoder.ModeBytes at offset 0, then
// the start record for the via and content type encoded at offset ModeBytes.Length,
// and, only when preambleEndOffset was set, the preamble-end bytes at that offset.
54 protected override void OnOpened()
56 // setup our preamble which we'll use for all connections we establish
57 EncodedVia encodedVia = new EncodedVia(this.Via.AbsoluteUri);
58 EncodedContentType encodedContentType = EncodedContentType.Create(settings.MessageEncoderFactory.Encoder.ContentType);
// Mode bytes plus the size the encoder reports for the start record.
59 int startSize = ClientSingletonEncoder.ModeBytes.Length + ClientSingletonEncoder.CalcStartSize(encodedVia, encodedContentType);
// Stays 0 unless the preamble end is embedded in the buffer.
60 int preambleEndOffset = 0;
// NOTE(review): braces are not visible in this listing; presumably the next two
// statements both belong to the `upgrade == null` branch, so the preamble end is only
// embedded here when no stream upgrade has to run first — confirm.
61 if (this.upgrade == null)
63 preambleEndOffset = startSize;
// NOTE(review): the length is taken from ClientDuplexEncoder.PreambleEndBytes while the
// copy below uses ClientSingletonEncoder.PreambleEndBytes; presumably both have the same
// length — confirm, or use the same encoder type in both places.
64 startSize += ClientDuplexEncoder.PreambleEndBytes.Length;
66 this.startBytes = DiagnosticUtility.Utility.AllocateByteArray(startSize);
67 Buffer.BlockCopy(ClientSingletonEncoder.ModeBytes, 0, startBytes, 0, ClientSingletonEncoder.ModeBytes.Length);
68 ClientSingletonEncoder.EncodeStart(this.startBytes, ClientSingletonEncoder.ModeBytes.Length, encodedVia, encodedContentType);
69 if (preambleEndOffset > 0)
71 Buffer.BlockCopy(ClientSingletonEncoder.PreambleEndBytes, 0, startBytes, preambleEndOffset, ClientSingletonEncoder.PreambleEndBytes.Length);
// NOTE(review): the statement this comment introduces is not visible in this listing.
74 // and then transition to the Opened state
// Creates the asynchronous request object for this channel. The message is not passed
// to the request here; StreamedFramingAsyncRequest receives it in BeginSendRequest.
78 protected override IAsyncRequest CreateAsyncRequest(Message message, AsyncCallback callback, object state)
80 return new StreamedFramingAsyncRequest(this, callback, state);
// Creates the synchronous request object for this channel. The message is not passed
// to the request here; StreamedFramingRequest receives it in SendRequest.
83 protected override IRequest CreateRequest(Message message)
85 return new StreamedFramingRequest(this);
// Synchronous preamble handshake on `connection`.
// Visible steps: write the channel preamble; on the upgrade path create an upgrade
// initiator, run the upgrade (the connection is passed by ref, so it may be replaced),
// capture the channel binding token and the remote security, then write the
// preamble-end bytes; finally read a one-byte ack and validate it.
// NOTE(review): the branch headers/braces (for example whatever selects the upgrade
// path versus `remoteSecurity = null`) and the return statement are not visible in
// this listing — confirm against the full file.
88 IConnection SendPreamble(IConnection connection, ref TimeoutHelper timeoutHelper,
89 ClientFramingDecoder decoder, out SecurityMessageProperty remoteSecurity)
91 connection.Write(Preamble, 0, Preamble.Length, true, timeoutHelper.RemainingTime());
95 IStreamUpgradeChannelBindingProvider channelBindingProvider = upgrade.GetProperty<IStreamUpgradeChannelBindingProvider>();
97 StreamUpgradeInitiator upgradeInitiator = upgrade.CreateUpgradeInitiator(this.RemoteAddress, this.Via);
// When InitiateUpgrade reports failure, the response is handed to DecodeFramingFault
// (presumably that surfaces the fault as an exception — confirm).
99 if (!ConnectionUpgradeHelper.InitiateUpgrade(upgradeInitiator, ref connection, decoder,
100 this, ref timeoutHelper))
102 ConnectionUpgradeHelper.DecodeFramingFault(decoder, connection, Via, messageEncoder.ContentType, ref timeoutHelper);
// A channel binding is only requested when a provider exists and reports support enabled.
105 if (channelBindingProvider != null && channelBindingProvider.IsChannelBindingSupportEnabled)
107 this.channelBindingToken = channelBindingProvider.GetChannelBinding(upgradeInitiator, ChannelBindingKind.Endpoint);
110 remoteSecurity = StreamSecurityUpgradeInitiator.GetRemoteSecurity(upgradeInitiator);
// The preamble end is written as its own write on this path.
112 connection.Write(ClientSingletonEncoder.PreambleEndBytes, 0,
113 ClientSingletonEncoder.PreambleEndBytes.Length, true, timeoutHelper.RemainingTime());
117 remoteSecurity = null;
// Read a single byte and let ValidatePreambleResponse decide whether it is acceptable;
// a rejected response goes through DecodeFramingFault.
121 byte[] ackBuffer = new byte[1];
122 int ackBytesRead = connection.Read(ackBuffer, 0, ackBuffer.Length, timeoutHelper.RemainingTime());
123 if (!ConnectionUpgradeHelper.ValidatePreambleResponse(ackBuffer, ackBytesRead, decoder, this.Via))
125 ConnectionUpgradeHelper.DecodeFramingFault(decoder, connection, Via, messageEncoder.ContentType, ref timeoutHelper);
// Synchronous close: waits for pending requests using the supplied timeout.
131 protected override void OnClose(TimeSpan timeout)
133 base.WaitForPendingRequests(timeout);
// Disposes the channel binding token held by this channel.
// NOTE(review): the existing comment refers to the transition to the closed state, but
// the statement performing that transition is not visible in this listing — confirm.
136 protected override void OnClosed()
140 // clean up the CBT after transitioning to the closed state
141 ChannelBindingUtility.Dispose(ref this.channelBindingToken);
// Asynchronous close: begins waiting for pending requests and returns that async result.
144 protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
146 return base.BeginWaitForPendingRequests(timeout, callback, state);
// Ends the wait for pending requests started by OnBeginClose.
149 protected override void OnEndClose(IAsyncResult result)
151 base.EndWaitForPendingRequests(result);
// ConnectionPoolHelper specialization for this channel. It is constructed from the
// channel's connection pool, connection initiator and via, and runs the channel's
// preamble handshake on each connection it accepts.
154 internal class StreamedConnectionPoolHelper : ConnectionPoolHelper
156 StreamedFramingRequestChannel channel;
// Decoder created for each accepted connection (see the accept methods below).
157 ClientSingletonDecoder decoder;
// Remote security produced by the preamble handshake.
158 SecurityMessageProperty remoteSecurity;
160 public StreamedConnectionPoolHelper(StreamedFramingRequestChannel channel)
161 : base(channel.connectionPool, channel.connectionInitiator, channel.Via)
163 this.channel = channel;
// Exposes the decoder; ClientSingletonConnectionReader reads it when decoding the reply.
166 public ClientSingletonDecoder Decoder
168 get { return this.decoder; }
// Exposes the remote security captured by the handshake.
171 public SecurityMessageProperty RemoteSecurity
173 get { return this.remoteSecurity; }
// Wraps the inner timeout in a new TimeoutException whose message is formatted from
// the timeout and the channel's via URI.
176 protected override TimeoutException CreateNewConnectionTimeoutException(TimeSpan timeout, TimeoutException innerException)
178 return new TimeoutException(SR.GetString(SR.RequestTimedOutEstablishingTransportSession,
179 timeout, channel.Via.AbsoluteUri), innerException);
// Synchronous accept: create a fresh decoder, then run the channel's SendPreamble.
182 protected override IConnection AcceptPooledConnection(IConnection connection, ref TimeoutHelper timeoutHelper)
184 this.decoder = new ClientSingletonDecoder(0);
185 return channel.SendPreamble(connection, ref timeoutHelper, this.decoder, out this.remoteSecurity);
// Asynchronous accept: create a fresh decoder, then start a SendPreambleAsyncResult.
188 protected override IAsyncResult BeginAcceptPooledConnection(IConnection connection, ref TimeoutHelper timeoutHelper, AsyncCallback callback, object state)
190 this.decoder = new ClientSingletonDecoder(0);
191 return new SendPreambleAsyncResult(channel, connection, ref timeoutHelper, decoder, callback, state);
// Ends the asynchronous accept and stores the remote security it produced.
194 protected override IConnection EndAcceptPooledConnection(IAsyncResult result)
196 return SendPreambleAsyncResult.End(result, out this.remoteSecurity);
// Asynchronous counterpart of StreamedFramingRequestChannel.SendPreamble: writes the
// preamble, optionally runs the stream upgrade, writes the preamble end and reads the ack.
199 class SendPreambleAsyncResult : AsyncResult
201 StreamedFramingRequestChannel channel;
// Reassigned in HandleUpgrade with the connection returned by EndInitiateUpgrade.
202 IConnection connection;
203 ClientFramingDecoder decoder;
204 StreamUpgradeInitiator upgradeInitiator;
205 SecurityMessageProperty remoteSecurity;
206 TimeoutHelper timeoutHelper;
// Static callbacks. Some are created eagerly here; the ones without an initializer are
// created on first use inside the instance methods.
207 static WaitCallback onWritePreamble = Fx.ThunkCallback(new WaitCallback(OnWritePreamble));
208 static WaitCallback onWritePreambleEnd;
// NOTE(review): unlike the other callbacks of this class, this one is not wrapped with
// Fx.ThunkCallback — confirm whether that is intentional.
209 static WaitCallback onReadPreambleAck = new WaitCallback(OnReadPreambleAck);
210 static AsyncCallback onUpgrade;
211 static AsyncCallback onFailedUpgrade;
212 IStreamUpgradeChannelBindingProvider channelBindingProvider;
// Stores the arguments and immediately begins writing the channel preamble.
// NOTE(review): the statements under the two `if`s are not visible in this listing.
// Presumably a Queued write returns and leaves the continuation to onWritePreamble,
// and a true result from HandleWritePreamble completes this result synchronously — confirm.
214 public SendPreambleAsyncResult(StreamedFramingRequestChannel channel, IConnection connection,
215 ref TimeoutHelper timeoutHelper, ClientFramingDecoder decoder, AsyncCallback callback, object state)
216 : base(callback, state)
218 this.channel = channel;
219 this.connection = connection;
220 this.timeoutHelper = timeoutHelper;
221 this.decoder = decoder;
223 AsyncCompletionResult writePreambleResult = connection.BeginWrite(channel.Preamble, 0, channel.Preamble.Length,
224 true, timeoutHelper.RemainingTime(), onWritePreamble, this);
226 if (writePreambleResult == AsyncCompletionResult.Queued)
231 if (HandleWritePreamble())
// Ends the async result via AsyncResult.End, then returns the connection held by the
// result and hands back the remote security through the out parameter.
237 public static IConnection End(IAsyncResult result, out SecurityMessageProperty remoteSecurity)
239 SendPreambleAsyncResult thisPtr = AsyncResult.End<SendPreambleAsyncResult>(result);
240 remoteSecurity = thisPtr.remoteSecurity;
241 return thisPtr.connection;
// Finishes the preamble write. Without an upgrade provider it goes straight to reading
// the ack; otherwise it creates the upgrade initiator and begins the upgrade.
// NOTE(review): the bool result is presumably "this async result can be completed now";
// the statement under the `!CompletedSynchronously` check is not visible — confirm.
244 bool HandleWritePreamble()
246 connection.EndWrite();
248 if (channel.upgrade == null)
250 return ReadPreambleAck();
254 this.channelBindingProvider = channel.upgrade.GetProperty<IStreamUpgradeChannelBindingProvider>();
255 this.upgradeInitiator = channel.upgrade.CreateUpgradeInitiator(channel.RemoteAddress, channel.Via);
// Lazily create the shared upgrade callback on first use.
256 if (onUpgrade == null)
258 onUpgrade = Fx.ThunkCallback(new AsyncCallback(OnUpgrade));
261 IAsyncResult initiateUpgradeResult = ConnectionUpgradeHelper.BeginInitiateUpgrade(channel.settings, channel.RemoteAddress,
262 connection, decoder, this.upgradeInitiator, channel.messageEncoder.ContentType, null,
263 this.timeoutHelper, onUpgrade, this);
265 if (!initiateUpgradeResult.CompletedSynchronously)
269 return HandleUpgrade(initiateUpgradeResult);
// Finishes the upgrade: takes the connection returned by EndInitiateUpgrade, captures
// the channel binding token (when supported) and the remote security, then begins
// writing the preamble-end bytes.
273 bool HandleUpgrade(IAsyncResult result)
275 connection = ConnectionUpgradeHelper.EndInitiateUpgrade(result);
// A channel binding is only requested when a provider exists and reports support enabled.
277 if (this.channelBindingProvider != null && this.channelBindingProvider.IsChannelBindingSupportEnabled)
279 this.channel.channelBindingToken = this.channelBindingProvider.GetChannelBinding(this.upgradeInitiator, ChannelBindingKind.Endpoint);
282 this.remoteSecurity = StreamSecurityUpgradeInitiator.GetRemoteSecurity(this.upgradeInitiator);
283 this.upgradeInitiator = null; // we're done with the initiator
// Lazily create the shared preamble-end callback on first use.
284 if (onWritePreambleEnd == null)
286 onWritePreambleEnd = Fx.ThunkCallback(new WaitCallback(OnWritePreambleEnd));
289 AsyncCompletionResult writePreambleResult = connection.BeginWrite(
290 ClientSingletonEncoder.PreambleEndBytes, 0, ClientSingletonEncoder.PreambleEndBytes.Length, true,
291 timeoutHelper.RemainingTime(), onWritePreambleEnd, this);
// NOTE(review): the statement under this check is not visible; presumably a Queued
// write returns false and lets OnWritePreambleEnd continue — confirm.
293 if (writePreambleResult == AsyncCompletionResult.Queued)
298 connection.EndWrite();
299 return ReadPreambleAck();
// Begins reading the ack with BeginRead(0, 1, ...) and, when the read did not queue,
// handles the ack inline.
// NOTE(review): the statement under the Queued check is not visible; presumably it
// returns false and lets OnReadPreambleAck continue — confirm.
302 bool ReadPreambleAck()
304 AsyncCompletionResult readAckResult = connection.BeginRead(0, 1,
305 timeoutHelper.RemainingTime(), onReadPreambleAck, this);
307 if (readAckResult == AsyncCompletionResult.Queued)
312 return HandlePreambleAck();
// Finishes the ack read and validates the bytes in the connection's AsyncReadBuffer.
// A rejected response starts asynchronous decoding of the framing fault.
// NOTE(review): the return statements and the statement under the
// `!CompletedSynchronously` check are not visible in this listing — confirm.
315 bool HandlePreambleAck()
317 int ackBytesRead = connection.EndRead();
318 if (!ConnectionUpgradeHelper.ValidatePreambleResponse(
319 connection.AsyncReadBuffer, ackBytesRead, decoder, channel.Via))
// Lazily create the shared failed-upgrade callback on first use.
321 if (onFailedUpgrade == null)
323 onFailedUpgrade = Fx.ThunkCallback(new AsyncCallback(OnFailedUpgrade));
325 IAsyncResult decodeFaultResult = ConnectionUpgradeHelper.BeginDecodeFramingFault(decoder,
326 connection, channel.Via, channel.messageEncoder.ContentType, ref timeoutHelper,
327 onFailedUpgrade, this);
329 if (!decodeFaultResult.CompletedSynchronously)
334 ConnectionUpgradeHelper.EndDecodeFramingFault(decodeFaultResult);
// Callback passed to the preamble BeginWrite. Continues the handshake through
// HandleWritePreamble and completes the async result (completedSynchronously = false)
// with any exception that was captured.
// NOTE(review): the try/catch scaffolding, the declaration of completeSelf and the
// condition guarding Complete are not visible in this listing — confirm.
341 static void OnWritePreamble(object asyncState)
343 SendPreambleAsyncResult thisPtr = (SendPreambleAsyncResult)asyncState;
345 Exception completionException = null;
349 completeSelf = thisPtr.HandleWritePreamble();
351 #pragma warning suppress 56500 // [....], transferring exception to another thread
360 completionException = e;
365 thisPtr.Complete(false, completionException);
// Callback passed to the preamble-end BeginWrite. Ends the write, moves on to reading
// the ack, and completes the async result (completedSynchronously = false) with any
// exception that was captured.
// NOTE(review): the try/catch scaffolding, the declaration of completeSelf and the
// condition guarding Complete are not visible in this listing — confirm.
369 static void OnWritePreambleEnd(object asyncState)
371 SendPreambleAsyncResult thisPtr = (SendPreambleAsyncResult)asyncState;
373 Exception completionException = null;
377 thisPtr.connection.EndWrite();
378 completeSelf = thisPtr.ReadPreambleAck();
380 #pragma warning suppress 56500 // [....], transferring exception to another thread
389 completionException = e;
394 thisPtr.Complete(false, completionException);
// Callback passed to the ack BeginRead. Validates the ack through HandlePreambleAck and
// completes the async result (completedSynchronously = false) with any exception that
// was captured.
// NOTE(review): the try/catch scaffolding, the declaration of completeSelf and the
// condition guarding Complete are not visible in this listing — confirm.
398 static void OnReadPreambleAck(object state)
400 SendPreambleAsyncResult thisPtr = (SendPreambleAsyncResult)state;
402 Exception completionException = null;
406 completeSelf = thisPtr.HandlePreambleAck();
408 #pragma warning suppress 56500 // [....], transferring exception to another thread
417 completionException = e;
422 thisPtr.Complete(false, completionException);
// Callback passed to BeginInitiateUpgrade. Continues through HandleUpgrade and completes
// the async result (completedSynchronously = false) with any exception that was captured.
// NOTE(review): the statement under the CompletedSynchronously check is not visible;
// presumably it returns, because HandleWritePreamble calls HandleUpgrade itself when the
// upgrade completed synchronously — confirm. The try/catch scaffolding is also not visible.
426 static void OnUpgrade(IAsyncResult result)
428 if (result.CompletedSynchronously)
433 SendPreambleAsyncResult thisPtr = (SendPreambleAsyncResult)result.AsyncState;
435 Exception completionException = null;
439 completeSelf = thisPtr.HandleUpgrade(result);
441 #pragma warning suppress 56500 // [....], transferring exception to another thread
450 completionException = e;
455 thisPtr.Complete(false, completionException);
// Callback passed to BeginDecodeFramingFault. Ends the fault decoding and completes the
// async result (completedSynchronously = false) with any exception that was captured.
// NOTE(review): the statement under the CompletedSynchronously check is not visible;
// presumably it returns, because HandlePreambleAck ends the decode itself in the
// synchronous case — confirm. The try/catch scaffolding is also not visible.
459 static void OnFailedUpgrade(IAsyncResult result)
461 if (result.CompletedSynchronously)
466 SendPreambleAsyncResult thisPtr = (SendPreambleAsyncResult)result.AsyncState;
468 Exception completionException = null;
471 ConnectionUpgradeHelper.EndDecodeFramingFault(result);
473 #pragma warning suppress 56500 // [....], transferring exception to another thread
481 completionException = e;
484 thisPtr.Complete(false, completionException);
// SingletonConnectionReader used to receive the reply. It decodes with the decoder owned
// by the StreamedConnectionPoolHelper and closes that helper when it is closed.
489 class ClientSingletonConnectionReader : SingletonConnectionReader
491 StreamedConnectionPoolHelper connectionPoolHelper;
// Passes the helper's RemoteSecurity and the settings to the base reader.
493 public ClientSingletonConnectionReader(IConnection connection, StreamedConnectionPoolHelper connectionPoolHelper,
494 IConnectionOrientedTransportFactorySettings settings)
495 : base(connection, 0, 0, connectionPoolHelper.RemoteSecurity, settings, null)
497 this.connectionPoolHelper = connectionPoolHelper;
// Stream position as reported by the helper's decoder.
500 protected override long StreamPosition
502 get { return connectionPoolHelper.Decoder.StreamPosition; }
// Feeds the buffer to the helper's decoder and then branches on the decoder state.
// NOTE(review): the loop/offset handling, the per-case statements and the return values
// are not visible in this listing — confirm against the full file.
505 protected override bool DecodeBytes(byte[] buffer, ref int offset, ref int size, ref bool isAtEof)
509 int bytesRead = connectionPoolHelper.Decoder.Decode(buffer, offset, size);
516 switch (connectionPoolHelper.Decoder.CurrentState)
518 case ClientFramingDecoderState.EnvelopeStart:
519 // we're at the envelope
522 case ClientFramingDecoderState.End:
// Closing the reader closes the connection pool helper with the same timeout.
531 protected override void OnClose(TimeSpan timeout)
533 connectionPoolHelper.Close(timeout);
// Synchronous request: establishes a connection through its own
// StreamedConnectionPoolHelper, writes the request message, then reads the reply.
537 class StreamedFramingRequest : IRequest
539 StreamedFramingRequestChannel channel;
540 StreamedConnectionPoolHelper connectionPoolHelper;
// Set by SendRequest and read by WaitForReply.
541 IConnection connection;
// Each request creates its own connection pool helper for the channel.
543 public StreamedFramingRequest(StreamedFramingRequestChannel channel)
545 this.channel = channel;
546 this.connectionPoolHelper = new StreamedConnectionPoolHelper(channel);
// Establishes the connection, adds the channel binding token to the message via
// ChannelBindingUtility.TryAddToMessage, and writes the message. A TimeoutException is
// rethrown as a new TimeoutException formatted with the original timeout.
// NOTE(review): the try/finally scaffolding is not visible; presumably the helper is
// aborted when `success` was never set to true — confirm.
549 public void SendRequest(Message message, TimeSpan timeout)
551 TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
555 this.connection = connectionPoolHelper.EstablishConnection(timeoutHelper.RemainingTime());
557 ChannelBindingUtility.TryAddToMessage(this.channel.channelBindingToken, message, false);
559 bool success = false;
562 StreamingConnectionHelper.WriteMessage(message, this.connection, true, channel.settings, ref timeoutHelper);
569 connectionPoolHelper.Abort();
573 catch (TimeoutException exception)
575 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
576 new TimeoutException(SR.GetString(SR.TimeoutOnRequest, timeout), exception));
// Creates a reader over the connection, signals DoneSending, receives the reply and
// adds the channel binding token to it.
// NOTE(review): the null check on the reply, the return statement and the condition
// guarding the Abort call are not visible in this listing — confirm.
580 public Message WaitForReply(TimeSpan timeout)
582 ClientSingletonConnectionReader connectionReader = new ClientSingletonConnectionReader(
583 connection, connectionPoolHelper, channel.settings);
585 connectionReader.DoneSending(TimeSpan.Zero); // we still need to receive
586 Message message = connectionReader.Receive(timeout);
590 ChannelBindingUtility.TryAddToMessage(this.channel.channelBindingToken, message, false);
598 this.connectionPoolHelper.Abort();
// NOTE(review): the bodies of the next three members are not visible in this listing.
601 public void Abort(RequestChannel requestChannel)
606 public void Fault(RequestChannel requestChannel)
611 public void OnReleaseRequest()
// Asynchronous request: establishes a connection, writes the request message and
// receives the reply as a chain of asynchronous steps driven by the static callbacks.
// NOTE(review): the `message` field assigned in BeginSendRequest is not declared in the
// visible lines — confirm against the full file.
616 class StreamedFramingAsyncRequest : AsyncResult, IAsyncRequest
618 StreamedFramingRequestChannel channel;
619 IConnection connection;
620 StreamedConnectionPoolHelper connectionPoolHelper;
622 Message replyMessage;
623 TimeoutHelper timeoutHelper;
// Shared callbacks, one per asynchronous step.
624 static AsyncCallback onEstablishConnection = Fx.ThunkCallback(new AsyncCallback(OnEstablishConnection));
625 static AsyncCallback onWriteMessage = Fx.ThunkCallback(new AsyncCallback(OnWriteMessage));
626 static AsyncCallback onReceiveReply = Fx.ThunkCallback(new AsyncCallback(OnReceiveReply));
// Created in HandleWriteMessage and used again in CompleteReceiveReply.
627 ClientSingletonConnectionReader connectionReader;
// Each request creates its own connection pool helper for the channel.
629 public StreamedFramingAsyncRequest(StreamedFramingRequestChannel channel, AsyncCallback callback, object state)
630 : base(callback, state)
632 this.channel = channel;
633 this.connectionPoolHelper = new StreamedConnectionPoolHelper(channel);
// Starts the request: records the timeout and message, then begins establishing the
// connection. When that completes synchronously the next step runs inline. A
// TimeoutException is rethrown as a new TimeoutException formatted with the timeout.
// NOTE(review): the try/finally scaffolding and the handling of `success` and
// `completeSelf` after the try are not visible in this listing — confirm.
636 public void BeginSendRequest(Message message, TimeSpan timeout)
638 this.timeoutHelper = new TimeoutHelper(timeout);
639 this.message = message;
641 bool completeSelf = false;
642 bool success = false;
647 IAsyncResult result = connectionPoolHelper.BeginEstablishConnection(timeoutHelper.RemainingTime(), onEstablishConnection, this);
648 if (result.CompletedSynchronously)
650 completeSelf = HandleEstablishConnection(result);
653 catch (TimeoutException exception)
655 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
656 new TimeoutException(SR.GetString(SR.TimeoutOnRequest, timeout), exception));
// Finishes connection establishment, adds the channel binding token to the request
// message and begins writing it. A synchronously completed write is handled inline.
// NOTE(review): the statement under the `!CompletedSynchronously` check is not visible;
// presumably it returns false and lets OnWriteMessage continue — confirm.
675 bool HandleEstablishConnection(IAsyncResult result)
677 this.connection = connectionPoolHelper.EndEstablishConnection(result);
679 ChannelBindingUtility.TryAddToMessage(this.channel.channelBindingToken, this.message, false);
681 IAsyncResult writeResult = StreamingConnectionHelper.BeginWriteMessage(this.message, this.connection, true, this.channel.settings, ref timeoutHelper, onWriteMessage, this);
682 if (!writeResult.CompletedSynchronously)
687 return HandleWriteMessage(writeResult);
// Ends this async result; a TimeoutException is rethrown as a new TimeoutException
// formatted with the original timeout recorded in timeoutHelper.
// NOTE(review): the enclosing method header, the try block and any return statement are
// not visible in this listing; presumably this is the request's End method — confirm.
694 AsyncResult.End<StreamedFramingAsyncRequest>(this);
696 catch (TimeoutException exception)
698 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
699 new TimeoutException(SR.GetString(SR.TimeoutOnRequest, this.timeoutHelper.OriginalTimeout), exception));
// NOTE(review): the bodies of Abort and Fault are not visible in this listing, and the
// member that contains the connectionPoolHelper.Abort() call below cannot be determined
// from the visible lines — confirm against the full file.
704 public void Abort(RequestChannel requestChannel)
709 public void Fault(RequestChannel requestChannel)
716 connectionPoolHelper.Abort();
// Finishes the message write, creates the reply reader, signals DoneSending and begins
// receiving the reply. A synchronously completed receive is handled inline.
// NOTE(review): the statement under the `!CompletedSynchronously` check is not visible;
// presumably it returns false and lets OnReceiveReply continue — confirm.
719 bool HandleWriteMessage(IAsyncResult result)
721 // write out the streamed message
722 StreamingConnectionHelper.EndWriteMessage(result);
724 connectionReader = new ClientSingletonConnectionReader(connection, connectionPoolHelper, channel.settings);
725 connectionReader.DoneSending(TimeSpan.Zero); // we still need to receive
727 IAsyncResult receiveResult = connectionReader.BeginReceive(timeoutHelper.RemainingTime(), onReceiveReply, this);
729 if (!receiveResult.CompletedSynchronously)
734 return CompleteReceiveReply(receiveResult);
// Finishes the receive, stores the reply and, when a reply was received, adds the
// channel binding token to it.
// NOTE(review): the return statement is not visible in this listing — confirm.
737 bool CompleteReceiveReply(IAsyncResult result)
739 this.replyMessage = connectionReader.EndReceive(result);
741 if (this.replyMessage != null)
743 ChannelBindingUtility.TryAddToMessage(this.channel.channelBindingToken, this.replyMessage, false);
// Callback passed to BeginEstablishConnection. Continues through
// HandleEstablishConnection and completes the request (completedSynchronously = false)
// with any exception that was captured.
// NOTE(review): the statement under the CompletedSynchronously check is not visible;
// presumably it returns, because BeginSendRequest handles the synchronous case inline.
// The try/catch/finally scaffolding and the use of `throwing` are also not visible — confirm.
749 static void OnEstablishConnection(IAsyncResult result)
751 if (result.CompletedSynchronously)
756 StreamedFramingAsyncRequest thisPtr = (StreamedFramingAsyncRequest)result.AsyncState;
758 Exception completionException = null;
760 bool throwing = true;
763 completeSelf = thisPtr.HandleEstablishConnection(result);
766 #pragma warning suppress 56500 // [....], transferring exception to another thread
775 completionException = e;
787 thisPtr.Complete(false, completionException);
// Callback passed to BeginWriteMessage. Continues through HandleWriteMessage and
// completes the request (completedSynchronously = false) with any exception that was
// captured.
// NOTE(review): the statement under the CompletedSynchronously check is not visible;
// presumably it returns, because HandleEstablishConnection handles the synchronous case
// inline. The try/catch/finally scaffolding and the use of `throwing` are also not
// visible — confirm.
791 static void OnWriteMessage(IAsyncResult result)
793 if (result.CompletedSynchronously)
798 StreamedFramingAsyncRequest thisPtr = (StreamedFramingAsyncRequest)result.AsyncState;
800 Exception completionException = null;
802 bool throwing = true;
805 completeSelf = thisPtr.HandleWriteMessage(result);
808 #pragma warning suppress 56500 // [....], transferring exception to another thread
817 completionException = e;
829 thisPtr.Complete(false, completionException);
// Callback passed to BeginReceive. Continues through CompleteReceiveReply and completes
// the request (completedSynchronously = false) with any exception that was captured.
// NOTE(review): no CompletedSynchronously guard is visible here, unlike
// OnEstablishConnection and OnWriteMessage, while HandleWriteMessage calls
// CompleteReceiveReply itself when the receive completed synchronously. If the guard is
// really absent, a synchronously completed receive would be processed twice — confirm
// against the full file. The try/catch/finally scaffolding is also not visible.
833 static void OnReceiveReply(IAsyncResult result)
835 StreamedFramingAsyncRequest thisPtr = (StreamedFramingAsyncRequest)result.AsyncState;
837 Exception completionException = null;
839 bool throwing = true;
842 completeSelf = thisPtr.CompleteReceiveReply(result);
845 #pragma warning suppress 56500 // [....], transferring exception to another thread
854 completionException = e;
866 thisPtr.Complete(false, completionException);
// NOTE(review): the body of this member is not visible in this listing — confirm
// against the full file.
870 public void OnReleaseRequest()