Updates referencesource to .NET 4.7
[mono.git] / mcs / class / referencesource / System.ServiceModel / System / ServiceModel / Channels / OneWayChannelListener.cs
1 //------------------------------------------------------------
2 // Copyright (c) Microsoft Corporation.  All rights reserved.
3 //------------------------------------------------------------
4
5 namespace System.ServiceModel.Channels
6 {
7     using System.Collections.Generic;
8     using System.Diagnostics;
9     using System.Runtime;
10     using System.ServiceModel;
11     using System.ServiceModel.Diagnostics;
12     using System.Threading;
13     using System.Runtime.Diagnostics;
14     using System.ServiceModel.Diagnostics.Application;
15
    /// <summary>
    /// Wraps an IChannelListener&lt;IReplyChannel&gt; into an IChannelListener&lt;IInputChannel&gt;
    /// </summary>
19     class ReplyOneWayChannelListener
20         : LayeredChannelListener<IInputChannel>
21     {
22         IChannelListener<IReplyChannel> innerChannelListener;
23         bool packetRoutable;
24
25         public ReplyOneWayChannelListener(OneWayBindingElement bindingElement, BindingContext context)
26             : base(context.Binding, context.BuildInnerChannelListener<IReplyChannel>())
27         {
28             this.packetRoutable = bindingElement.PacketRoutable;
29         }
30
31         protected override void OnOpening()
32         {
33             this.innerChannelListener = (IChannelListener<IReplyChannel>)this.InnerChannelListener;
34             base.OnOpening();
35         }
36
37         protected override IInputChannel OnAcceptChannel(TimeSpan timeout)
38         {
39             IReplyChannel innerChannel = this.innerChannelListener.AcceptChannel(timeout);
40             return WrapInnerChannel(innerChannel);
41         }
42
43         protected override IAsyncResult OnBeginAcceptChannel(TimeSpan timeout, AsyncCallback callback, object state)
44         {
45             return this.innerChannelListener.BeginAcceptChannel(timeout, callback, state);
46         }
47
48         protected override IInputChannel OnEndAcceptChannel(IAsyncResult result)
49         {
50             IReplyChannel innerChannel = this.innerChannelListener.EndAcceptChannel(result);
51             return WrapInnerChannel(innerChannel);
52         }
53
54         protected override bool OnWaitForChannel(TimeSpan timeout)
55         {
56             return this.innerChannelListener.WaitForChannel(timeout);
57         }
58
59         protected override IAsyncResult OnBeginWaitForChannel(TimeSpan timeout, AsyncCallback callback, object state)
60         {
61             return this.innerChannelListener.BeginWaitForChannel(timeout, callback, state);
62         }
63
64         protected override bool OnEndWaitForChannel(IAsyncResult result)
65         {
66             return this.innerChannelListener.EndWaitForChannel(result);
67         }
68
69         IInputChannel WrapInnerChannel(IReplyChannel innerChannel)
70         {
71             if (innerChannel == null)
72             {
73                 return null;
74             }
75             else
76             {
77                 return new ReplyOneWayInputChannel(this, innerChannel);
78             }
79         }
80
81         class ReplyOneWayInputChannel : LayeredChannel<IReplyChannel>, IInputChannel
82         {
83             bool validateHeader;
84
85             public ReplyOneWayInputChannel(ReplyOneWayChannelListener listener, IReplyChannel innerChannel)
86                 : base(listener, innerChannel)
87             {
88                 this.validateHeader = listener.packetRoutable;
89             }
90
91             public EndpointAddress LocalAddress
92             {
93                 get { return this.InnerChannel.LocalAddress; }
94             }
95
96             Message ProcessContext(RequestContext context, TimeSpan timeout)
97             {
98                 if (context == null)
99                 {
100                     return null;
101                 }
102
103                 bool replySuccess = false;
104                 Message result = null;
105                 try
106                 {
107                     // validate that the request message contains our expected header
108                     result = context.RequestMessage;
109                     result.Properties.Add(RequestContextMessageProperty.Name, new RequestContextMessageProperty(context));
110
111                     if (this.validateHeader)
112                     {
113                         PacketRoutableHeader.ValidateMessage(result);
114                     }
115
116                     try
117                     {
118                         TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
119                         context.Reply(null, timeoutHelper.RemainingTime());
120                         replySuccess = true;
121                     }
122                     catch (CommunicationException e)
123                     {
124                         DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
125                     }
126                     catch (TimeoutException e)
127                     {
128                         if (TD.SendTimeoutIsEnabled())
129                         {
130                             TD.SendTimeout(e.Message);
131                         }
132                         DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
133                     }
134                 }
135                 finally
136                 {
137                     if (!replySuccess)
138                     {
139                         context.Abort();
140                         if (result != null)
141                         {
142                             result.Close();
143                             result = null;
144                         }
145                     }
146                 }
147
148                 return result;
149             }
150
151             public Message Receive()
152             {
153                 return this.Receive(this.DefaultReceiveTimeout);
154             }
155
156             public Message Receive(TimeSpan timeout)
157             {
158                 TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
159                 RequestContext context = InnerChannel.ReceiveRequest(timeoutHelper.RemainingTime());
160                 return ProcessContext(context, timeoutHelper.RemainingTime());
161             }
162
163             public IAsyncResult BeginReceive(AsyncCallback callback, object state)
164             {
165                 return this.BeginReceive(this.DefaultReceiveTimeout, callback, state);
166             }
167
168             public IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)
169             {
170                 return new ReceiveAsyncResult(this.InnerChannel, timeout, this.validateHeader, callback, state);
171             }
172
173             public Message EndReceive(IAsyncResult result)
174             {
175                 return ReceiveAsyncResult.End(result);
176             }
177
178             public bool TryReceive(TimeSpan timeout, out Message message)
179             {
180                 TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
181                 RequestContext context;
182                 if (InnerChannel.TryReceiveRequest(timeoutHelper.RemainingTime(), out context))
183                 {
184                     message = ProcessContext(context, timeoutHelper.RemainingTime());
185                     return true;
186                 }
187                 else
188                 {
189                     message = null;
190                     return false;
191                 }
192             }
193
194             public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)
195             {
196                 return new TryReceiveAsyncResult(this.InnerChannel, timeout, this.validateHeader, callback, state);
197             }
198
199             public bool EndTryReceive(IAsyncResult result, out Message message)
200             {
201                 return TryReceiveAsyncResult.End(result, out message);
202             }
203
204             public bool WaitForMessage(TimeSpan timeout)
205             {
206                 return InnerChannel.WaitForRequest(timeout);
207             }
208
209             public IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state)
210             {
211                 return InnerChannel.BeginWaitForRequest(timeout, callback, state);
212             }
213
214             public bool EndWaitForMessage(IAsyncResult result)
215             {
216                 return InnerChannel.EndWaitForRequest(result);
217             }
218
219             class TryReceiveAsyncResult : ReceiveAsyncResultBase
220             {
221                 bool tryResult;
222
223                 public TryReceiveAsyncResult(IReplyChannel innerChannel, TimeSpan timeout, bool validateHeader,
224                     AsyncCallback callback, object state)
225                     : base(innerChannel, timeout, validateHeader, callback, state)
226                 {
227                 }
228
229                 public static bool End(IAsyncResult result, out Message message)
230                 {
231                     TryReceiveAsyncResult thisPtr = AsyncResult.End<TryReceiveAsyncResult>(result);
232                     message = thisPtr.Message;
233                     return thisPtr.tryResult;
234                 }
235
236                 protected override IAsyncResult OnBeginReceiveRequest(TimeSpan timeout, AsyncCallback callback, object state)
237                 {
238                     return InnerChannel.BeginTryReceiveRequest(timeout, callback, state);
239                 }
240
241                 protected override RequestContext OnEndReceiveRequest(IAsyncResult result)
242                 {
243                     RequestContext context;
244                     this.tryResult = InnerChannel.EndTryReceiveRequest(result, out context);
245                     return context;
246                 }
247             }
248
249             class ReceiveAsyncResult : ReceiveAsyncResultBase
250             {
251                 public ReceiveAsyncResult(IReplyChannel innerChannel, TimeSpan timeout, bool validateHeader,
252                     AsyncCallback callback, object state)
253                     : base(innerChannel, timeout, validateHeader, callback, state)
254                 {
255                 }
256
257                 public static Message End(IAsyncResult result)
258                 {
259                     ReceiveAsyncResult thisPtr = AsyncResult.End<ReceiveAsyncResult>(result);
260                     return thisPtr.Message;
261                 }
262
263                 protected override IAsyncResult OnBeginReceiveRequest(TimeSpan timeout, AsyncCallback callback, object state)
264                 {
265                     return InnerChannel.BeginReceiveRequest(timeout, callback, state);
266                 }
267
268                 protected override RequestContext OnEndReceiveRequest(IAsyncResult result)
269                 {
270                     return InnerChannel.EndReceiveRequest(result);
271                 }
272             }
273
274             abstract class ReceiveAsyncResultBase : AsyncResult
275             {
276                 IReplyChannel innerChannel;
277                 RequestContext context;
278                 Message message;
279                 TimeoutHelper timeoutHelper;
280                 bool validateHeader;
281                 static AsyncCallback onReceiveRequest = Fx.ThunkCallback(new AsyncCallback(OnReceiveRequest));
282                 static AsyncCallback onReply = Fx.ThunkCallback(new AsyncCallback(OnReply));
283
284                 protected ReceiveAsyncResultBase(IReplyChannel innerChannel, TimeSpan timeout, bool validateHeader,
285                     AsyncCallback callback, object state)
286                     : base(callback, state)
287                 {
288                     this.innerChannel = innerChannel;
289                     this.timeoutHelper = new TimeoutHelper(timeout);
290                     this.validateHeader = validateHeader;
291                     IAsyncResult result = this.OnBeginReceiveRequest(timeoutHelper.RemainingTime(), onReceiveRequest, this);
292                     if (!result.CompletedSynchronously)
293                     {
294                         return;
295                     }
296
297                     if (HandleReceiveRequestComplete(result))
298                     {
299                         base.Complete(true);
300                     }
301                 }
302
303                 protected IReplyChannel InnerChannel
304                 {
305                     get
306                     {
307                         return this.innerChannel;
308                     }
309                 }
310
311                 protected Message Message
312                 {
313                     get
314                     {
315                         return this.message;
316                     }
317                 }
318
319                 protected abstract IAsyncResult OnBeginReceiveRequest(TimeSpan timeout, AsyncCallback callback, object state);
320                 protected abstract RequestContext OnEndReceiveRequest(IAsyncResult result);
321
322                 bool HandleReplyComplete(IAsyncResult result)
323                 {
324                     bool abortContext = true;
325                     try
326                     {
327                         context.EndReply(result);
328                         abortContext = false;
329                     }
330                     catch (CommunicationException e)
331                     {
332                         DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
333                     }
334                     catch (TimeoutException e)
335                     {
336                         if (TD.SendTimeoutIsEnabled())
337                         {
338                             TD.SendTimeout(e.Message);
339                         }
340                         DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
341                     }
342                     finally
343                     {
344                         if (abortContext)
345                         {
346                             context.Abort();
347                         }
348                     }
349
350                     return true;
351                 }
352
353                 bool HandleReceiveRequestComplete(IAsyncResult result)
354                 {
355                     this.context = this.OnEndReceiveRequest(result);
356                     if (this.context == null)
357                     {
358                         return true;
359                     }
360
361                     bool replySuccess = false;
362                     IAsyncResult replyResult = null;
363                     try
364                     {
365                         this.message = context.RequestMessage;
366                         this.message.Properties.Add(RequestContextMessageProperty.Name, new RequestContextMessageProperty(context));
367
368                         if (validateHeader)
369                         {
370                             PacketRoutableHeader.ValidateMessage(this.message);
371                         }
372                         try
373                         {
374                             replyResult = context.BeginReply(null, timeoutHelper.RemainingTime(), onReply, this);
375                             replySuccess = true;
376                         }
377                         catch (CommunicationException e)
378                         {
379                             DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
380                         }
381                         catch (TimeoutException e)
382                         {
383                             if (TD.SendTimeoutIsEnabled())
384                             {
385                                 TD.SendTimeout(e.Message);
386                             }
387                             DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
388                         }
389                     }
390                     finally
391                     {
392                         if (!replySuccess)
393                         {
394                             this.context.Abort();
395                             if (this.message != null)
396                             {
397                                 this.message.Close();
398                                 this.message = null;
399                             }
400                         }
401                     }
402
403                     if (replyResult == null)
404                     {
405                         return true;
406                     }
407                     else if (replyResult.CompletedSynchronously)
408                     {
409                         return HandleReplyComplete(replyResult);
410                     }
411                     else
412                     {
413                         return false;
414                     }
415                 }
416
417                 static void OnReceiveRequest(IAsyncResult result)
418                 {
419                     if (result.CompletedSynchronously)
420                     {
421                         return;
422                     }
423
424                     ReceiveAsyncResultBase thisPtr = (ReceiveAsyncResultBase)result.AsyncState;
425
426                     Exception completionException = null;
427                     bool completeSelf;
428                     try
429                     {
430                         completeSelf = thisPtr.HandleReceiveRequestComplete(result);
431                     }
432 #pragma warning suppress 56500 // Microsoft, transferring exception to another thread
433                     catch (Exception e)
434                     {
435                         if (Fx.IsFatal(e))
436                         {
437                             throw;
438                         }
439                         completeSelf = true;
440                         completionException = e;
441                     }
442
443                     if (completeSelf)
444                     {
445                         thisPtr.Complete(false, completionException);
446                     }
447                 }
448
449                 static void OnReply(IAsyncResult result)
450                 {
451                     if (result.CompletedSynchronously)
452                     {
453                         return;
454                     }
455
456                     ReceiveAsyncResultBase thisPtr = (ReceiveAsyncResultBase)result.AsyncState;
457
458                     Exception completionException = null;
459                     bool completeSelf;
460                     try
461                     {
462                         completeSelf = thisPtr.HandleReplyComplete(result);
463                     }
464 #pragma warning suppress 56500 // Microsoft, transferring exception to another thread
465                     catch (Exception e)
466                     {
467                         if (Fx.IsFatal(e))
468                         {
469                             throw;
470                         }
471                         completeSelf = true;
472                         completionException = e;
473                     }
474
475                     if (completeSelf)
476                     {
477                         thisPtr.Complete(false, completionException);
478                     }
479                 }
480             }
481         }
482     }
483
484     // <summary>
485     // Wraps an IChannelListener<IDuplexChannel> into an IChannelListener<IInputChannel>
486     // </summary>
487     class DuplexOneWayChannelListener
488         : LayeredChannelListener<IInputChannel>
489     {
490         IChannelListener<IDuplexChannel> innerChannelListener;
491         bool packetRoutable;
492
493         public DuplexOneWayChannelListener(OneWayBindingElement bindingElement, BindingContext context)
494             : base(context.Binding, context.BuildInnerChannelListener<IDuplexChannel>())
495         {
496             this.packetRoutable = bindingElement.PacketRoutable;
497         }
498
499         protected override void OnOpening()
500         {
501             this.innerChannelListener = (IChannelListener<IDuplexChannel>)this.InnerChannelListener;
502             base.OnOpening();
503         }
504
505         protected override IInputChannel OnAcceptChannel(TimeSpan timeout)
506         {
507             IDuplexChannel channel = this.innerChannelListener.AcceptChannel(timeout);
508             return WrapInnerChannel(channel);
509         }
510
511         protected override IAsyncResult OnBeginAcceptChannel(TimeSpan timeout, AsyncCallback callback, object state)
512         {
513             return this.innerChannelListener.BeginAcceptChannel(timeout, callback, state);
514         }
515
516         protected override IAsyncResult OnBeginWaitForChannel(TimeSpan timeout, AsyncCallback callback, object state)
517         {
518             return this.innerChannelListener.BeginWaitForChannel(timeout, callback, state);
519         }
520
521         protected override IInputChannel OnEndAcceptChannel(IAsyncResult result)
522         {
523             IDuplexChannel channel = this.innerChannelListener.EndAcceptChannel(result);
524             return WrapInnerChannel(channel);
525         }
526
527         protected override bool OnEndWaitForChannel(IAsyncResult result)
528         {
529             return this.innerChannelListener.EndWaitForChannel(result);
530         }
531
532         protected override bool OnWaitForChannel(TimeSpan timeout)
533         {
534             return this.innerChannelListener.WaitForChannel(timeout);
535         }
536
537         IInputChannel WrapInnerChannel(IDuplexChannel innerChannel)
538         {
539             if (innerChannel == null)
540             {
541                 return null;
542             }
543             else
544             {
545                 return new DuplexOneWayInputChannel(this, innerChannel);
546             }
547         }
548
549         class DuplexOneWayInputChannel : LayeredChannel<IDuplexChannel>, IInputChannel
550         {
551             bool validateHeader;
552
553             public DuplexOneWayInputChannel(DuplexOneWayChannelListener listener, IDuplexChannel innerChannel)
554                 : base(listener, innerChannel)
555             {
556                 this.validateHeader = listener.packetRoutable;
557             }
558
559             public EndpointAddress LocalAddress
560             {
561                 get { return this.InnerChannel.LocalAddress; }
562             }
563
564             public IAsyncResult BeginReceive(AsyncCallback callback, object state)
565             {
566                 return this.BeginReceive(this.DefaultReceiveTimeout, callback, state);
567             }
568
569             public IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)
570             {
571                 return this.InnerChannel.BeginReceive(timeout, callback, state);
572             }
573
574             public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)
575             {
576                 return this.InnerChannel.BeginTryReceive(timeout, callback, state);
577             }
578
579             public IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state)
580             {
581                 return this.InnerChannel.BeginWaitForMessage(timeout, callback, state);
582             }
583
584             public Message EndReceive(IAsyncResult result)
585             {
586                 Message message = this.InnerChannel.EndReceive(result);
587                 return ValidateMessage(message);
588             }
589
590             public bool EndTryReceive(IAsyncResult result, out Message message)
591             {
592                 bool success = this.InnerChannel.EndTryReceive(result, out message);
593                 message = ValidateMessage(message);
594                 return success;
595             }
596
597             public bool EndWaitForMessage(IAsyncResult result)
598             {
599                 return this.InnerChannel.EndWaitForMessage(result);
600             }
601
602             public Message Receive()
603             {
604                 return this.Receive(this.DefaultReceiveTimeout);
605             }
606
607             public Message Receive(TimeSpan timeout)
608             {
609                 Message result = this.InnerChannel.Receive(timeout);
610                 return ValidateMessage(result);
611             }
612
613             public bool TryReceive(TimeSpan timeout, out Message message)
614             {
615                 bool success = this.InnerChannel.TryReceive(timeout, out message);
616                 message = ValidateMessage(message);
617                 return success;
618             }
619
620             public bool WaitForMessage(TimeSpan timeout)
621             {
622                 return this.InnerChannel.WaitForMessage(timeout);
623             }
624
625             Message ValidateMessage(Message message)
626             {
627                 if (this.validateHeader && message != null)
628                 {
629                     PacketRoutableHeader.ValidateMessage(message);
630                 }
631                 return message;
632             }
633         }
634     }
635
    /// <summary>
    /// Wraps an IChannelListener&lt;IDuplexSessionChannel&gt; into an IChannelListener&lt;IInputChannel&gt;
    /// </summary>
