1 //------------------------------------------------------------
2 // Copyright (c) Microsoft Corporation. All rights reserved.
3 //------------------------------------------------------------
5 namespace System.ServiceModel.Channels
7 using System.Collections.Generic;
8 using System.Diagnostics;
10 using System.ServiceModel;
11 using System.ServiceModel.Diagnostics;
12 using System.Threading;
13 using System.Runtime.Diagnostics;
14 using System.ServiceModel.Diagnostics.Application;
17 /// Wraps an IChannelListener<IReplyChannel> into an IChannelListener<IInputChannel>
19 class ReplyOneWayChannelListener
20 : LayeredChannelListener<IInputChannel>
// Typed reference to the inner listener; assigned in OnOpening.
22 IChannelListener<IReplyChannel> innerChannelListener;
// Builds the inner IReplyChannel listener from the binding context and records the
// binding element's PacketRoutable setting, which the wrapped channels later read
// to decide whether to validate the packet-routable header.
25 public ReplyOneWayChannelListener(OneWayBindingElement bindingElement, BindingContext context)
26 : base(context.Binding, context.BuildInnerChannelListener<IReplyChannel>())
28 this.packetRoutable = bindingElement.PacketRoutable;
// Caches the base class's InnerChannelListener as an IChannelListener<IReplyChannel>
// so the accept/wait overrides can call it without casting each time.
31 protected override void OnOpening()
33 this.innerChannelListener = (IChannelListener<IReplyChannel>)this.InnerChannelListener;
// Synchronously accepts an IReplyChannel from the inner listener and returns it
// wrapped as an IInputChannel (see WrapInnerChannel).
37 protected override IInputChannel OnAcceptChannel(TimeSpan timeout)
39 IReplyChannel innerChannel = this.innerChannelListener.AcceptChannel(timeout);
40 return WrapInnerChannel(innerChannel);
// Starts an asynchronous accept directly on the inner listener; the returned
// IAsyncResult is the inner listener's and is finished in OnEndAcceptChannel.
43 protected override IAsyncResult OnBeginAcceptChannel(TimeSpan timeout, AsyncCallback callback, object state)
45 return this.innerChannelListener.BeginAcceptChannel(timeout, callback, state);
// Completes the inner listener's asynchronous accept and wraps the resulting
// IReplyChannel as an IInputChannel.
48 protected override IInputChannel OnEndAcceptChannel(IAsyncResult result)
50 IReplyChannel innerChannel = this.innerChannelListener.EndAcceptChannel(result);
51 return WrapInnerChannel(innerChannel);
// Forwards the wait to the inner listener and returns its result unchanged.
54 protected override bool OnWaitForChannel(TimeSpan timeout)
56 return this.innerChannelListener.WaitForChannel(timeout);
// Starts an asynchronous wait on the inner listener; completed by OnEndWaitForChannel.
59 protected override IAsyncResult OnBeginWaitForChannel(TimeSpan timeout, AsyncCallback callback, object state)
61 return this.innerChannelListener.BeginWaitForChannel(timeout, callback, state);
// Completes the inner listener's asynchronous wait and returns its result unchanged.
64 protected override bool OnEndWaitForChannel(IAsyncResult result)
66 return this.innerChannelListener.EndWaitForChannel(result);
// Wraps an accepted inner reply channel in a ReplyOneWayInputChannel owned by this listener.
69 IInputChannel WrapInnerChannel(IReplyChannel innerChannel)
// A null inner channel is special-cased before wrapping
// (presumably passed through as null - TODO confirm).
71 if (innerChannel == null)
77 return new ReplyOneWayInputChannel(this, innerChannel);
// IInputChannel layered over an IReplyChannel: each received request is surfaced
// to the caller as a plain message (see Receive / ProcessContext).
81 class ReplyOneWayInputChannel : LayeredChannel<IReplyChannel>, IInputChannel
// Copies the listener's packetRoutable setting into this channel's validateHeader flag.
85 public ReplyOneWayInputChannel(ReplyOneWayChannelListener listener, IReplyChannel innerChannel)
86 : base(listener, innerChannel)
88 this.validateHeader = listener.packetRoutable;
// The local address is whatever the inner reply channel reports.
91 public EndpointAddress LocalAddress
93 get { return this.InnerChannel.LocalAddress; }
// Converts a received RequestContext into the message returned to the caller:
// takes the request message, attaches the context to it as a
// RequestContextMessageProperty, validates the packet-routable header when
// validateHeader is set, and sends a null reply through the context.
96 Message ProcessContext(RequestContext context, TimeSpan timeout)
// NOTE(review): replySuccess is presumably set after a successful reply and checked
// during cleanup - TODO confirm against the full method.
103 bool replySuccess = false;
104 Message result = null;
107 // validate that the request message contains our expected header
108 result = context.RequestMessage;
109 result.Properties.Add(RequestContextMessageProperty.Name, new RequestContextMessageProperty(context));
111 if (this.validateHeader)
113 PacketRoutableHeader.ValidateMessage(result);
// Reply with a null message; the reply is bounded by the timeout passed to this method.
118 TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
119 context.Reply(null, timeoutHelper.RemainingTime());
// Communication failures are traced as handled at Information level.
122 catch (CommunicationException e)
124 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
// Timeouts additionally raise the SendTimeout trace event when it is enabled.
126 catch (TimeoutException e)
128 if (TD.SendTimeoutIsEnabled())
130 TD.SendTimeout(e.Message);
132 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
// Receives using the channel's DefaultReceiveTimeout.
151 public Message Receive()
153 return this.Receive(this.DefaultReceiveTimeout);
// Receives a request from the inner channel and runs it through ProcessContext.
// One TimeoutHelper is shared so both steps draw from the same overall timeout.
156 public Message Receive(TimeSpan timeout)
158 TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
159 RequestContext context = InnerChannel.ReceiveRequest(timeoutHelper.RemainingTime());
160 return ProcessContext(context, timeoutHelper.RemainingTime());
// Starts an asynchronous receive using the channel's DefaultReceiveTimeout.
163 public IAsyncResult BeginReceive(AsyncCallback callback, object state)
165 return this.BeginReceive(this.DefaultReceiveTimeout, callback, state);
// Starts an asynchronous receive; the work is carried by a ReceiveAsyncResult,
// which is given the inner channel and this channel's validateHeader flag.
168 public IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)
170 return new ReceiveAsyncResult(this.InnerChannel, timeout, this.validateHeader, callback, state);
// Completes a receive started by BeginReceive and returns its message.
173 public Message EndReceive(IAsyncResult result)
175 return ReceiveAsyncResult.End(result);
// Tries to receive a request from the inner channel within the timeout. When the
// inner TryReceiveRequest succeeds, the context is run through ProcessContext to
// produce the out message; both steps share one TimeoutHelper.
178 public bool TryReceive(TimeSpan timeout, out Message message)
180 TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
181 RequestContext context;
182 if (InnerChannel.TryReceiveRequest(timeoutHelper.RemainingTime(), out context))
184 message = ProcessContext(context, timeoutHelper.RemainingTime());
// Starts an asynchronous try-receive; the work is carried by a TryReceiveAsyncResult.
194 public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)
196 return new TryReceiveAsyncResult(this.InnerChannel, timeout, this.validateHeader, callback, state);
// Completes a try-receive started by BeginTryReceive, returning its success flag and message.
199 public bool EndTryReceive(IAsyncResult result, out Message message)
201 return TryReceiveAsyncResult.End(result, out message);
// Waiting for a message maps to waiting for a request on the inner reply channel.
204 public bool WaitForMessage(TimeSpan timeout)
206 return InnerChannel.WaitForRequest(timeout);
// Asynchronous form of WaitForMessage; forwards to the inner channel's BeginWaitForRequest.
209 public IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state)
211 return InnerChannel.BeginWaitForRequest(timeout, callback, state);
// Completes the inner channel's asynchronous wait and returns its result unchanged.
214 public bool EndWaitForMessage(IAsyncResult result)
216 return InnerChannel.EndWaitForRequest(result);
// Async result for BeginTryReceive: uses the inner channel's Begin/EndTryReceiveRequest
// and remembers the boolean outcome in tryResult.
219 class TryReceiveAsyncResult : ReceiveAsyncResultBase
223 public TryReceiveAsyncResult(IReplyChannel innerChannel, TimeSpan timeout, bool validateHeader,
224 AsyncCallback callback, object state)
225 : base(innerChannel, timeout, validateHeader, callback, state)
// Completes the operation and returns the stored try result together with the message.
229 public static bool End(IAsyncResult result, out Message message)
231 TryReceiveAsyncResult thisPtr = AsyncResult.End<TryReceiveAsyncResult>(result);
232 message = thisPtr.Message;
233 return thisPtr.tryResult;
// Receive hook: start a try-receive on the inner channel.
236 protected override IAsyncResult OnBeginReceiveRequest(TimeSpan timeout, AsyncCallback callback, object state)
238 return InnerChannel.BeginTryReceiveRequest(timeout, callback, state);
// Receive hook: finish the try-receive and record whether it succeeded.
241 protected override RequestContext OnEndReceiveRequest(IAsyncResult result)
243 RequestContext context;
244 this.tryResult = InnerChannel.EndTryReceiveRequest(result, out context);
// Async result for BeginReceive: uses the inner channel's Begin/EndReceiveRequest.
249 class ReceiveAsyncResult : ReceiveAsyncResultBase
251 public ReceiveAsyncResult(IReplyChannel innerChannel, TimeSpan timeout, bool validateHeader,
252 AsyncCallback callback, object state)
253 : base(innerChannel, timeout, validateHeader, callback, state)
// Completes the operation and returns the received message.
257 public static Message End(IAsyncResult result)
259 ReceiveAsyncResult thisPtr = AsyncResult.End<ReceiveAsyncResult>(result);
260 return thisPtr.Message;
// Receive hook: start a receive on the inner channel.
263 protected override IAsyncResult OnBeginReceiveRequest(TimeSpan timeout, AsyncCallback callback, object state)
265 return InnerChannel.BeginReceiveRequest(timeout, callback, state);
// Receive hook: finish the receive and hand back the request context.
268 protected override RequestContext OnEndReceiveRequest(IAsyncResult result)
270 return InnerChannel.EndReceiveRequest(result);
// Shared async state machine for receive and try-receive: receive a request
// (via the abstract hooks), then send a null reply through the request context.
274 abstract class ReceiveAsyncResultBase : AsyncResult
276 IReplyChannel innerChannel;
// Request context produced by the receive step; used again when completing the reply.
277 RequestContext context;
// One timeout budget shared by the receive and the reply.
279 TimeoutHelper timeoutHelper;
// Callbacks are static; the instance travels as the async state (see OnReceiveRequest / OnReply).
281 static AsyncCallback onReceiveRequest = Fx.ThunkCallback(new AsyncCallback(OnReceiveRequest));
282 static AsyncCallback onReply = Fx.ThunkCallback(new AsyncCallback(OnReply));
// Stores the inputs and immediately starts the receive.
284 protected ReceiveAsyncResultBase(IReplyChannel innerChannel, TimeSpan timeout, bool validateHeader,
285 AsyncCallback callback, object state)
286 : base(callback, state)
288 this.innerChannel = innerChannel;
289 this.timeoutHelper = new TimeoutHelper(timeout);
290 this.validateHeader = validateHeader;
// NOTE(review): this invokes the abstract OnBeginReceiveRequest from the constructor,
// before any derived-class constructor body has run. The overrides in this file only
// use InnerChannel, which is already assigned above; keep it that way.
291 IAsyncResult result = this.OnBeginReceiveRequest(timeoutHelper.RemainingTime(), onReceiveRequest, this);
// Asynchronous completions are handled by the onReceiveRequest callback; a
// synchronous completion is processed here on the calling thread.
292 if (!result.CompletedSynchronously)
297 if (HandleReceiveRequestComplete(result))
// Exposes the inner reply channel to derived classes for their receive hooks.
303 protected IReplyChannel InnerChannel
307 return this.innerChannel;
// Message produced by the receive; read by the derived classes' End methods.
311 protected Message Message
// Hooks supplied by derived classes: start the inner receive (called from the
// constructor) and finish it (called from HandleReceiveRequestComplete).
319 protected abstract IAsyncResult OnBeginReceiveRequest(TimeSpan timeout, AsyncCallback callback, object state);
320 protected abstract RequestContext OnEndReceiveRequest(IAsyncResult result);
// Finishes the asynchronous reply started in HandleReceiveRequestComplete.
// Called inline for synchronous completions and from OnReply otherwise.
322 bool HandleReplyComplete(IAsyncResult result)
// Stays true unless EndReply returns normally
// (presumably drives an abort of the context during cleanup - TODO confirm).
324 bool abortContext = true;
327 context.EndReply(result);
328 abortContext = false;
// Communication failures are traced as handled at Information level.
330 catch (CommunicationException e)
332 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
// Timeouts additionally raise the SendTimeout trace event when it is enabled.
334 catch (TimeoutException e)
336 if (TD.SendTimeoutIsEnabled())
338 TD.SendTimeout(e.Message);
340 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
// Finishes the receive step and starts the reply step. Asynchronous counterpart of
// ReplyOneWayInputChannel.ProcessContext: take the request message, attach the
// context as a message property, validate the header, then begin a null reply.
353 bool HandleReceiveRequestComplete(IAsyncResult result)
355 this.context = this.OnEndReceiveRequest(result);
// A null context is special-cased before any message processing.
356 if (this.context == null)
361 bool replySuccess = false;
362 IAsyncResult replyResult = null;
365 this.message = context.RequestMessage;
366 this.message.Properties.Add(RequestContextMessageProperty.Name, new RequestContextMessageProperty(context));
// NOTE(review): presumably guarded by validateHeader, as in the synchronous path - TODO confirm.
370 PacketRoutableHeader.ValidateMessage(this.message);
// Begin the null reply using whatever remains of the shared timeout.
374 replyResult = context.BeginReply(null, timeoutHelper.RemainingTime(), onReply, this);
// Communication failures are traced as handled at Information level.
377 catch (CommunicationException e)
379 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
// Timeouts additionally raise the SendTimeout trace event when it is enabled.
381 catch (TimeoutException e)
383 if (TD.SendTimeoutIsEnabled())
385 TD.SendTimeout(e.Message);
387 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
// Cleanup: abort the context and close any message already taken from it
// (presumably only when the reply was not started successfully - TODO confirm).
394 this.context.Abort();
395 if (this.message != null)
397 this.message.Close();
// No reply in flight, or a reply that already completed synchronously, is
// resolved here; an asynchronous reply is finished later in OnReply.
403 if (replyResult == null)
407 else if (replyResult.CompletedSynchronously)
409 return HandleReplyComplete(replyResult);
// Callback for the inner receive. The instance is recovered from AsyncState.
417 static void OnReceiveRequest(IAsyncResult result)
// Synchronous completions are processed by the constructor, so they are
// special-cased here (presumably an early return - TODO confirm).
419 if (result.CompletedSynchronously)
424 ReceiveAsyncResultBase thisPtr = (ReceiveAsyncResultBase)result.AsyncState;
426 Exception completionException = null;
430 completeSelf = thisPtr.HandleReceiveRequestComplete(result);
432 #pragma warning suppress 56500 // Microsoft, transferring exception to another thread
// Any captured exception is handed to Complete rather than thrown on the callback thread.
440 completionException = e;
// Completed with completedSynchronously = false because this runs on a callback.
445 thisPtr.Complete(false, completionException);
// Callback for the asynchronous reply. The instance is recovered from AsyncState.
449 static void OnReply(IAsyncResult result)
// Synchronous completions are processed by HandleReceiveRequestComplete, so they
// are special-cased here (presumably an early return - TODO confirm).
451 if (result.CompletedSynchronously)
456 ReceiveAsyncResultBase thisPtr = (ReceiveAsyncResultBase)result.AsyncState;
458 Exception completionException = null;
462 completeSelf = thisPtr.HandleReplyComplete(result);
464 #pragma warning suppress 56500 // Microsoft, transferring exception to another thread
// Any captured exception is handed to Complete rather than thrown on the callback thread.
472 completionException = e;
// Completed with completedSynchronously = false because this runs on a callback.
477 thisPtr.Complete(false, completionException);
485 // Wraps an IChannelListener<IDuplexChannel> into an IChannelListener<IInputChannel>
487 class DuplexOneWayChannelListener
488 : LayeredChannelListener<IInputChannel>
// Typed reference to the inner listener; assigned in OnOpening.
490 IChannelListener<IDuplexChannel> innerChannelListener;
// Builds the inner IDuplexChannel listener from the binding context and records
// the binding element's PacketRoutable setting for the wrapped channels.
493 public DuplexOneWayChannelListener(OneWayBindingElement bindingElement, BindingContext context)
494 : base(context.Binding, context.BuildInnerChannelListener<IDuplexChannel>())
496 this.packetRoutable = bindingElement.PacketRoutable;
// Caches the base class's InnerChannelListener as an IChannelListener<IDuplexChannel>.
499 protected override void OnOpening()
501 this.innerChannelListener = (IChannelListener<IDuplexChannel>)this.InnerChannelListener;
// Synchronously accepts an IDuplexChannel from the inner listener and returns it
// wrapped as an IInputChannel.
505 protected override IInputChannel OnAcceptChannel(TimeSpan timeout)
507 IDuplexChannel channel = this.innerChannelListener.AcceptChannel(timeout);
508 return WrapInnerChannel(channel);
// Starts an asynchronous accept directly on the inner listener; finished in OnEndAcceptChannel.
511 protected override IAsyncResult OnBeginAcceptChannel(TimeSpan timeout, AsyncCallback callback, object state)
513 return this.innerChannelListener.BeginAcceptChannel(timeout, callback, state);
// Starts an asynchronous wait on the inner listener; finished in OnEndWaitForChannel.
516 protected override IAsyncResult OnBeginWaitForChannel(TimeSpan timeout, AsyncCallback callback, object state)
518 return this.innerChannelListener.BeginWaitForChannel(timeout, callback, state);
// Completes the inner listener's asynchronous accept and wraps the resulting channel.
521 protected override IInputChannel OnEndAcceptChannel(IAsyncResult result)
523 IDuplexChannel channel = this.innerChannelListener.EndAcceptChannel(result);
524 return WrapInnerChannel(channel);
// Completes the inner listener's asynchronous wait and returns its result unchanged.
527 protected override bool OnEndWaitForChannel(IAsyncResult result)
529 return this.innerChannelListener.EndWaitForChannel(result);
// Forwards the wait to the inner listener and returns its result unchanged.
532 protected override bool OnWaitForChannel(TimeSpan timeout)
534 return this.innerChannelListener.WaitForChannel(timeout);
// Wraps an accepted inner duplex channel in a DuplexOneWayInputChannel owned by this listener.
537 IInputChannel WrapInnerChannel(IDuplexChannel innerChannel)
// A null inner channel is special-cased before wrapping
// (presumably passed through as null - TODO confirm).
539 if (innerChannel == null)
545 return new DuplexOneWayInputChannel(this, innerChannel);
// IInputChannel layered over an IDuplexChannel. Every receive operation is forwarded
// to the inner channel; received messages are additionally passed through
// ValidateMessage, which checks the packet-routable header when validateHeader is set.
549 class DuplexOneWayInputChannel : LayeredChannel<IDuplexChannel>, IInputChannel
// Copies the listener's packetRoutable setting into this channel's validateHeader flag.
553 public DuplexOneWayInputChannel(DuplexOneWayChannelListener listener, IDuplexChannel innerChannel)
554 : base(listener, innerChannel)
556 this.validateHeader = listener.packetRoutable;
// The local address is whatever the inner duplex channel reports.
559 public EndpointAddress LocalAddress
561 get { return this.InnerChannel.LocalAddress; }
// Asynchronous receive using the channel's DefaultReceiveTimeout.
564 public IAsyncResult BeginReceive(AsyncCallback callback, object state)
566 return this.BeginReceive(this.DefaultReceiveTimeout, callback, state);
// The Begin* methods hand back the inner channel's IAsyncResult directly;
// validation happens in the matching End* method.
569 public IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)
571 return this.InnerChannel.BeginReceive(timeout, callback, state);
574 public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)
576 return this.InnerChannel.BeginTryReceive(timeout, callback, state);
579 public IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state)
581 return this.InnerChannel.BeginWaitForMessage(timeout, callback, state);
// Completes the inner receive, then validates the message.
584 public Message EndReceive(IAsyncResult result)
586 Message message = this.InnerChannel.EndReceive(result);
587 return ValidateMessage(message);
// Completes the inner try-receive; the out message is validated before being returned.
590 public bool EndTryReceive(IAsyncResult result, out Message message)
592 bool success = this.InnerChannel.EndTryReceive(result, out message);
593 message = ValidateMessage(message);
597 public bool EndWaitForMessage(IAsyncResult result)
599 return this.InnerChannel.EndWaitForMessage(result);
// Synchronous receive using the channel's DefaultReceiveTimeout.
602 public Message Receive()
604 return this.Receive(this.DefaultReceiveTimeout);
// Receives from the inner channel, then validates the message.
607 public Message Receive(TimeSpan timeout)
609 Message result = this.InnerChannel.Receive(timeout);
610 return ValidateMessage(result);
// Tries to receive from the inner channel; the out message is validated before being returned.
613 public bool TryReceive(TimeSpan timeout, out Message message)
615 bool success = this.InnerChannel.TryReceive(timeout, out message);
616 message = ValidateMessage(message);
620 public bool WaitForMessage(TimeSpan timeout)
622 return this.InnerChannel.WaitForMessage(timeout);
// Runs PacketRoutableHeader.ValidateMessage on non-null messages when validateHeader
// is set. Callers assign this method's return value back to the message they return.
625 Message ValidateMessage(Message message)
627 if (this.validateHeader && message != null)
629 PacketRoutableHeader.ValidateMessage(message);
637 /// Wraps an IChannelListener<IDuplexSessionChannel> into an IChannelListener<IInputChannel>
639 class DuplexSessionOneWayChannelListener
640 : DelegatingChannelListener<IInputChannel>
// Typed reference to the inner listener; assigned in OnOpening.
642 IChannelListener<IDuplexSessionChannel> innerChannelListener;
// Acceptor that queues messages and exceptions for the upper input channel; also installed as this.Acceptor.
643 DuplexSessionOneWayInputChannelAcceptor inputChannelAcceptor;
// Upper bound compared against activeChannels when deciding whether to accept more sessions.
645 int maxAcceptedChannels;
// Used as the timeout both for opening accepted session channels and for each try-receive on them.
648 TimeSpan idleTimeout;
649 static AsyncCallback onAcceptInnerChannel = Fx.ThunkCallback(new AsyncCallback(OnAcceptInnerChannel));
650 AsyncCallback onOpenInnerChannel;
651 EventHandler onInnerChannelClosed;
// Delegates below are created lazily on first use.
652 Action onExceptionDequeued;
653 Action<object> handleAcceptCallback;
// True until OnOpening hands the inner listener's lifetime over to the acceptor.
654 bool ownsInnerListener;
// Builds the inner IDuplexSessionChannel listener, creates the acceptor, and copies
// PacketRoutable, MaxAcceptedChannels and ChannelPoolSettings.IdleTimeout from the binding element.
657 public DuplexSessionOneWayChannelListener(OneWayBindingElement bindingElement, BindingContext context)
658 : base(true, context.Binding, context.BuildInnerChannelListener<IDuplexSessionChannel>())
660 this.acceptLock = new object();
661 this.inputChannelAcceptor = new DuplexSessionOneWayInputChannelAcceptor(this);
662 this.packetRoutable = bindingElement.PacketRoutable;
663 this.maxAcceptedChannels = bindingElement.MaxAcceptedChannels;
664 this.Acceptor = this.inputChannelAcceptor;
665 this.idleTimeout = bindingElement.ChannelPoolSettings.IdleTimeout;
666 this.onOpenInnerChannel = Fx.ThunkCallback(new AsyncCallback(OnOpenInnerChannel));
667 this.ownsInnerListener = true;
668 this.onInnerChannelClosed = new EventHandler(OnInnerChannelClosed);
// True when another accept should be started: no accept is already pending, the
// number of active channels is below maxAcceptedChannels, and the inner listener is Opened.
671 bool IsAcceptNecessary
675 return !acceptPending
676 && (activeChannels < maxAcceptedChannels)
677 && (this.innerChannelListener.State == CommunicationState.Opened);
// Caches the typed inner listener and transfers it to the acceptor; from this point
// this listener no longer owns the inner listener (see OnAbort).
681 protected override void OnOpening()
683 this.innerChannelListener = (IChannelListener<IDuplexSessionChannel>)this.InnerChannelListener;
684 this.inputChannelAcceptor.TransferInnerChannelListener(this.innerChannelListener); // acceptor now owns the lifetime
685 this.ownsInnerListener = false;
// Once opened, the accept loop is started as a scheduled work item rather than
// being run inline on the opening thread.
689 protected override void OnOpened()
692 ActionItem.Schedule(new Action<object>(AcceptLoop), null);
// Aborts the inner listener directly only while this listener still owns it, i.e.
// before OnOpening transferred it to the acceptor.
695 protected override void OnAbort()
698 if (this.ownsInnerListener && this.innerChannelListener != null) // Open didn't complete
700 this.innerChannelListener.Abort();
// Work-item entry point for the accept loop (scheduled from OnOpened); state is unused.
704 void AcceptLoop(object state)
709 // we need to kick off an accept (and possibly process a completion as well)
// pendingResult, when non-null, is an accept that completed asynchronously and still
// needs EndAcceptChannel (via ProcessEndAccept). The loop then keeps starting accepts
// on the inner listener for as long as IsAcceptNecessary holds.
710 void AcceptLoop(IAsyncResult pendingResult)
712 IDuplexSessionChannel pendingChannel = null;
714 if (pendingResult != null)
// ProcessEndAccept returns false when the loop should not continue.
716 if (!ProcessEndAccept(pendingResult, out pendingChannel))
720 pendingResult = null;
725 while (IsAcceptNecessary)
727 Exception exceptionToEnqueue = null;
730 IAsyncResult result = null;
// Accepts are issued without a timeout (TimeSpan.MaxValue).
734 result = this.innerChannelListener.BeginAcceptChannel(TimeSpan.MaxValue, onAcceptInnerChannel, this);
736 catch (CommunicationException e)
738 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
742 acceptPending = true;
// An accept that did not complete synchronously is continued from OnAcceptInnerChannel.
743 if (!result.CompletedSynchronously)
748 if (this.handleAcceptCallback == null)
750 this.handleAcceptCallback = new Action<object>(HandleAcceptCallback);
753 if (pendingChannel != null)
755 // don't starve our completed Accept
756 ActionItem.Schedule(handleAcceptCallback, pendingChannel);
757 pendingChannel = null;
// Synchronous completion: end the accept here and schedule the new channel's handling.
760 IDuplexSessionChannel channel = null;
761 if (ProcessEndAccept(result, out channel))
765 ActionItem.Schedule(handleAcceptCallback, channel);
773 #pragma warning suppress 56500 // Microsoft, transferring exception to input queue to be pulled off by user
781 exceptionToEnqueue = e;
// Exceptions captured above are queued on the acceptor instead of being thrown here.
784 if (exceptionToEnqueue != null)
786 this.inputChannelAcceptor.Enqueue(exceptionToEnqueue, null, false);
// A pending channel that was not scheduled inside the loop is handled inline.
791 if (pendingChannel != null)
793 HandleAcceptComplete(pendingChannel);
797 // return true if the loop should continue
// Ends an accept on the inner listener, hooks the accepted channel's Closed event,
// and clears acceptPending. A CommunicationException is traced; other captured
// exceptions are queued on the acceptor.
798 bool ProcessEndAccept(IAsyncResult result, out IDuplexSessionChannel channel)
801 Exception exceptionToEnqueue = null;
802 bool success = false;
805 channel = innerChannelListener.EndAcceptChannel(result);
808 catch (CommunicationException e)
810 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
812 #pragma warning suppress 56500 // Microsoft, transferring exception to input queue to be pulled off by user
820 exceptionToEnqueue = e;
// Subscribe so OnInnerChannelClosed can restart the accept loop when this channel closes.
827 channel.Closed += this.onInnerChannelClosed;
828 bool traceMaxInboundChannels = false;
831 this.acceptPending = false;
// Reaching the channel quota is only recorded here; the warning is traced below.
833 if (activeChannels >= maxAcceptedChannels)
835 traceMaxInboundChannels = true;
839 if (DiagnosticUtility.ShouldTraceWarning)
841 if (traceMaxInboundChannels)
843 TraceUtility.TraceEvent(TraceEventType.Warning,
844 TraceCode.MaxAcceptedChannelsReached,
845 SR.GetString(SR.TraceCodeMaxAcceptedChannelsReached),
846 new StringTraceRecord("MaxAcceptedChannels", maxAcceptedChannels.ToString(System.Globalization.CultureInfo.InvariantCulture)),
855 // we're at EOF. close up the Acceptor and break out of our loop
856 this.inputChannelAcceptor.Close();
860 else if (exceptionToEnqueue != null)
862 // see what the state of the inner listener is. If it's still open, don't block the accept loop
863 bool canDispatchOnThisThread = (innerChannelListener.State != CommunicationState.Opened);
864 if (this.onExceptionDequeued == null)
866 this.onExceptionDequeued = new Action(OnExceptionDequeued);
// acceptPending is cleared in OnExceptionDequeued once the queued exception is dequeued.
868 this.inputChannelAcceptor.Enqueue(exceptionToEnqueue, this.onExceptionDequeued, canDispatchOnThisThread);
874 this.acceptPending = false;
// Dequeue callback for exceptions queued by ProcessEndAccept: clears acceptPending,
// which is one of the conditions IsAcceptNecessary checks.
881 void OnExceptionDequeued()
885 this.acceptPending = false;
// Callback for BeginAcceptChannel. Asynchronous completions re-enter the accept loop
// with the finished result; the listener instance is recovered from AsyncState.
890 static void OnAcceptInnerChannel(IAsyncResult result)
// Synchronous completions are processed inside AcceptLoop itself, so they are
// special-cased here (presumably an early return - TODO confirm).
892 if (result.CompletedSynchronously)
897 DuplexSessionOneWayChannelListener thisPtr = (DuplexSessionOneWayChannelListener)result.AsyncState;
898 thisPtr.AcceptLoop(result);
// Work-item adapter: unwraps the scheduled state as the accepted channel and handles it.
901 void HandleAcceptCallback(object state)
903 this.HandleAcceptComplete((IDuplexSessionChannel)state);
// Closed-event handler for accepted inner channels.
906 void OnInnerChannelClosed(object sender, EventArgs e)
908 // Reduce our quota and kick off an accept
909 IDuplexSessionChannel channel = (IDuplexSessionChannel)sender;
// Unsubscribe so the closed channel no longer references this listener's handler.
910 channel.Closed -= this.onInnerChannelClosed;
// The null literal binds to the AcceptLoop(IAsyncResult) overload (the more specific
// parameter type), i.e. the loop runs with no pending result to process.
916 this.AcceptLoop(null);
// Registers a newly accepted session channel with the acceptor and begins opening it.
// The open is finished in CompleteOpen, either inline or from OnOpenInnerChannel.
919 void HandleAcceptComplete(IDuplexSessionChannel channel)
921 Exception exceptionToEnqueue = null;
922 bool success = false;
924 this.inputChannelAcceptor.PrepareChannel(channel);
925 IAsyncResult openResult = null;
// NOTE(review): idleTimeout doubles as the open timeout here.
928 openResult = channel.BeginOpen(this.idleTimeout, onOpenInnerChannel, channel);
931 catch (CommunicationException e) // expected failure: trace as handled and continue
933 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
// Timeouts additionally raise the OpenTimeout trace event when it is enabled.
935 catch (TimeoutException e)
937 if (TD.OpenTimeoutIsEnabled())
939 TD.OpenTimeout(e.Message);
941 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
943 #pragma warning suppress 56500 // Microsoft, transferring exception to input queue to be pulled off by user
951 exceptionToEnqueue = e;
// Failure cleanup for the channel (presumably an abort - TODO confirm).
955 if (!success && channel != null)
// A synchronously completed open is finished on this thread.
963 if (openResult.CompletedSynchronously)
965 CompleteOpen(channel, openResult);
// Captured exceptions are queued on the acceptor instead of being thrown here.
970 if (exceptionToEnqueue != null)
972 this.inputChannelAcceptor.Enqueue(exceptionToEnqueue, null);
// Callback for channel.BeginOpen; the channel being opened travels as AsyncState.
977 void OnOpenInnerChannel(IAsyncResult result)
// Synchronous completions are finished by HandleAcceptComplete, so they are
// special-cased here (presumably an early return - TODO confirm).
979 if (result.CompletedSynchronously)
984 IDuplexSessionChannel channel = (IDuplexSessionChannel)result.AsyncState;
985 CompleteOpen(channel, result);
988 // open channel and start receiving messages
// Ends the channel open. On success the channel is handed to the acceptor, which
// creates a ChannelReceiver and starts receiving; otherwise any captured exception
// is queued on the acceptor.
989 void CompleteOpen(IDuplexSessionChannel channel, IAsyncResult result)
991 Exception exceptionToEnqueue = null;
992 bool success = false;
995 channel.EndOpen(result);
// Communication failures are traced as handled at Information level.
998 catch (CommunicationException e)
1000 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
// Timeouts additionally raise the OpenTimeout trace event when it is enabled.
1002 catch (TimeoutException e)
1004 if (TD.OpenTimeoutIsEnabled())
1006 TD.OpenTimeout(e.Message);
1008 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
1010 #pragma warning suppress 56500 // Microsoft, transferring exception to input queue to be pulled off by user
1018 exceptionToEnqueue = e;
1030 this.inputChannelAcceptor.AcceptInnerChannel(this, channel);
1032 else if (exceptionToEnqueue != null)
1034 this.inputChannelAcceptor.Enqueue(exceptionToEnqueue, null);
// Acceptor behind the listener's single upper input channel. It tracks one
// ChannelReceiver per accepted session channel and, once transferred, owns the
// lifetime of the inner listener (opened, closed and aborted together with it).
1038 class DuplexSessionOneWayInputChannelAcceptor : InputChannelAcceptor
1040 ChannelTracker<IDuplexSessionChannel, ChannelReceiver> receivers;
// Null until TransferInnerChannelListener is called.
1041 IChannelListener<IDuplexSessionChannel> innerChannelListener;
1043 public DuplexSessionOneWayInputChannelAcceptor(DuplexSessionOneWayChannelListener listener)
1046 this.receivers = new ChannelTracker<IDuplexSessionChannel, ChannelReceiver>();
// Takes over the inner listener from the owning channel listener. If this acceptor
// is already Closing or Closed by then, the transferred listener is aborted.
1049 public void TransferInnerChannelListener(IChannelListener<IDuplexSessionChannel> innerChannelListener)
1051 Fx.Assert(this.innerChannelListener == null, "innerChannelListener must be null prior to transfer");
1052 bool abortListener = false;
1055 this.innerChannelListener = innerChannelListener;
1056 if (this.State == CommunicationState.Closing || this.State == CommunicationState.Closed)
1058 // abort happened before we completed the transfer
1059 abortListener = true;
1065 innerChannelListener.Abort();
// Creates a receiver for an opened session channel, tracks it, and starts its receive loop.
1069 public void AcceptInnerChannel(DuplexSessionOneWayChannelListener listener, IDuplexSessionChannel channel)
1071 ChannelReceiver channelReceiver = new ChannelReceiver(listener, channel);
1072 this.receivers.Add(channel, channelReceiver);
1073 channelReceiver.StartReceiving();
// Registers a channel with the tracker before it is opened (called from HandleAcceptComplete).
1076 public void PrepareChannel(IDuplexSessionChannel channel)
1078 this.receivers.PrepareChannel(channel);
// The upper channel is created with a null local address.
1081 protected override InputChannel OnCreateChannel()
1083 return new DuplexSessionOneWayInputChannel(this.ChannelManager, null);
// Opens the base acceptor, the receiver tracker and the inner listener, in that
// order, sharing one timeout budget.
1086 protected override void OnOpen(TimeSpan timeout)
1088 TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
1089 base.OnOpen(timeoutHelper.RemainingTime());
1090 this.receivers.Open(timeoutHelper.RemainingTime());
1091 this.innerChannelListener.Open(timeoutHelper.RemainingTime());
// Asynchronous open over the same three objects via ChainedOpenAsyncResult.
1094 protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
1096 return new ChainedOpenAsyncResult(timeout, callback, state, base.OnBeginOpen, base.OnEndOpen, this.receivers, this.innerChannelListener);
1099 protected override void OnEndOpen(IAsyncResult result)
1101 ChainedOpenAsyncResult.End(result);
// If the receivers cannot be handed to the upper channel, abort them and the inner listener here.
1104 protected override void OnAbort()
1107 if (!TransferReceivers())
1109 this.receivers.Abort();
1110 if (this.innerChannelListener != null)
1112 this.innerChannelListener.Abort();
// Closes the base acceptor first; if the receivers cannot be handed to the upper
// channel, closes them and the inner listener under the same timeout budget.
// NOTE(review): unlike OnAbort, innerChannelListener is used here and in OnBeginClose
// without a null check - confirm it is always set before close can run.
1117 protected override void OnClose(TimeSpan timeout)
1119 TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
1120 base.OnClose(timeoutHelper.RemainingTime());
1121 if (!TransferReceivers())
1123 this.receivers.Close(timeoutHelper.RemainingTime());
1124 this.innerChannelListener.Close(timeoutHelper.RemainingTime());
// Asynchronous close: the extra objects are included only when they were not transferred.
1128 protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
1130 List<ICommunicationObject> objectsToClose = new List<ICommunicationObject>();
1131 if (!TransferReceivers())
1133 objectsToClose.Add(this.receivers);
1134 objectsToClose.Add(this.innerChannelListener);
1137 return new ChainedCloseAsyncResult(timeout, callback, state, base.OnBeginClose, base.OnEndClose, objectsToClose);
1140 protected override void OnEndClose(IAsyncResult result)
1142 ChainedCloseAsyncResult.End(result);
1145 // used to decouple our channel and listener lifetimes
// Offers the receivers and inner listener to the current upper channel, if there is
// one; its TransferReceivers result says whether it took ownership.
1146 bool TransferReceivers()
1148 DuplexSessionOneWayInputChannel singletonChannel = (DuplexSessionOneWayInputChannel)base.GetCurrentChannel();
// No current channel is special-cased (presumably reported as "not transferred" - TODO confirm).
1149 if (singletonChannel == null)
1155 return singletonChannel.TransferReceivers(this.receivers, this.innerChannelListener);
// Upper input channel created by the acceptor. After a successful TransferReceivers
// it owns the receiver tracker and inner listener and shuts them down with itself.
1159 class DuplexSessionOneWayInputChannel : InputChannel
// Both fields are null until TransferReceivers stores them.
1161 ChannelTracker<IDuplexSessionChannel, ChannelReceiver> receivers;
1162 IChannelListener<IDuplexSessionChannel> innerChannelListener;
1164 public DuplexSessionOneWayInputChannel(ChannelManagerBase channelManager, EndpointAddress localAddress)
1165 : base(channelManager, localAddress)
// Called by the acceptor when it is closing or aborting. The channel's state is
// checked before the objects are stored (presumably the transfer is refused unless
// the channel is Opened - TODO confirm).
1169 public bool TransferReceivers(ChannelTracker<IDuplexSessionChannel, ChannelReceiver> receivers,
1170 IChannelListener<IDuplexSessionChannel> innerChannelListener)
1174 if (this.State != CommunicationState.Opened)
1179 this.receivers = receivers;
1180 this.innerChannelListener = innerChannelListener;
// Aborts the transferred receivers and inner listener, if any were transferred.
1185 protected override void OnAbort()
1187 if (this.receivers != null)
1189 Fx.Assert(this.innerChannelListener != null, "innerChannelListener and receiver should both be null or non-null");
1190 this.receivers.Abort();
1191 this.innerChannelListener.Abort();
// Asynchronous close: transferred objects, if any, are closed through ChainedCloseAsyncResult.
1196 protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
1198 List<ICommunicationObject> objectsToClose = new List<ICommunicationObject>();
1199 if (this.receivers != null)
1201 objectsToClose.Add(this.receivers);
1202 objectsToClose.Add(this.innerChannelListener);
1205 return new ChainedCloseAsyncResult(timeout, callback, state, base.OnBeginClose, base.OnEndClose, objectsToClose);
1208 protected override void OnEndClose(IAsyncResult result)
1210 ChainedCloseAsyncResult.End(result);
// Synchronous close: transferred objects are closed first, then the base channel,
// all under one timeout budget.
1213 protected override void OnClose(TimeSpan timeout)
1215 TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
1216 if (this.receivers != null)
1218 Fx.Assert(this.innerChannelListener != null, "innerChannelListener and receiver should both be null or non-null");
1219 this.receivers.Close(timeoutHelper.RemainingTime());
1220 this.innerChannelListener.Close(timeoutHelper.RemainingTime());
1222 base.OnClose(timeoutHelper.RemainingTime());
1229 // given an inner channel, pulls messages off of it and enqueues them into the upper channel
1230 class ChannelReceiver
// Passed to the acceptor with every queued item; it starts the next receive when that item is dequeued.
1232 Action onMessageDequeued;
1233 static AsyncCallback onReceive = Fx.ThunkCallback(new AsyncCallback(OnReceive));
1234 DuplexSessionOneWayInputChannelAcceptor acceptor;
1235 IDuplexSessionChannel channel;
// Timeout for each BeginTryReceive on the session channel.
1236 TimeSpan idleTimeout;
// Delegates below are created lazily in OnMessageDequeued.
1237 static Action<object> startReceivingCallback;
1238 Action<object> onStartReceiveLater;
1239 Action<object> onDispatchItemsLater;
// Cleared after the first message on the session passes validation (see EnqueueMessage).
1240 bool validateHeader;
// Copies the acceptor, idle timeout and packet-routable setting from the owning listener.
1242 public ChannelReceiver(DuplexSessionOneWayChannelListener parent, IDuplexSessionChannel channel)
1244 this.channel = channel;
1245 this.acceptor = parent.inputChannelAcceptor;
1246 this.idleTimeout = parent.idleTimeout;
1247 this.validateHeader = parent.packetRoutable;
1248 this.onMessageDequeued = new Action(OnMessageDequeued);
// Work-item adapter: the receiver to start is passed as state.
// NOTE(review): this is an instance method, yet the delegate wrapping it is cached in
// the static field startReceivingCallback. The cached delegate therefore keeps a
// reference to whichever receiver created it first, even though the target is taken
// from state. Declaring this method static would avoid holding that instance.
1251 void StartReceivingCallback(object state)
1253 ((ChannelReceiver)state).StartReceiving();
// Drives the receive loop for this session channel: begins a try-receive with the
// idle timeout and processes the result inline when it completes synchronously.
1256 public void StartReceiving()
1258 Exception exceptionToEnqueue = null;
// The channel's state is checked before each receive
// (presumably the loop stops once it is no longer Opened - TODO confirm).
1262 if (channel.State != CommunicationState.Opened)
1268 IAsyncResult result = null;
1271 result = this.channel.BeginTryReceive(this.idleTimeout, onReceive, this);
1273 catch (CommunicationException e)
1275 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
1277 #pragma warning suppress 56500 // Microsoft, transferring exception to input queue to be pulled off by user
1285 exceptionToEnqueue = e;
// Asynchronous completions are continued from the OnReceive callback.
1291 if (!result.CompletedSynchronously)
1297 bool continueLoop = OnCompleteReceive(result, out dispatch);
// A captured exception is queued for the user; dequeuing it triggers OnMessageDequeued.
1309 if (exceptionToEnqueue != null)
1311 this.acceptor.Enqueue(exceptionToEnqueue, this.onMessageDequeued);
// Queues a received message on the acceptor without dispatching it and returns the
// acceptor's EnqueueWithoutDispatch result. While validateHeader is set, the message
// must first pass PacketRoutableHeader.TryValidateMessage; on failure the session
// channel is aborted.
1315 bool EnqueueMessage(Message message)
1317 if (this.validateHeader)
1319 if (!PacketRoutableHeader.TryValidateMessage(message))
1321 this.channel.Abort();
1327 this.validateHeader = false; // only validate the first message on a session
1331 return this.acceptor.EnqueueWithoutDispatch(message, this.onMessageDequeued);
// Work item scheduled by OnMessageDequeued when a synchronously completed receive
// reports that the receive loop should continue; state is passed as null.
1334 void OnStartReceiveLater(object state)
// Work item scheduled by OnMessageDequeued to dispatch items that were queued
// without dispatch; state is passed as null.
1339 void OnDispatchItemsLater(object state)
1346 this.acceptor.DispatchItems();
1349 // returns true if the Receive Loop should continue (or be started if it's not running)
// Ends a try-receive and queues the outcome on the acceptor. dispatchLater reports
// the acceptor's EnqueueWithoutDispatch result for whatever was queued.
1350 bool OnCompleteReceive(IAsyncResult result, out bool dispatchLater)
1352 Exception exceptionToEnqueue = null;
1353 Message message = null;
1354 bool startLoop = false;
1355 dispatchLater = false;
1359 if (!this.channel.EndTryReceive(result, out message))
1361 this.channel.Abort(); // we've hit our IdleTimeout
1363 else if (message == null)
1365 this.channel.Close(); // read EOF, close our half of the session
// After a traced failure the loop continues only if the channel is still Opened.
1368 catch (CommunicationException e)
1370 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
1371 startLoop = (this.channel.State == CommunicationState.Opened);
// A timeout here is reported through the CloseTimeout trace event when it is enabled.
1373 catch (TimeoutException e)
1375 if (TD.CloseTimeoutIsEnabled())
1377 TD.CloseTimeout(e.Message);
1379 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
1380 startLoop = (this.channel.State == CommunicationState.Opened);
1382 #pragma warning suppress 56500 // Microsoft, transferring exception to input queue to be pulled off by user
1390 exceptionToEnqueue = e;
// Queue either the received message or the captured exception.
1393 if (message != null)
1395 dispatchLater = EnqueueMessage(message);
1397 else if (exceptionToEnqueue != null)
1399 dispatchLater = this.acceptor.EnqueueWithoutDispatch(exceptionToEnqueue, this.onMessageDequeued);
// Dequeue callback passed with every item this receiver queues: when the item is
// taken off the queue, the next try-receive is started. Follow-up work is scheduled
// as work items rather than run inline.
1405 void OnMessageDequeued()
1407 IAsyncResult result = null;
1408 Exception exceptionToEnqueue = null;
1412 result = this.channel.BeginTryReceive(this.idleTimeout, onReceive, this);
1414 catch (CommunicationException e)
1416 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
1418 #pragma warning suppress 56500 // Microsoft, transferring exception to input queue to be pulled off by user
1426 exceptionToEnqueue = e;
// Synchronous completion: process the result here; asynchronous completions go to OnReceive.
1431 if (result.CompletedSynchronously)
1435 if (OnCompleteReceive(result, out dispatchLater))
1437 if (onStartReceiveLater == null)
1439 onStartReceiveLater = new Action<object>(OnStartReceiveLater);
1441 ActionItem.Schedule(onStartReceiveLater, null);
1446 if (onDispatchItemsLater == null)
1448 onDispatchItemsLater = new Action<object>(OnDispatchItemsLater);
1450 ActionItem.Schedule(onDispatchItemsLater, null);
// A captured exception is queued with this same callback, so dequeuing it retries the receive.
1454 else if (exceptionToEnqueue != null)
1456 this.acceptor.Enqueue(exceptionToEnqueue, this.onMessageDequeued, false);
1458 else // need to kickoff a new loop
1460 if (this.channel.State == CommunicationState.Opened)
// Lazily creates the shared static delegate; the receiver to start is passed as state.
1462 if (startReceivingCallback == null)
1464 startReceivingCallback = new Action<object>(StartReceivingCallback);
1467 ActionItem.Schedule(startReceivingCallback, this);
// Callback for BeginTryReceive. The receiver is recovered from AsyncState; if
// OnCompleteReceive reports that the loop should continue, receiving is restarted.
1472 static void OnReceive(IAsyncResult result)
// Synchronous completions are processed by the code that started the receive, so
// they are special-cased here (presumably an early return - TODO confirm).
1474 if (result.CompletedSynchronously)
1479 ChannelReceiver thisPtr = (ChannelReceiver)result.AsyncState;
1481 if (thisPtr.OnCompleteReceive(result, out dispatch))
1483 thisPtr.StartReceiving();