639     class DuplexSessionOneWayChannelListener
640         : DelegatingChannelListener<IInputChannel>
641     {
642         IChannelListener<IDuplexSessionChannel> innerChannelListener;
643         DuplexSessionOneWayInputChannelAcceptor inputChannelAcceptor;
644         bool packetRoutable;
645         int maxAcceptedChannels;
646         bool acceptPending;
647         int activeChannels;
648         TimeSpan idleTimeout;
649         static AsyncCallback onAcceptInnerChannel = Fx.ThunkCallback(new AsyncCallback(OnAcceptInnerChannel));
650         AsyncCallback onOpenInnerChannel;
651         EventHandler onInnerChannelClosed;
652         Action onExceptionDequeued;
653         Action<object> handleAcceptCallback;
654         bool ownsInnerListener;
655         object acceptLock;
656
657         public DuplexSessionOneWayChannelListener(OneWayBindingElement bindingElement, BindingContext context)
658             : base(true, context.Binding, context.BuildInnerChannelListener<IDuplexSessionChannel>())
659         {
660             this.acceptLock = new object();
661             this.inputChannelAcceptor = new DuplexSessionOneWayInputChannelAcceptor(this);
662             this.packetRoutable = bindingElement.PacketRoutable;
663             this.maxAcceptedChannels = bindingElement.MaxAcceptedChannels;
664             this.Acceptor = this.inputChannelAcceptor;
665             this.idleTimeout = bindingElement.ChannelPoolSettings.IdleTimeout;
666             this.onOpenInnerChannel = Fx.ThunkCallback(new AsyncCallback(OnOpenInnerChannel));
667             this.ownsInnerListener = true;
668             this.onInnerChannelClosed = new EventHandler(OnInnerChannelClosed);
669         }
670
671         bool IsAcceptNecessary
672         {
673             get
674             {
675                 return !acceptPending
676                     && (activeChannels < maxAcceptedChannels)
677                     && (this.innerChannelListener.State == CommunicationState.Opened);
678             }
679         }
680
681         protected override void OnOpening()
682         {
683             this.innerChannelListener = (IChannelListener<IDuplexSessionChannel>)this.InnerChannelListener;
684             this.inputChannelAcceptor.TransferInnerChannelListener(this.innerChannelListener); // acceptor now owns the lifetime
685             this.ownsInnerListener = false;
686             base.OnOpening();
687         }
688
689         protected override void OnOpened()
690         {
691             base.OnOpened();
692             ActionItem.Schedule(new Action<object>(AcceptLoop), null);
693         }
694
695         protected override void OnAbort()
696         {
697             base.OnAbort();
698             if (this.ownsInnerListener && this.innerChannelListener != null) // Open didn't complete
699             {
700                 this.innerChannelListener.Abort();
701             }
702         }
703
704         void AcceptLoop(object state)
705         {
706             AcceptLoop(null);
707         }
708
709         // we need to kick off an accept (and possibly process a completion as well)
710         void AcceptLoop(IAsyncResult pendingResult)
711         {
712             IDuplexSessionChannel pendingChannel = null;
713
714             if (pendingResult != null)
715             {
716                 if (!ProcessEndAccept(pendingResult, out pendingChannel))
717                 {
718                     return;
719                 }
720                 pendingResult = null;
721             }
722
723             lock (acceptLock)
724             {
725                 while (IsAcceptNecessary)
726                 {
727                     Exception exceptionToEnqueue = null;
728                     try
729                     {
730                         IAsyncResult result = null;
731
732                         try
733                         {
734                             result = this.innerChannelListener.BeginAcceptChannel(TimeSpan.MaxValue, onAcceptInnerChannel, this);
735                         }
736                         catch (CommunicationException e)
737                         {
738                             DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
739                             continue;
740                         }
741
742                         acceptPending = true;
743                         if (!result.CompletedSynchronously)
744                         {
745                             break;
746                         }
747
748                         if (this.handleAcceptCallback == null)
749                         {
750                             this.handleAcceptCallback = new Action<object>(HandleAcceptCallback);
751                         }
752
753                         if (pendingChannel != null)
754                         {
755                             // don't starve our completed Accept
756                             ActionItem.Schedule(handleAcceptCallback, pendingChannel);
757                             pendingChannel = null;
758                         }
759
760                         IDuplexSessionChannel channel = null;
761                         if (ProcessEndAccept(result, out channel))
762                         {
763                             if (channel != null)
764                             {
765                                 ActionItem.Schedule(handleAcceptCallback, channel);
766                             }
767                         }
768                         else
769                         {
770                             return;
771                         }
772                     }
773 #pragma warning suppress 56500 // Microsoft, transferring exception to input queue to be pulled off by user
774                     catch (Exception e)
775                     {
776                         if (Fx.IsFatal(e))
777                         {
778                             throw;
779                         }
780
781                         exceptionToEnqueue = e;
782                     }
783
784                     if (exceptionToEnqueue != null)
785                     {
786                         this.inputChannelAcceptor.Enqueue(exceptionToEnqueue, null, false);
787                     }
788                 }
789             }
790
791             if (pendingChannel != null)
792             {
793                 HandleAcceptComplete(pendingChannel);
794             }
795         }
796
797         // return true if the loop should continue
798         bool ProcessEndAccept(IAsyncResult result, out IDuplexSessionChannel channel)
799         {
800             channel = null;
801             Exception exceptionToEnqueue = null;
802             bool success = false;
803             try
804             {
805                 channel = innerChannelListener.EndAcceptChannel(result);
806                 success = true;
807             }
808             catch (CommunicationException e)
809             {
810                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
811             }
812 #pragma warning suppress 56500 // Microsoft, transferring exception to input queue to be pulled off by user
813             catch (Exception e)
814             {
815                 if (Fx.IsFatal(e))
816                 {
817                     throw;
818                 }
819
820                 exceptionToEnqueue = e;
821             }
822
823             if (success)
824             {
825                 if (channel != null)
826                 {
827                     channel.Closed += this.onInnerChannelClosed;
828                     bool traceMaxInboundChannels = false;
829                     lock (acceptLock)
830                     {
831                         this.acceptPending = false;
832                         activeChannels++;
833                         if (activeChannels >= maxAcceptedChannels)
834                         {
835                             traceMaxInboundChannels = true;
836                         }
837                     }
838
839                     if (DiagnosticUtility.ShouldTraceWarning)
840                     {
841                         if (traceMaxInboundChannels)
842                         {
843                             TraceUtility.TraceEvent(TraceEventType.Warning,
844                                 TraceCode.MaxAcceptedChannelsReached,
845                                 SR.GetString(SR.TraceCodeMaxAcceptedChannelsReached),
846                                 new StringTraceRecord("MaxAcceptedChannels", maxAcceptedChannels.ToString(System.Globalization.CultureInfo.InvariantCulture)),
847                                 this,
848                                 null);
849                         }
850                     }
851
852                 }
853                 else
854                 {
855                     // we're at EOF. close up the Acceptor and break out of our loop
856                     this.inputChannelAcceptor.Close();
857                     return false;
858                 }
859             }
860             else if (exceptionToEnqueue != null)
861             {
862                 // see what the state of the inner listener is. If it's still open, don't block the accept loop
863                 bool canDispatchOnThisThread = (innerChannelListener.State != CommunicationState.Opened);
864                 if (this.onExceptionDequeued == null)
865                 {
866                     this.onExceptionDequeued = new Action(OnExceptionDequeued);
867                 }
868                 this.inputChannelAcceptor.Enqueue(exceptionToEnqueue, this.onExceptionDequeued, canDispatchOnThisThread);
869             }
870             else
871             {
872                 lock (acceptLock)
873                 {
874                     this.acceptPending = false;
875                 }
876             }
877
878             return true;
879         }
880
881         void OnExceptionDequeued()
882         {
883             lock (acceptLock)
884             {
885                 this.acceptPending = false;
886             }
887             AcceptLoop(null);
888         }
889
890         static void OnAcceptInnerChannel(IAsyncResult result)
891         {
892             if (result.CompletedSynchronously)
893             {
894                 return;
895             }
896
897             DuplexSessionOneWayChannelListener thisPtr = (DuplexSessionOneWayChannelListener)result.AsyncState;
898             thisPtr.AcceptLoop(result);
899         }
900
901         void HandleAcceptCallback(object state)
902         {
903             this.HandleAcceptComplete((IDuplexSessionChannel)state);
904         }
905
906         void OnInnerChannelClosed(object sender, EventArgs e)
907         {
908             // Reduce our quota and kick off an accept
909             IDuplexSessionChannel channel = (IDuplexSessionChannel)sender;
910             channel.Closed -= this.onInnerChannelClosed;
911
912             lock (acceptLock)
913             {
914                 activeChannels--;
915             }
916             this.AcceptLoop(null);
917         }
918
919         void HandleAcceptComplete(IDuplexSessionChannel channel)
920         {
921             Exception exceptionToEnqueue = null;
922             bool success = false;
923
924             this.inputChannelAcceptor.PrepareChannel(channel);
925             IAsyncResult openResult = null;
926             try
927             {
928                 openResult = channel.BeginOpen(this.idleTimeout, onOpenInnerChannel, channel);
929                 success = true;
930             }
931             catch (CommunicationException e) // ---- CommunicationException
932             {
933                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
934             }
935             catch (TimeoutException e)
936             {
937                 if (TD.OpenTimeoutIsEnabled())
938                 {
939                     TD.OpenTimeout(e.Message);
940                 }
941                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
942             }
943 #pragma warning suppress 56500 // Microsoft, transferring exception to input queue to be pulled off by user
944             catch (Exception e)
945             {
946                 if (Fx.IsFatal(e))
947                 {
948                     throw;
949                 }
950
951                 exceptionToEnqueue = e;
952             }
953             finally
954             {
955                 if (!success && channel != null)
956                 {
957                     channel.Abort();
958                 }
959             }
960
961             if (success)
962             {
963                 if (openResult.CompletedSynchronously)
964                 {
965                     CompleteOpen(channel, openResult);
966                 }
967             }
968             else
969             {
970                 if (exceptionToEnqueue != null)
971                 {
972                     this.inputChannelAcceptor.Enqueue(exceptionToEnqueue, null);
973                 }
974             }
975         }
976
977         void OnOpenInnerChannel(IAsyncResult result)
978         {
979             if (result.CompletedSynchronously)
980             {
981                 return;
982             }
983
984             IDuplexSessionChannel channel = (IDuplexSessionChannel)result.AsyncState;
985             CompleteOpen(channel, result);
986         }
987
988         // open channel and start receiving messages
989         void CompleteOpen(IDuplexSessionChannel channel, IAsyncResult result)
990         {
991             Exception exceptionToEnqueue = null;
992             bool success = false;
993             try
994             {
995                 channel.EndOpen(result);
996                 success = true;
997             }
998             catch (CommunicationException e)
999             {
1000                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
1001             }
1002             catch (TimeoutException e)
1003             {
1004                 if (TD.OpenTimeoutIsEnabled())
1005                 {
1006                     TD.OpenTimeout(e.Message);
1007                 }
1008                 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
1009             }
1010 #pragma warning suppress 56500 // Microsoft, transferring exception to input queue to be pulled off by user
1011             catch (Exception e)
1012             {
1013                 if (Fx.IsFatal(e))
1014                 {
1015                     throw;
1016                 }
1017
1018                 exceptionToEnqueue = e;
1019             }
1020             finally
1021             {
1022                 if (!success)
1023                 {
1024                     channel.Abort();
1025                 }
1026             }
1027
1028             if (success)
1029             {
1030                 this.inputChannelAcceptor.AcceptInnerChannel(this, channel);
1031             }
1032             else if (exceptionToEnqueue != null)
1033             {
1034                 this.inputChannelAcceptor.Enqueue(exceptionToEnqueue, null);
1035             }
1036         }
1037
1038         class DuplexSessionOneWayInputChannelAcceptor : InputChannelAcceptor
1039         {
1040             ChannelTracker<IDuplexSessionChannel, ChannelReceiver> receivers;
1041             IChannelListener<IDuplexSessionChannel> innerChannelListener;
1042
1043             public DuplexSessionOneWayInputChannelAcceptor(DuplexSessionOneWayChannelListener listener)
1044                 : base(listener)
1045             {
1046                 this.receivers = new ChannelTracker<IDuplexSessionChannel, ChannelReceiver>();
1047             }
1048
1049             public void TransferInnerChannelListener(IChannelListener<IDuplexSessionChannel> innerChannelListener)
1050             {
1051                 Fx.Assert(this.innerChannelListener == null, "innerChannelListener must be null prior to transfer");
1052                 bool abortListener = false;
1053                 lock (ThisLock)
1054                 {
1055                     this.innerChannelListener = innerChannelListener;
1056                     if (this.State == CommunicationState.Closing || this.State == CommunicationState.Closed)
1057                     {
1058                         // abort happened before we completed the transfer
1059                         abortListener = true;
1060                     }
1061                 }
1062
1063                 if (abortListener)
1064                 {
1065                     innerChannelListener.Abort();
1066                 }
1067             }
1068
1069             public void AcceptInnerChannel(DuplexSessionOneWayChannelListener listener, IDuplexSessionChannel channel)
1070             {
1071                 ChannelReceiver channelReceiver = new ChannelReceiver(listener, channel);
1072                 this.receivers.Add(channel, channelReceiver);
1073                 channelReceiver.StartReceiving();
1074             }
1075
1076             public void PrepareChannel(IDuplexSessionChannel channel)
1077             {
1078                 this.receivers.PrepareChannel(channel);
1079             }
1080
1081             protected override InputChannel OnCreateChannel()
1082             {
1083                 return new DuplexSessionOneWayInputChannel(this.ChannelManager, null);
1084             }
1085
1086             protected override void OnOpen(TimeSpan timeout)
1087             {
1088                 TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
1089                 base.OnOpen(timeoutHelper.RemainingTime());
1090                 this.receivers.Open(timeoutHelper.RemainingTime());
1091                 this.innerChannelListener.Open(timeoutHelper.RemainingTime());
1092             }
1093
1094             protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
1095             {
1096                 return new ChainedOpenAsyncResult(timeout, callback, state, base.OnBeginOpen, base.OnEndOpen, this.receivers, this.innerChannelListener);
1097             }
1098
1099             protected override void OnEndOpen(IAsyncResult result)
1100             {
1101                 ChainedOpenAsyncResult.End(result);
1102             }
1103
1104             protected override void OnAbort()
1105             {
1106                 base.OnAbort();
1107                 if (!TransferReceivers())
1108                 {
1109                     this.receivers.Abort();
1110                     if (this.innerChannelListener != null)
1111                     {
1112                         this.innerChannelListener.Abort();
1113                     }
1114                 }
1115             }
1116
1117             protected override void OnClose(TimeSpan timeout)
1118             {
1119                 TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
1120                 base.OnClose(timeoutHelper.RemainingTime());
1121                 if (!TransferReceivers())
1122                 {
1123                     this.receivers.Close(timeoutHelper.RemainingTime());
1124                     this.innerChannelListener.Close(timeoutHelper.RemainingTime());
1125                 }
1126             }
1127
1128             protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
1129             {
1130                 List<ICommunicationObject> objectsToClose = new List<ICommunicationObject>();
1131                 if (!TransferReceivers())
1132                 {
1133                     objectsToClose.Add(this.receivers);
1134                     objectsToClose.Add(this.innerChannelListener);
1135                 }
1136
1137                 return new ChainedCloseAsyncResult(timeout, callback, state, base.OnBeginClose, base.OnEndClose, objectsToClose);
1138             }
1139
1140             protected override void OnEndClose(IAsyncResult result)
1141             {
1142                 ChainedCloseAsyncResult.End(result);
1143             }
1144
1145             // used to decouple our channel and listener lifetimes
1146             bool TransferReceivers()
1147             {
1148                 DuplexSessionOneWayInputChannel singletonChannel = (DuplexSessionOneWayInputChannel)base.GetCurrentChannel();
1149                 if (singletonChannel == null)
1150                 {
1151                     return false;
1152                 }
1153                 else
1154                 {
1155                     return singletonChannel.TransferReceivers(this.receivers, this.innerChannelListener);
1156                 }
1157             }
1158
1159             class DuplexSessionOneWayInputChannel : InputChannel
1160             {
1161                 ChannelTracker<IDuplexSessionChannel, ChannelReceiver> receivers;
1162                 IChannelListener<IDuplexSessionChannel> innerChannelListener;
1163
1164                 public DuplexSessionOneWayInputChannel(ChannelManagerBase channelManager, EndpointAddress localAddress)
1165                     : base(channelManager, localAddress)
1166                 {
1167                 }
1168
1169                 public bool TransferReceivers(ChannelTracker<IDuplexSessionChannel, ChannelReceiver> receivers,
1170                     IChannelListener<IDuplexSessionChannel> innerChannelListener)
1171                 {
1172                     lock (ThisLock)
1173                     {
1174                         if (this.State != CommunicationState.Opened)
1175                         {
1176                             return false;
1177                         }
1178
1179                         this.receivers = receivers;
1180                         this.innerChannelListener = innerChannelListener;
1181                         return true;
1182                     }
1183                 }
1184
1185                 protected override void OnAbort()
1186                 {
1187                     if (this.receivers != null)
1188                     {
1189                         Fx.Assert(this.innerChannelListener != null, "innerChannelListener and receiver should both be null or non-null");
1190                         this.receivers.Abort();
1191                         this.innerChannelListener.Abort();
1192                     }
1193                     base.OnAbort();
1194                 }
1195
1196                 protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
1197                 {
1198                     List<ICommunicationObject> objectsToClose = new List<ICommunicationObject>();
1199                     if (this.receivers != null)
1200                     {
1201                         objectsToClose.Add(this.receivers);
1202                         objectsToClose.Add(this.innerChannelListener);
1203                     }
1204
1205                     return new ChainedCloseAsyncResult(timeout, callback, state, base.OnBeginClose, base.OnEndClose, objectsToClose);
1206                 }
1207
1208                 protected override void OnEndClose(IAsyncResult result)
1209                 {
1210                     ChainedCloseAsyncResult.End(result);
1211                 }
1212
1213                 protected override void OnClose(TimeSpan timeout)
1214                 {
1215                     TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
1216                     if (this.receivers != null)
1217                     {
1218                         Fx.Assert(this.innerChannelListener != null, "innerChannelListener and receiver should both be null or non-null");
1219                         this.receivers.Close(timeoutHelper.RemainingTime());
1220                         this.innerChannelListener.Close(timeoutHelper.RemainingTime());
1221                     }
1222                     base.OnClose(timeoutHelper.RemainingTime());
1223                 }
1224
1225             }
1226         }
1227
1228
1229         // given an inner channel, pulls messages off of it and enqueues them into the upper channel
1230         class ChannelReceiver
1231         {
1232             Action onMessageDequeued;
1233             static AsyncCallback onReceive = Fx.ThunkCallback(new AsyncCallback(OnReceive));
1234             DuplexSessionOneWayInputChannelAcceptor acceptor;
1235             IDuplexSessionChannel channel;
1236             TimeSpan idleTimeout;
1237             static Action<object> startReceivingCallback;
1238             Action<object> onStartReceiveLater;
1239             Action<object> onDispatchItemsLater;
1240             bool validateHeader;
1241
1242             public ChannelReceiver(DuplexSessionOneWayChannelListener parent, IDuplexSessionChannel channel)
1243             {
1244                 this.channel = channel;
1245                 this.acceptor = parent.inputChannelAcceptor;
1246                 this.idleTimeout = parent.idleTimeout;
1247                 this.validateHeader = parent.packetRoutable;
1248                 this.onMessageDequeued = new Action(OnMessageDequeued);
1249             }
1250
1251             void StartReceivingCallback(object state)
1252             {
1253                 ((ChannelReceiver)state).StartReceiving();
1254             }
1255
1256             public void StartReceiving()
1257             {
1258                 Exception exceptionToEnqueue = null;
1259
1260                 while (true)
1261                 {
1262                     if (channel.State != CommunicationState.Opened)
1263                     {
1264                         channel.Abort();
1265                         break;
1266                     }
1267
1268                     IAsyncResult result = null;
1269                     try
1270                     {
1271                         result = this.channel.BeginTryReceive(this.idleTimeout, onReceive, this);
1272                     }
1273                     catch (CommunicationException e)
1274                     {
1275                         DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
1276                     }
1277 #pragma warning suppress 56500 // Microsoft, transferring exception to input queue to be pulled off by user
1278                     catch (Exception e)
1279                     {
1280                         if (Fx.IsFatal(e))
1281                         {
1282                             throw;
1283                         }
1284
1285                         exceptionToEnqueue = e;
1286                         break;
1287                     }
1288
1289                     if (result != null)
1290                     {
1291                         if (!result.CompletedSynchronously)
1292                         {
1293                             break;
1294                         }
1295
1296                         bool dispatch;
1297                         bool continueLoop = OnCompleteReceive(result, out dispatch);
1298                         if (dispatch)
1299                         {
1300                             Dispatch();
1301                         }
1302                         if (!continueLoop)
1303                         {
1304                             break;
1305                         }
1306                     }
1307                 }
1308
1309                 if (exceptionToEnqueue != null)
1310                 {
1311                     this.acceptor.Enqueue(exceptionToEnqueue, this.onMessageDequeued);
1312                 }
1313             }
1314
1315             bool EnqueueMessage(Message message)
1316             {
1317                 if (this.validateHeader)
1318                 {
1319                     if (!PacketRoutableHeader.TryValidateMessage(message))
1320                     {
1321                         this.channel.Abort();
1322                         message.Close();
1323                         return false;
1324                     }
1325                     else
1326                     {
1327                         this.validateHeader = false; // only validate the first message on a session
1328                     }
1329                 }
1330
1331                 return this.acceptor.EnqueueWithoutDispatch(message, this.onMessageDequeued);
1332             }
1333
1334             void OnStartReceiveLater(object state)
1335             {
1336                 StartReceiving();
1337             }
1338
1339             void OnDispatchItemsLater(object state)
1340             {
1341                 Dispatch();
1342             }
1343
1344             void Dispatch()
1345             {
1346                 this.acceptor.DispatchItems();
1347             }
1348
1349             // returns true if the Receive Loop should continue (or be started if it's not running)
1350             bool OnCompleteReceive(IAsyncResult result, out bool dispatchLater)
1351             {
1352                 Exception exceptionToEnqueue = null;
1353                 Message message = null;
1354                 bool startLoop = false;
1355                 dispatchLater = false;
1356
1357                 try
1358                 {
1359                     if (!this.channel.EndTryReceive(result, out message))
1360                     {
1361                         this.channel.Abort(); // we've hit our IdleTimeout
1362                     }
1363                     else if (message == null)
1364                     {
1365                         this.channel.Close(); // read EOF, close our half of the session
1366                     }
1367                 }
1368                 catch (CommunicationException e)
1369                 {
1370                     DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
1371                     startLoop = (this.channel.State == CommunicationState.Opened);
1372                 }
1373                 catch (TimeoutException e)
1374                 {
1375                     if (TD.CloseTimeoutIsEnabled())
1376                     {
1377                         TD.CloseTimeout(e.Message);
1378                     }
1379                     DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
1380                     startLoop = (this.channel.State == CommunicationState.Opened);
1381                 }
1382 #pragma warning suppress 56500 // Microsoft, transferring exception to input queue to be pulled off by user
1383                 catch (Exception e)
1384                 {
1385                     if (Fx.IsFatal(e))
1386                     {
1387                         throw;
1388                     }
1389
1390                     exceptionToEnqueue = e;
1391                 }
1392
1393                 if (message != null)
1394                 {
1395                     dispatchLater = EnqueueMessage(message);
1396                 }
1397                 else if (exceptionToEnqueue != null)
1398                 {
1399                     dispatchLater = this.acceptor.EnqueueWithoutDispatch(exceptionToEnqueue, this.onMessageDequeued);
1400                 }
1401
1402                 return startLoop;
1403             }
1404
1405             void OnMessageDequeued()
1406             {
1407                 IAsyncResult result = null;
1408                 Exception exceptionToEnqueue = null;
1409
1410                 try
1411                 {
1412                     result = this.channel.BeginTryReceive(this.idleTimeout, onReceive, this);
1413                 }
1414                 catch (CommunicationException e)
1415                 {
1416                     DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
1417                 }
1418 #pragma warning suppress 56500 // Microsoft, transferring exception to input queue to be pulled off by user
1419                 catch (Exception e)
1420                 {
1421                     if (Fx.IsFatal(e))
1422                     {
1423                         throw;
1424                     }
1425
1426                     exceptionToEnqueue = e;
1427                 }
1428
1429                 if (result != null)
1430                 {
1431                     if (result.CompletedSynchronously)
1432                     {
1433                         bool dispatchLater;
1434
1435                         if (OnCompleteReceive(result, out dispatchLater))
1436                         {
1437                             if (onStartReceiveLater == null)
1438                             {
1439                                 onStartReceiveLater = new Action<object>(OnStartReceiveLater);
1440                             }
1441                             ActionItem.Schedule(onStartReceiveLater, null);
1442                         }
1443
1444                         if (dispatchLater)
1445                         {
1446                             if (onDispatchItemsLater == null)
1447                             {
1448                                 onDispatchItemsLater = new Action<object>(OnDispatchItemsLater);
1449                             }
1450                             ActionItem.Schedule(onDispatchItemsLater, null);
1451                         }
1452                     }
1453                 }
1454                 else if (exceptionToEnqueue != null)
1455                 {
1456                     this.acceptor.Enqueue(exceptionToEnqueue, this.onMessageDequeued, false);
1457                 }
1458                 else // need to kickoff a new loop 
1459                 {
1460                     if (this.channel.State == CommunicationState.Opened)
1461                     {
1462                         if (startReceivingCallback == null)
1463                         {
1464                             startReceivingCallback = new Action<object>(StartReceivingCallback);
1465                         }
1466
1467                         ActionItem.Schedule(startReceivingCallback, this);
1468                     }
1469                 }
1470             }
1471
1472             static void OnReceive(IAsyncResult result)
1473             {
1474                 if (result.CompletedSynchronously)
1475                 {
1476                     return;
1477                 }
1478
1479                 ChannelReceiver thisPtr = (ChannelReceiver)result.AsyncState;
1480                 bool dispatch;
1481                 if (thisPtr.OnCompleteReceive(result, out dispatch))
1482                 {
1483                     thisPtr.StartReceiving();
1484                 }
1485
1486                 if (dispatch)
1487                 {
1488                     thisPtr.Dispatch();
1489                 }
1490             }
1491         }
1492     }
1493 }