Update Reference Sources to .NET Framework 4.6.1
[mono.git] / mcs / class / referencesource / System.ServiceModel / System / ServiceModel / Dispatcher / ChannelHandler.cs
1 //-----------------------------------------------------------------------------
2 // Copyright (c) Microsoft Corporation.  All rights reserved.
3 //-----------------------------------------------------------------------------
4
5 namespace System.ServiceModel.Dispatcher
6 {
7     using System;
8     using System.Diagnostics;
9     using System.Globalization;
10     using System.Runtime;
11     using System.Runtime.CompilerServices;
12     using System.Runtime.Diagnostics;
13     using System.ServiceModel;
14     using System.ServiceModel.Activation;
15     using System.ServiceModel.Channels;
16     using System.ServiceModel.Description;
17     using System.ServiceModel.Diagnostics;
18     using System.ServiceModel.Diagnostics.Application;
19     using System.Threading;
20     using System.Transactions;
21     using System.Xml;
22     using SessionIdleManager = System.ServiceModel.Channels.ServiceChannel.SessionIdleManager;
23
24     class ChannelHandler
25     {
26         public static readonly TimeSpan CloseAfterFaultTimeout = TimeSpan.FromSeconds(10);
27         public const string MessageBufferPropertyName = "_RequestMessageBuffer_";
28
29         readonly IChannelBinder binder;
30         readonly DuplexChannelBinder duplexBinder;
31         readonly ServiceHostBase host;
32         readonly bool incrementedActivityCountInConstructor;
33         readonly bool isCallback;
34         readonly ListenerHandler listener;
35         readonly ServiceThrottle throttle;
36         readonly bool wasChannelThrottled;
37         readonly SessionIdleManager idleManager;
38         readonly bool sendAsynchronously;
39
40         static AsyncCallback onAsyncReplyComplete = Fx.ThunkCallback(new AsyncCallback(ChannelHandler.OnAsyncReplyComplete));
41         static AsyncCallback onAsyncReceiveComplete = Fx.ThunkCallback(new AsyncCallback(ChannelHandler.OnAsyncReceiveComplete));
42         static Action<object> onContinueAsyncReceive = new Action<object>(ChannelHandler.OnContinueAsyncReceive);
43         static Action<object> onStartSyncMessagePump = new Action<object>(ChannelHandler.OnStartSyncMessagePump);
44         static Action<object> onStartAsyncMessagePump = new Action<object>(ChannelHandler.OnStartAsyncMessagePump);
45         static Action<object> onStartSingleTransactedBatch = new Action<object>(ChannelHandler.OnStartSingleTransactedBatch);
46         static Action<object> openAndEnsurePump = new Action<object>(ChannelHandler.OpenAndEnsurePump);
47
48         RequestInfo requestInfo;
49         ServiceChannel channel;
50         bool doneReceiving;
51         bool hasRegisterBeenCalled;
52         bool hasSession;
53         int isPumpAcquired;
54         bool isChannelTerminated;
55         bool isConcurrent;
56         bool isManualAddressing;
57         MessageVersion messageVersion;
58         ErrorHandlingReceiver receiver;
59         bool receiveSynchronously;
60         bool receiveWithTransaction;
61         RequestContext replied;
62         RequestContext requestWaitingForThrottle;
63         WrappedTransaction acceptTransaction;
64         ServiceThrottle instanceContextThrottle;
65         SharedTransactedBatchContext sharedTransactedBatchContext;
66         TransactedBatchContext transactedBatchContext;
67         bool isMainTransactedBatchHandler;
68         EventTraceActivity eventTraceActivity;
69         SessionOpenNotification sessionOpenNotification;
70         bool needToCreateSessionOpenNotificationMessage;
71         bool shouldRejectMessageWithOnOpenActionHeader;
72
73         internal ChannelHandler(MessageVersion messageVersion, IChannelBinder binder, ServiceChannel channel)
74         {
75             ClientRuntime clientRuntime = channel.ClientRuntime;
76
77             this.messageVersion = messageVersion;
78             this.isManualAddressing = clientRuntime.ManualAddressing;
79             this.binder = binder;
80             this.channel = channel;
81
82             this.isConcurrent = true;
83             this.duplexBinder = binder as DuplexChannelBinder;
84             this.hasSession = binder.HasSession;
85             this.isCallback = true;
86
87             DispatchRuntime dispatchRuntime = clientRuntime.DispatchRuntime;
88             if (dispatchRuntime == null)
89             {
90                 this.receiver = new ErrorHandlingReceiver(binder, null);
91             }
92             else
93             {
94                 this.receiver = new ErrorHandlingReceiver(binder, dispatchRuntime.ChannelDispatcher);
95             }
96             this.requestInfo = new RequestInfo(this);
97
98         }
99
100         internal ChannelHandler(MessageVersion messageVersion, IChannelBinder binder, ServiceThrottle throttle,
101             ListenerHandler listener, bool wasChannelThrottled, WrappedTransaction acceptTransaction, SessionIdleManager idleManager)
102         {
103             ChannelDispatcher channelDispatcher = listener.ChannelDispatcher;
104
105             this.messageVersion = messageVersion;
106             this.isManualAddressing = channelDispatcher.ManualAddressing;
107             this.binder = binder;
108             this.throttle = throttle;
109             this.listener = listener;
110             this.wasChannelThrottled = wasChannelThrottled;
111
112             this.host = listener.Host;
113             this.receiveSynchronously = channelDispatcher.ReceiveSynchronously;
114             this.sendAsynchronously = channelDispatcher.SendAsynchronously;
115             this.duplexBinder = binder as DuplexChannelBinder;
116             this.hasSession = binder.HasSession;
117             this.isConcurrent = ConcurrencyBehavior.IsConcurrent(channelDispatcher, this.hasSession);
118
119             if (channelDispatcher.MaxPendingReceives > 1)
120             {
121                 // We need to preserve order if the ChannelHandler is not concurrent.
122                 this.binder = new MultipleReceiveBinder(
123                     this.binder,
124                     channelDispatcher.MaxPendingReceives,
125                     !this.isConcurrent);
126             }
127
128             if (channelDispatcher.BufferedReceiveEnabled)
129             {
130                 this.binder = new BufferedReceiveBinder(this.binder);
131             }
132
133             this.receiver = new ErrorHandlingReceiver(this.binder, channelDispatcher);
134             this.idleManager = idleManager;
135             Fx.Assert((this.idleManager != null) == (this.binder.HasSession && this.listener.ChannelDispatcher.DefaultCommunicationTimeouts.ReceiveTimeout != TimeSpan.MaxValue), "idle manager is present only when there is a session with a finite receive timeout");
136
137             if (channelDispatcher.IsTransactedReceive && !channelDispatcher.ReceiveContextEnabled)
138             {
139                 receiveSynchronously = true;
140                 receiveWithTransaction = true;
141
142                 if (channelDispatcher.MaxTransactedBatchSize > 0)
143                 {
144                     int maxConcurrentBatches = 1;
145                     if (null != throttle && throttle.MaxConcurrentCalls > 1)
146                     {
147                         maxConcurrentBatches = throttle.MaxConcurrentCalls;
148                         foreach (EndpointDispatcher endpointDispatcher in channelDispatcher.Endpoints)
149                         {
150                             if (ConcurrencyMode.Multiple != endpointDispatcher.DispatchRuntime.ConcurrencyMode)
151                             {
152                                 maxConcurrentBatches = 1;
153                                 break;
154                             }
155                         }
156                     }
157
158                     this.sharedTransactedBatchContext = new SharedTransactedBatchContext(this, channelDispatcher, maxConcurrentBatches);
159                     this.isMainTransactedBatchHandler = true;
160                     this.throttle = null;
161                 }
162             }
163             else if (channelDispatcher.IsTransactedReceive && channelDispatcher.ReceiveContextEnabled && channelDispatcher.MaxTransactedBatchSize > 0)
164             {
165                 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.IncompatibleBehaviors)));
166             }
167
168             if (this.binder.HasSession)
169             {
170                 this.sessionOpenNotification = this.binder.Channel.GetProperty<SessionOpenNotification>();
171                 this.needToCreateSessionOpenNotificationMessage = this.sessionOpenNotification != null && this.sessionOpenNotification.IsEnabled;
172             }
173
174             this.acceptTransaction = acceptTransaction;
175             this.requestInfo = new RequestInfo(this);
176
177             if (this.listener.State == CommunicationState.Opened)
178             {
179                 this.listener.ChannelDispatcher.Channels.IncrementActivityCount();
180                 this.incrementedActivityCountInConstructor = true;
181             }
182         }
183
184
185         internal ChannelHandler(ChannelHandler handler, TransactedBatchContext context)
186         {
187             this.messageVersion = handler.messageVersion;
188             this.isManualAddressing = handler.isManualAddressing;
189             this.binder = handler.binder;
190             this.listener = handler.listener;
191             this.wasChannelThrottled = handler.wasChannelThrottled;
192
193             this.host = handler.host;
194             this.receiveSynchronously = true;
195             this.receiveWithTransaction = true;
196             this.duplexBinder = handler.duplexBinder;
197             this.hasSession = handler.hasSession;
198             this.isConcurrent = handler.isConcurrent;
199             this.receiver = handler.receiver;
200
201             this.sharedTransactedBatchContext = context.Shared;
202             this.transactedBatchContext = context;
203             this.requestInfo = new RequestInfo(this);
204
205             this.sendAsynchronously = handler.sendAsynchronously;
206             this.sessionOpenNotification = handler.sessionOpenNotification;
207             this.needToCreateSessionOpenNotificationMessage = handler.needToCreateSessionOpenNotificationMessage;
208             this.shouldRejectMessageWithOnOpenActionHeader = handler.shouldRejectMessageWithOnOpenActionHeader;
209         }
210
211         internal IChannelBinder Binder
212         {
213             get { return this.binder; }
214         }
215
216         internal ServiceChannel Channel
217         {
218             get { return this.channel; }
219         }
220
221         internal bool HasRegisterBeenCalled
222         {
223             get { return this.hasRegisterBeenCalled; }
224         }
225
226         internal InstanceContext InstanceContext
227         {
228             get { return (this.channel != null) ? this.channel.InstanceContext : null; }
229         }
230
231         internal ServiceThrottle InstanceContextServiceThrottle
232         {
233             get
234             {
235                 return this.instanceContextThrottle;
236             }
237             set
238             {
239                 this.instanceContextThrottle = value;
240             }
241         }
242
243         bool IsOpen
244         {
245             get { return this.binder.Channel.State == CommunicationState.Opened; }
246         }
247
248         EndpointAddress LocalAddress
249         {
250             get
251             {
252                 if (this.binder != null)
253                 {
254                     IInputChannel input = this.binder.Channel as IInputChannel;
255                     if (input != null)
256                     {
257                         return input.LocalAddress;
258                     }
259
260                     IReplyChannel reply = this.binder.Channel as IReplyChannel;
261                     if (reply != null)
262                     {
263                         return reply.LocalAddress;
264                     }
265                 }
266
267                 return null;
268             }
269         }
270
271         object ThisLock
272         {
273             get { return this; }
274         }
275
276         EventTraceActivity EventTraceActivity
277         {
278             get
279             {
280                 if (this.eventTraceActivity == null)
281                 {
282                     this.eventTraceActivity = new EventTraceActivity();
283                 }
284                 return this.eventTraceActivity;
285             }
286         }
287
288         internal static void Register(ChannelHandler handler)
289         {
290             handler.Register();
291         }
292
293         internal static void Register(ChannelHandler handler, RequestContext request)
294         {
295             BufferedReceiveBinder bufferedBinder = handler.Binder as BufferedReceiveBinder;
296             Fx.Assert(bufferedBinder != null, "ChannelHandler.Binder is not a BufferedReceiveBinder");
297
298             bufferedBinder.InjectRequest(request);
299             handler.Register();
300         }
301
302         void Register()
303         {
304             this.hasRegisterBeenCalled = true;
305             if (this.binder.Channel.State == CommunicationState.Created)
306             {
307                 ActionItem.Schedule(openAndEnsurePump, this);
308             }
309             else
310             {
311                 this.EnsurePump();
312             }
313         }
314
315         void AsyncMessagePump()
316         {
317             IAsyncResult result = this.BeginTryReceive();
318
319             if ((result != null) && result.CompletedSynchronously)
320             {
321                 this.AsyncMessagePump(result);
322             }
323         }
324
325         void AsyncMessagePump(IAsyncResult result)
326         {
327             if (TD.ChannelReceiveStopIsEnabled())
328             {
329                 TD.ChannelReceiveStop(this.EventTraceActivity, this.GetHashCode());
330             }
331
332             for (;;)
333             {
334                 RequestContext request;
335
336                 while (!this.EndTryReceive(result, out request))
337                 {
338                     result = this.BeginTryReceive();
339
340                     if ((result == null) || !result.CompletedSynchronously)
341                     {
342                         return;
343                     }
344                 }
345
346                 if (!HandleRequest(request, null))
347                 {
348                     break;
349                 }
350
351                 if (!TryAcquirePump())
352                 {
353                     break;
354                 }
355
356                 result = this.BeginTryReceive();
357
358                 if (result == null || !result.CompletedSynchronously)
359                 {
360                     break;
361                 }
362             }
363         }
364
365         IAsyncResult BeginTryReceive()
366         {
367             this.requestInfo.Cleanup();
368
369             if (TD.ChannelReceiveStartIsEnabled())
370             {
371                 TD.ChannelReceiveStart(this.EventTraceActivity, this.GetHashCode());
372             }
373
374             this.shouldRejectMessageWithOnOpenActionHeader = !this.needToCreateSessionOpenNotificationMessage;
375             if (this.needToCreateSessionOpenNotificationMessage)
376             {
377                 return new CompletedAsyncResult(ChannelHandler.onAsyncReceiveComplete, this);
378             }
379
380             return this.receiver.BeginTryReceive(TimeSpan.MaxValue, ChannelHandler.onAsyncReceiveComplete, this);
381         }
382
383         bool DispatchAndReleasePump(RequestContext request, bool cleanThread, OperationContext currentOperationContext)
384         {
385             ServiceChannel channel = this.requestInfo.Channel;
386             EndpointDispatcher endpoint = this.requestInfo.Endpoint;
387             bool releasedPump = false;
388
389             try
390             {
391                 DispatchRuntime dispatchBehavior = this.requestInfo.DispatchRuntime;
392
393                 if (channel == null || dispatchBehavior == null)
394                 {
395                     Fx.Assert("System.ServiceModel.Dispatcher.ChannelHandler.Dispatch(): (channel == null || dispatchBehavior == null)");
396                     return true;
397                 }
398
399                 MessageBuffer buffer = null;
400                 Message message;
401
402                 EventTraceActivity eventTraceActivity = TraceDispatchMessageStart(request.RequestMessage);
403                 AspNetEnvironment.Current.PrepareMessageForDispatch(request.RequestMessage);
404                 if (dispatchBehavior.PreserveMessage)
405                 {
406                     object previousBuffer = null;
407                     if (request.RequestMessage.Properties.TryGetValue(MessageBufferPropertyName, out previousBuffer))
408                     {
409                         buffer = (MessageBuffer)previousBuffer;
410                         message = buffer.CreateMessage();
411                     }
412                     else
413                     {
414                         // 
415                         buffer = request.RequestMessage.CreateBufferedCopy(int.MaxValue);
416                         message = buffer.CreateMessage();
417                     }
418                 }
419                 else
420                 {
421                     message = request.RequestMessage;
422                 }
423
424                 DispatchOperationRuntime operation = dispatchBehavior.GetOperation(ref message);
425                 if (operation == null)
426                 {
427                     Fx.Assert("ChannelHandler.Dispatch (operation == null)");
428                     throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(String.Format(CultureInfo.InvariantCulture, "No DispatchOperationRuntime found to process message.")));
429                 }
430
431                 if (this.shouldRejectMessageWithOnOpenActionHeader && message.Headers.Action == OperationDescription.SessionOpenedAction)
432                 {
433                     throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.SFxNoEndpointMatchingAddressForConnectionOpeningMessage, message.Headers.Action, "Open")));
434                 }
435
436                 if (MessageLogger.LoggingEnabled)
437                 {
438                     MessageLogger.LogMessage(ref message, (operation.IsOneWay ? MessageLoggingSource.ServiceLevelReceiveDatagram : MessageLoggingSource.ServiceLevelReceiveRequest) | MessageLoggingSource.LastChance);
439                 }
440
441                 if (operation.IsTerminating && this.hasSession)
442                 {
443                     this.isChannelTerminated = true;
444                 }
445
446                 bool hasOperationContextBeenSet;
447                 if (currentOperationContext != null)
448                 {
449                     hasOperationContextBeenSet = true;
450                     currentOperationContext.ReInit(request, message, channel);
451                 }
452                 else
453                 {
454                     hasOperationContextBeenSet = false;
455                     currentOperationContext = new OperationContext(request, message, channel, this.host);
456                 }
457
458                 if (dispatchBehavior.PreserveMessage)
459                 {
460                     currentOperationContext.IncomingMessageProperties.Add(MessageBufferPropertyName, buffer);
461                 }
462
463                 if (currentOperationContext.EndpointDispatcher == null && this.listener != null)
464                 {
465                     currentOperationContext.EndpointDispatcher = endpoint;
466                 }
467
468                 MessageRpc rpc = new MessageRpc(request, message, operation, channel, this.host,
469                     this, cleanThread, currentOperationContext, this.requestInfo.ExistingInstanceContext, eventTraceActivity);
470
471                 TraceUtility.MessageFlowAtMessageReceived(message, currentOperationContext, eventTraceActivity, true);
472
473                 rpc.TransactedBatchContext = this.transactedBatchContext;
474
475                 // passing responsibility for call throttle to MessageRpc
476                 // (MessageRpc implicitly owns this throttle once it's created)
477                 this.requestInfo.ChannelHandlerOwnsCallThrottle = false;
478                 // explicitly passing responsibility for instance throttle to MessageRpc
479                 rpc.MessageRpcOwnsInstanceContextThrottle = this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle;
480                 this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle = false;
481
482                 // These need to happen before Dispatch but after accessing any ChannelHandler
483                 // state, because we go multi-threaded after this until we reacquire pump mutex.
484                 this.ReleasePump();
485                 releasedPump = true;
486
487                 return operation.Parent.Dispatch(ref rpc, hasOperationContextBeenSet);
488             }
489             catch (Exception e)
490             {
491                 if (Fx.IsFatal(e))
492                 {
493                     throw;
494                 }
495                 return this.HandleError(e, request, channel);
496             }
497             finally
498             {
499                 if (!releasedPump)
500                 {
501                     this.ReleasePump();
502                 }
503             }
504         }
505
506         internal void DispatchDone()
507         {
508             if (this.throttle != null)
509             {
510                 this.throttle.DeactivateCall();
511             }
512         }
513
514         RequestContext GetSessionOpenNotificationRequestContext()
515         {
516             Fx.Assert(this.sessionOpenNotification != null, "this.sessionOpenNotification should not be null.");
517             Message message = Message.CreateMessage(this.Binder.Channel.GetProperty<MessageVersion>(), OperationDescription.SessionOpenedAction);
518             Fx.Assert(this.LocalAddress != null, "this.LocalAddress should not be null.");
519             message.Headers.To = this.LocalAddress.Uri;
520             this.sessionOpenNotification.UpdateMessageProperties(message.Properties);
521             return this.Binder.CreateRequestContext(message);
522         }
523
524         bool EndTryReceive(IAsyncResult result, out RequestContext requestContext)
525         {
526             bool valid;
527             if (this.needToCreateSessionOpenNotificationMessage)
528             {
529                 this.needToCreateSessionOpenNotificationMessage = false;
530                 Fx.Assert(result is CompletedAsyncResult, "result must be CompletedAsyncResult");
531                 CompletedAsyncResult.End(result);
532                 requestContext = this.GetSessionOpenNotificationRequestContext();
533                 valid = true;
534             }
535             else
536             {
537                 valid = this.receiver.EndTryReceive(result, out requestContext);
538             }
539
540             if (valid)
541             {
542                 this.HandleReceiveComplete(requestContext);
543             }
544
545             return valid;
546         }
547
548         void EnsureChannelAndEndpoint(RequestContext request)
549         {
550             this.requestInfo.Channel = this.channel;
551
552             if (this.requestInfo.Channel == null)
553             {
554                 bool addressMatched;
555                 if (this.hasSession)
556                 {
557                     this.requestInfo.Channel = this.GetSessionChannel(request.RequestMessage, out this.requestInfo.Endpoint, out addressMatched);
558                 }
559                 else
560                 {
561                     this.requestInfo.Channel = this.GetDatagramChannel(request.RequestMessage, out this.requestInfo.Endpoint, out addressMatched);
562                 }
563
564                 if (this.requestInfo.Channel == null)
565                 {
566                     this.host.RaiseUnknownMessageReceived(request.RequestMessage);
567                     if (addressMatched)
568                     {
569                         this.ReplyContractFilterDidNotMatch(request);
570                     }
571                     else
572                     {
573                         this.ReplyAddressFilterDidNotMatch(request);
574                     }
575                 }
576             }
577             else
578             {
579                 this.requestInfo.Endpoint = this.requestInfo.Channel.EndpointDispatcher;
580
581                 //For sessionful contracts, the InstanceContext throttle is not copied over to the channel
582                 //as we create the channel before acquiring the lock
583                 if (this.InstanceContextServiceThrottle != null && this.requestInfo.Channel.InstanceContextServiceThrottle == null)
584                 {
585                     this.requestInfo.Channel.InstanceContextServiceThrottle = this.InstanceContextServiceThrottle;
586                 }
587             }
588
589             this.requestInfo.EndpointLookupDone = true;
590
591             if (this.requestInfo.Channel == null)
592             {
593                 // SFx drops a message here
594                 TraceUtility.TraceDroppedMessage(request.RequestMessage, this.requestInfo.Endpoint);
595                 request.Close();
596                 return;
597             }
598
599             if (this.requestInfo.Channel.HasSession || this.isCallback)
600             {
601                 this.requestInfo.DispatchRuntime = this.requestInfo.Channel.DispatchRuntime;
602             }
603             else
604             {
605                 this.requestInfo.DispatchRuntime = this.requestInfo.Endpoint.DispatchRuntime;
606             }
607         }
608
609         void EnsurePump()
610         {
611             if (null == this.sharedTransactedBatchContext || this.isMainTransactedBatchHandler)
612             {
613                 if (TryAcquirePump())
614                 {
615                     if (this.receiveSynchronously)
616                     {
617                         ActionItem.Schedule(ChannelHandler.onStartSyncMessagePump, this);
618                     }
619                     else
620                     {
621                         if (Thread.CurrentThread.IsThreadPoolThread)
622                         {
623                             IAsyncResult result = this.BeginTryReceive();
624                             if ((result != null) && result.CompletedSynchronously)
625                             {
626                                 ActionItem.Schedule(ChannelHandler.onContinueAsyncReceive, result);
627                             }
628                         }
629                         else
630                         {
631                             // Since this is not a threadpool thread, we don't know if this thread will exit 
632                             // while the IO is still pending (which would cancel the IO), so we have to get 
633                             // over to a threadpool thread which we know will not exit while there is pending IO.
634                             ActionItem.Schedule(ChannelHandler.onStartAsyncMessagePump, this);
635                         }
636                     }
637                 }
638             }
639             else
640             {
641                 ActionItem.Schedule(ChannelHandler.onStartSingleTransactedBatch, this);
642             }
643         }
644
645         ServiceChannel GetDatagramChannel(Message message, out EndpointDispatcher endpoint, out bool addressMatched)
646         {
647             addressMatched = false;
648             endpoint = this.GetEndpointDispatcher(message, out addressMatched);
649
650             if (endpoint == null)
651             {
652                 return null;
653             }
654
655             if (endpoint.DatagramChannel == null)
656             {
657                 lock (this.listener.ThisLock)
658                 {
659                     if (endpoint.DatagramChannel == null)
660                     {
661                         endpoint.DatagramChannel = new ServiceChannel(this.binder, endpoint, this.listener.ChannelDispatcher, this.idleManager);
662                         this.InitializeServiceChannel(endpoint.DatagramChannel);
663                     }
664                 }
665             }
666
667             return endpoint.DatagramChannel;
668         }
669
670         EndpointDispatcher GetEndpointDispatcher(Message message, out bool addressMatched)
671         {
672             return this.listener.Endpoints.Lookup(message, out addressMatched);
673         }
674
675         ServiceChannel GetSessionChannel(Message message, out EndpointDispatcher endpoint, out bool addressMatched)
676         {
677             addressMatched = false;
678
679             if (this.channel == null)
680             {
681                 lock (this.ThisLock)
682                 {
683                     if (this.channel == null)
684                     {
685                         endpoint = this.GetEndpointDispatcher(message, out addressMatched);
686                         if (endpoint != null)
687                         {
688                             this.channel = new ServiceChannel(this.binder, endpoint, this.listener.ChannelDispatcher, this.idleManager);
689                             this.InitializeServiceChannel(this.channel);
690                         }
691                     }
692                 }
693             }
694
695             if (this.channel == null)
696             {
697                 endpoint = null;
698             }
699             else
700             {
701                 endpoint = this.channel.EndpointDispatcher;
702             }
703             return this.channel;
704         }
705
706         void InitializeServiceChannel(ServiceChannel channel)
707         {
708             if (this.wasChannelThrottled)
709             {
710                 // TFS#500703, when the idle timeout was hit, the constructor of ServiceChannel will abort itself directly. So
711                 // the session throttle will not be released and thus lead to a service unavailablity.
712                 // Note that if the channel is already aborted, the next line "channel.ServiceThrottle = this.throttle;" will throw an exception,
713                 // so we are not going to do any more work inside this method. 
714                 // Ideally we should do a thorough refactoring work for this throttling issue. However, it's too risky as a QFE. We should consider
715                 // this in a whole release.
716                 // Note that the "wasChannelThrottled" boolean will only be true if we aquired the session throttle. So we don't have to check HasSession
717                 // again here.
718                 if (channel.Aborted && this.throttle != null)
719                 {
720                     // This line will release the "session" throttle.
721                     this.throttle.DeactivateChannel();
722                 }
723
724                 channel.ServiceThrottle = this.throttle;
725             }
726
727             if (this.InstanceContextServiceThrottle != null)
728             {
729                 channel.InstanceContextServiceThrottle = this.InstanceContextServiceThrottle;
730             }
731
732             ClientRuntime clientRuntime = channel.ClientRuntime;
733             if (clientRuntime != null)
734             {
735                 Type contractType = clientRuntime.ContractClientType;
736                 Type callbackType = clientRuntime.CallbackClientType;
737
738                 if (contractType != null)
739                 {
740                     channel.Proxy = ServiceChannelFactory.CreateProxy(contractType, callbackType, MessageDirection.Output, channel);
741                 }
742             }
743
744             if (this.listener != null)
745             {
746                 this.listener.ChannelDispatcher.InitializeChannel((IClientChannel)channel.Proxy);
747             }
748
749             ((IChannel)channel).Open();
750         }
751
752         void ProvideFault(Exception e, ref ErrorHandlerFaultInfo faultInfo)
753         {
754             if (this.listener != null)
755             {
756                 this.listener.ChannelDispatcher.ProvideFault(e, this.requestInfo.Channel == null ? this.binder.Channel.GetProperty<FaultConverter>() : this.requestInfo.Channel.GetProperty<FaultConverter>(), ref faultInfo);
757             }
758             else if (this.channel != null)
759             {
760                 DispatchRuntime dispatchBehavior = this.channel.ClientRuntime.CallbackDispatchRuntime;
761                 dispatchBehavior.ChannelDispatcher.ProvideFault(e, this.channel.GetProperty<FaultConverter>(), ref faultInfo);
762             }
763         }
764
765         internal bool HandleError(Exception e)
766         {
767             ErrorHandlerFaultInfo dummy = new ErrorHandlerFaultInfo();
768             return this.HandleError(e, ref dummy);
769         }
770
771         bool HandleError(Exception e, ref ErrorHandlerFaultInfo faultInfo)
772         {
773             if (e == null)
774             {
775                 Fx.Assert(SR.GetString(SR.GetString(SR.SFxNonExceptionThrown)));
776                 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.GetString(SR.SFxNonExceptionThrown))));
777             }
778             if (this.listener != null)
779             {
780                 return listener.ChannelDispatcher.HandleError(e, ref faultInfo);
781             }
782             else if (this.channel != null)
783             {
784                 return this.channel.ClientRuntime.CallbackDispatchRuntime.ChannelDispatcher.HandleError(e, ref faultInfo);
785             }
786             else
787             {
788                 return false;
789             }
790         }
791
792         bool HandleError(Exception e, RequestContext request, ServiceChannel channel)
793         {
794             ErrorHandlerFaultInfo faultInfo = new ErrorHandlerFaultInfo(this.messageVersion.Addressing.DefaultFaultAction);
795             bool replied, replySentAsync;
796             ProvideFaultAndReplyFailure(request, e, ref faultInfo, out replied, out replySentAsync);
797
798             if (!replySentAsync)
799             {
800                 return this.HandleErrorContinuation(e, request, channel, ref faultInfo, replied);
801             }
802             else
803             {
804                 return false;
805             }
806         }
807
808         bool HandleErrorContinuation(Exception e, RequestContext request, ServiceChannel channel, ref ErrorHandlerFaultInfo faultInfo, bool replied)
809         {
810             if (replied)
811             {
812                 try
813                 {
814                     request.Close();
815                 }
816                 catch (Exception e1)
817                 {
818                     if (Fx.IsFatal(e1))
819                     {
820                         throw;
821                     }
822                     this.HandleError(e1);
823                 }
824             }
825             else
826             {
827                 request.Abort();
828             }
829             if (!this.HandleError(e, ref faultInfo) && this.hasSession)
830             {
831                 if (channel != null)
832                 {
833                     if (replied)
834                     {
835                         TimeoutHelper timeoutHelper = new TimeoutHelper(CloseAfterFaultTimeout);
836                         try
837                         {
838                             channel.Close(timeoutHelper.RemainingTime());
839                         }
840                         catch (Exception e2)
841                         {
842                             if (Fx.IsFatal(e2))
843                             {
844                                 throw;
845                             }
846                             this.HandleError(e2);
847                         }
848                         try
849                         {
850                             this.binder.CloseAfterFault(timeoutHelper.RemainingTime());
851                         }
852                         catch (Exception e3)
853                         {
854                             if (Fx.IsFatal(e3))
855                             {
856                                 throw;
857                             }
858                             this.HandleError(e3);
859                         }
860                     }
861                     else
862                     {
863                         channel.Abort();
864                         this.binder.Abort();
865                     }
866                 }
867                 else
868                 {
869                     if (replied)
870                     {
871                         try
872                         {
873                             this.binder.CloseAfterFault(CloseAfterFaultTimeout);
874                         }
875                         catch (Exception e4)
876                         {
877                             if (Fx.IsFatal(e4))
878                             {
879                                 throw;
880                             }
881                             this.HandleError(e4);
882                         }
883                     }
884                     else
885                     {
886                         this.binder.Abort();
887                     }
888                 }
889             }
890
891             return true;
892         }
893
894         void HandleReceiveComplete(RequestContext context)
895         {
896             try
897             {
898                 if (this.channel != null)
899                 {
900                     this.channel.HandleReceiveComplete(context);
901                 }
902                 else
903                 {
904                     if (context == null && this.hasSession)
905                     {
906                         bool close;
907                         lock (this.ThisLock)
908                         {
909                             close = !this.doneReceiving;
910                             this.doneReceiving = true;
911                         }
912
913                         if (close)
914                         {
915                             this.receiver.Close();
916
917                             if (this.idleManager != null)
918                             {
919                                 this.idleManager.CancelTimer();
920                             }
921
922                             ServiceThrottle throttle = this.throttle;
923                             if (throttle != null)
924                             {
925                                 throttle.DeactivateChannel();
926                             }
927                         }
928                     }
929                 }
930             }
931             finally
932             {
933                 if ((context == null) && this.incrementedActivityCountInConstructor)
934                 {
935                     this.listener.ChannelDispatcher.Channels.DecrementActivityCount();
936                 }
937             }
938         }
939
940         bool HandleRequest(RequestContext request, OperationContext currentOperationContext)
941         {
942             if (request == null)
943             {
944                 // channel EOF, stop receiving
945                 return false;
946             }
947
948             ServiceModelActivity activity = DiagnosticUtility.ShouldUseActivity ? TraceUtility.ExtractActivity(request) : null;
949
950             using (ServiceModelActivity.BoundOperation(activity))
951             {
952                 if (this.HandleRequestAsReply(request))
953                 {
954                     this.ReleasePump();
955                     return true;
956                 }
957
958                 if (this.isChannelTerminated)
959                 {
960                     this.ReleasePump();
961                     this.ReplyChannelTerminated(request);
962                     return true;
963                 }
964
965                 if (this.requestInfo.RequestContext != null)
966                 {
967                     Fx.Assert("ChannelHandler.HandleRequest: this.requestInfo.RequestContext != null");
968                 }
969
970                 this.requestInfo.RequestContext = request;
971
972                 if (!this.TryAcquireCallThrottle(request))
973                 {
974                     // this.ThrottleAcquiredForCall will be called to continue
975                     return false;
976                 }
977
978                 // NOTE: from here on down, ensure that this code is the same as ThrottleAcquiredForCall (see 55460)
979                 if (this.requestInfo.ChannelHandlerOwnsCallThrottle)
980                 {
981                     Fx.Assert("ChannelHandler.HandleRequest: this.requestInfo.ChannelHandlerOwnsCallThrottle");
982                 }
983                 this.requestInfo.ChannelHandlerOwnsCallThrottle = true;
984
985                 if (!this.TryRetrievingInstanceContext(request))
986                 {
987                     //Would have replied and close the request.
988                     return true;
989                 }
990
991                 this.requestInfo.Channel.CompletedIOOperation();
992
993                 //Only acquire InstanceContext throttle if one doesnt already exist.
994                 if (!this.TryAcquireThrottle(request, (this.requestInfo.ExistingInstanceContext == null)))
995                 {
996                     // this.ThrottleAcquired will be called to continue
997                     return false;
998                 }
999                 if (this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle)
1000                 {
1001                     Fx.Assert("ChannelHandler.HandleRequest: this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle");
1002                 }
1003                 this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle = (this.requestInfo.ExistingInstanceContext == null);
1004
1005                 if (!this.DispatchAndReleasePump(request, true, currentOperationContext))
1006                 {
1007                     // this.DispatchDone will be called to continue
1008                     return false;
1009                 }
1010             }
1011             return true;
1012         }
1013
1014         bool HandleRequestAsReply(RequestContext request)
1015         {
1016             if (this.duplexBinder != null)
1017             {
1018                 if (this.duplexBinder.HandleRequestAsReply(request.RequestMessage))
1019                 {
1020                     return true;
1021                 }
1022             }
1023             return false;
1024         }
1025
1026         static void OnStartAsyncMessagePump(object state)
1027         {
1028             ((ChannelHandler)state).AsyncMessagePump();
1029         }
1030
1031         static void OnStartSyncMessagePump(object state)
1032         {
1033             ChannelHandler handler = state as ChannelHandler;
1034
1035             if (TD.ChannelReceiveStopIsEnabled())
1036             {
1037                 TD.ChannelReceiveStop(handler.EventTraceActivity, state.GetHashCode());
1038             }
1039
1040             if (handler.receiveWithTransaction)
1041             {
1042                 handler.SyncTransactionalMessagePump();
1043             }
1044             else
1045             {
1046                 handler.SyncMessagePump();
1047             }
1048         }
1049
1050         static void OnStartSingleTransactedBatch(object state)
1051         {
1052             ChannelHandler handler = state as ChannelHandler;
1053             handler.TransactedBatchLoop();
1054         }
1055
1056         static void OnAsyncReceiveComplete(IAsyncResult result)
1057         {
1058             if (!result.CompletedSynchronously)
1059             {
1060                 ((ChannelHandler)result.AsyncState).AsyncMessagePump(result);
1061             }
1062         }
1063
1064         static void OnContinueAsyncReceive(object state)
1065         {
1066             IAsyncResult result = (IAsyncResult)state;
1067             ((ChannelHandler)result.AsyncState).AsyncMessagePump(result);
1068         }
1069
1070         static void OpenAndEnsurePump(object state)
1071         {
1072             ((ChannelHandler)state).OpenAndEnsurePump();
1073         }
1074
1075         void OpenAndEnsurePump()
1076         {
1077             Exception exception = null;
1078             try
1079             {
1080                 this.binder.Channel.Open();
1081             }
1082             catch (Exception e)
1083             {
1084                 if (Fx.IsFatal(e))
1085                 {
1086                     throw;
1087                 }
1088                 exception = e;
1089             }
1090
1091             if (exception != null)
1092             {
1093                 if (DiagnosticUtility.ShouldTraceWarning)
1094                 {
1095                     TraceUtility.TraceEvent(System.Diagnostics.TraceEventType.Warning,
1096                         TraceCode.FailedToOpenIncomingChannel,
1097                         SR.GetString(SR.TraceCodeFailedToOpenIncomingChannel));
1098                 }
1099                 SessionIdleManager idleManager = this.idleManager;
1100                 if (idleManager != null)
1101                 {
1102                     idleManager.CancelTimer();
1103                 }
1104                 if ((this.throttle != null) && this.hasSession)
1105                 {
1106                     this.throttle.DeactivateChannel();
1107                 }
1108
1109                 bool errorHandled = this.HandleError(exception);
1110
1111                 if (this.incrementedActivityCountInConstructor)
1112                 {
1113                     this.listener.ChannelDispatcher.Channels.DecrementActivityCount();
1114                 }
1115
1116                 if (!errorHandled)
1117                 {
1118                     this.binder.Channel.Abort();
1119                 }
1120             }
1121             else
1122             {
1123                 this.EnsurePump();
1124             }
1125         }
1126
1127         bool TryReceive(TimeSpan timeout, out RequestContext requestContext)
1128         {
1129             this.shouldRejectMessageWithOnOpenActionHeader = !this.needToCreateSessionOpenNotificationMessage;
1130
1131             bool valid;
1132             if (this.needToCreateSessionOpenNotificationMessage)
1133             {
1134                 this.needToCreateSessionOpenNotificationMessage = false;
1135                 requestContext = this.GetSessionOpenNotificationRequestContext();
1136                 valid = true;
1137             }
1138             else
1139             {
1140                 valid = this.receiver.TryReceive(timeout, out requestContext);
1141             }
1142
1143             if (valid)
1144             {
1145                 this.HandleReceiveComplete(requestContext);
1146             }
1147
1148             return valid;
1149         }
1150
1151         void ReplyAddressFilterDidNotMatch(RequestContext request)
1152         {
1153             FaultCode code = FaultCode.CreateSenderFaultCode(AddressingStrings.DestinationUnreachable,
1154                 this.messageVersion.Addressing.Namespace);
1155             string reason = SR.GetString(SR.SFxNoEndpointMatchingAddress, request.RequestMessage.Headers.To);
1156
1157             ReplyFailure(request, code, reason);
1158         }
1159
1160         void ReplyContractFilterDidNotMatch(RequestContext request)
1161         {
1162             // By default, the contract filter is just a filter over the set of initiating actions in 
1163             // the contract, so we do error messages accordingly
1164             AddressingVersion addressingVersion = this.messageVersion.Addressing;
1165             if (addressingVersion != AddressingVersion.None && request.RequestMessage.Headers.Action == null)
1166             {
1167                 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
1168                     new MessageHeaderException(
1169                     SR.GetString(SR.SFxMissingActionHeader, addressingVersion.Namespace), AddressingStrings.Action, addressingVersion.Namespace));
1170             }
1171             else
1172             {
1173                 // some of this code is duplicated in DispatchRuntime.UnhandledActionInvoker
1174                 // ideally both places would use FaultConverter and ActionNotSupportedException
1175                 FaultCode code = FaultCode.CreateSenderFaultCode(AddressingStrings.ActionNotSupported,
1176                     this.messageVersion.Addressing.Namespace);
1177                 string reason = SR.GetString(SR.SFxNoEndpointMatchingContract, request.RequestMessage.Headers.Action);
1178                 ReplyFailure(request, code, reason, this.messageVersion.Addressing.FaultAction);
1179             }
1180         }
1181
1182         void ReplyChannelTerminated(RequestContext request)
1183         {
1184             FaultCode code = FaultCode.CreateSenderFaultCode(FaultCodeConstants.Codes.SessionTerminated,
1185                 FaultCodeConstants.Namespaces.NetDispatch);
1186             string reason = SR.GetString(SR.SFxChannelTerminated0);
1187             string action = FaultCodeConstants.Actions.NetDispatcher;
1188             Message fault = Message.CreateMessage(this.messageVersion, code, reason, action);
1189             ReplyFailure(request, fault, action, reason, code);
1190         }
1191
1192         void ReplyFailure(RequestContext request, FaultCode code, string reason)
1193         {
1194             string action = this.messageVersion.Addressing.DefaultFaultAction;
1195             ReplyFailure(request, code, reason, action);
1196         }
1197
1198         void ReplyFailure(RequestContext request, FaultCode code, string reason, string action)
1199         {
1200             Message fault = Message.CreateMessage(this.messageVersion, code, reason, action);
1201             ReplyFailure(request, fault, action, reason, code);
1202         }
1203
1204         void ReplyFailure(RequestContext request, Message fault, string action, string reason, FaultCode code)
1205         {
1206             FaultException exception = new FaultException(reason, code);
1207             ErrorBehavior.ThrowAndCatch(exception);
1208             ErrorHandlerFaultInfo faultInfo = new ErrorHandlerFaultInfo(action);
1209             faultInfo.Fault = fault;
1210             bool replied, replySentAsync;
1211             ProvideFaultAndReplyFailure(request, exception, ref faultInfo, out replied, out replySentAsync);
1212             this.HandleError(exception, ref faultInfo);
1213         }
1214
1215         void ProvideFaultAndReplyFailure(RequestContext request, Exception exception, ref ErrorHandlerFaultInfo faultInfo, out bool replied, out bool replySentAsync)
1216         {
1217             replied = false;
1218             replySentAsync = false;
1219             bool requestMessageIsFault = false;
1220             try
1221             {
1222                 requestMessageIsFault = request.RequestMessage.IsFault;
1223             }
1224 #pragma warning suppress 56500 // covered by FxCOP
1225             catch (Exception e)
1226             {
1227                 if (Fx.IsFatal(e))
1228                 {
1229                     throw;
1230                 }
1231                 // ---- it
1232             }
1233
1234             bool enableFaults = false;
1235             if (this.listener != null)
1236             {
1237                 enableFaults = this.listener.ChannelDispatcher.EnableFaults;
1238             }
1239             else if (this.channel != null && this.channel.IsClient)
1240             {
1241                 enableFaults = this.channel.ClientRuntime.EnableFaults;
1242             }
1243
1244             if ((!requestMessageIsFault) && enableFaults)
1245             {
1246                 this.ProvideFault(exception, ref faultInfo);
1247                 if (faultInfo.Fault != null)
1248                 {
1249                     Message reply = faultInfo.Fault;
1250                     try
1251                     {
1252                         try
1253                         {
1254                             if (this.PrepareReply(request, reply))
1255                             {
1256                                 if (this.sendAsynchronously)
1257                                 {
1258                                     var state = new ContinuationState { ChannelHandler = this, Channel = channel, Exception = exception, FaultInfo = faultInfo, Request = request, Reply = reply };
1259                                     var result = request.BeginReply(reply, ChannelHandler.onAsyncReplyComplete, state);
1260                                     if (result.CompletedSynchronously)
1261                                     {
1262                                         ChannelHandler.AsyncReplyComplete(result, state);
1263                                         replied = true;
1264                                     }
1265                                     else
1266                                     {
1267                                         replySentAsync = true;
1268                                     }
1269                                 }
1270                                 else
1271                                 {
1272                                     request.Reply(reply);
1273                                     replied = true;
1274                                 }
1275                             }
1276                         }
1277                         finally
1278                         {
1279                             if (!replySentAsync)
1280                             {
1281                                 reply.Close();
1282                             }
1283                         }
1284                     }
1285 #pragma warning suppress 56500 // covered by FxCOP
1286                     catch (Exception e)
1287                     {
1288                         if (Fx.IsFatal(e))
1289                         {
1290                             throw;
1291                         }
1292                         this.HandleError(e);
1293                     }
1294                 }
1295             }
1296         }
1297
1298         /// <summary>
1299         /// Prepares a reply that can either be sent asynchronously or synchronously depending on the value of 
1300         /// sendAsynchronously
1301         /// </summary>
1302         /// <param name="request">The request context to prepare</param>
1303         /// <param name="reply">The reply to prepare</param>
1304         /// <returns>True if channel is open and prepared reply should be sent; otherwise false.</returns>
1305         bool PrepareReply(RequestContext request, Message reply)
1306         {
1307             // Ensure we only reply once (we may hit the same error multiple times)
1308             if (this.replied == request)
1309             {
1310                 return false;
1311             }
1312             this.replied = request;
1313
1314             bool canSendReply = true;
1315
1316             Message requestMessage = null;
1317             try
1318             {
1319                 requestMessage = request.RequestMessage;
1320             }
1321 #pragma warning suppress 56500 // covered by FxCOP
1322             catch (Exception e)
1323             {
1324                 if (Fx.IsFatal(e))
1325                 {
1326                     throw;
1327                 }
1328                 // ---- it
1329             }
1330             if (!object.ReferenceEquals(requestMessage, null))
1331             {
1332                 UniqueId requestID = null;
1333                 try
1334                 {
1335                     requestID = requestMessage.Headers.MessageId;
1336                 }
1337                 catch (MessageHeaderException)
1338                 {
1339                     // ---- it - we don't need to correlate the reply if the MessageId header is bad
1340                 }
1341                 if (!object.ReferenceEquals(requestID, null) && !this.isManualAddressing)
1342                 {
1343                     System.ServiceModel.Channels.RequestReplyCorrelator.PrepareReply(reply, requestID);
1344                 }
1345                 if (!this.hasSession && !this.isManualAddressing)
1346                 {
1347                     try
1348                     {
1349                         canSendReply = System.ServiceModel.Channels.RequestReplyCorrelator.AddressReply(reply, requestMessage);
1350                     }
1351                     catch (MessageHeaderException)
1352                     {
1353                         // ---- it - we don't need to address the reply if the FaultTo header is bad
1354                     }
1355                 }
1356             }
1357
1358             // ObjectDisposeException can happen
1359             // if the channel is closed in a different
1360             // thread. 99% this check will avoid false
1361             // exceptions.
1362             return this.IsOpen && canSendReply;
1363         }
1364
1365         static void AsyncReplyComplete(IAsyncResult result, ContinuationState state)
1366         {
1367             try
1368             {
1369                 state.Request.EndReply(result);
1370             }
1371             catch (Exception e)
1372             {
1373                 DiagnosticUtility.TraceHandledException(e, System.Diagnostics.TraceEventType.Error);
1374
1375                 if (Fx.IsFatal(e))
1376                 {
1377                     throw;
1378                 }
1379                 
1380                 state.ChannelHandler.HandleError(e);
1381             }
1382
1383             try
1384             {
1385                 state.Reply.Close();
1386             }
1387             catch (Exception e)
1388             {
1389                 DiagnosticUtility.TraceHandledException(e, System.Diagnostics.TraceEventType.Error);
1390
1391                 if (Fx.IsFatal(e))
1392                 {
1393                     throw;
1394                 }
1395
1396                 state.ChannelHandler.HandleError(e);
1397             }
1398
1399             try
1400             {
1401                 state.ChannelHandler.HandleErrorContinuation(state.Exception, state.Request, state.Channel, ref state.FaultInfo, true);
1402             }
1403             catch (Exception e)
1404             {
1405                 DiagnosticUtility.TraceHandledException(e, System.Diagnostics.TraceEventType.Error);
1406
1407                 if (Fx.IsFatal(e))
1408                 {
1409                     throw;
1410                 }
1411
1412                 state.ChannelHandler.HandleError(e);
1413             }
1414
1415             state.ChannelHandler.EnsurePump();
1416         }
1417
1418         static void OnAsyncReplyComplete(IAsyncResult result)
1419         {
1420             if (result.CompletedSynchronously)
1421             {
1422                 return;
1423             }
1424
1425             try
1426             {
1427                 var state = (ContinuationState)result.AsyncState;
1428                 ChannelHandler.AsyncReplyComplete(result, state);
1429             }
1430             catch (Exception e)
1431             {
1432                 DiagnosticUtility.TraceHandledException(e, System.Diagnostics.TraceEventType.Error);
1433
1434                 if (Fx.IsFatal(e))
1435                 {
1436                     throw;
1437                 }
1438             }
1439         }
1440
1441         void ReleasePump()
1442         {
1443             if (this.isConcurrent)
1444             {
1445                 Interlocked.Exchange(ref this.isPumpAcquired, 0);
1446             }
1447         }
1448
1449         void SyncMessagePump()
1450         {
1451             OperationContext existingOperationContext = OperationContext.Current;
1452             try
1453             {
1454                 OperationContext currentOperationContext = new OperationContext(this.host);
1455                 OperationContext.Current = currentOperationContext;
1456
1457                 for (;;)
1458                 {
1459                     RequestContext request;
1460
1461                     this.requestInfo.Cleanup();
1462
1463                     while (!TryReceive(TimeSpan.MaxValue, out request))
1464                     {
1465                     }
1466
1467                     if (!HandleRequest(request, currentOperationContext))
1468                     {
1469                         break;
1470                     }
1471
1472                     if (!TryAcquirePump())
1473                     {
1474                         break;
1475                     }
1476
1477                     currentOperationContext.Recycle();
1478                 }
1479             }
1480             finally
1481             {
1482                 OperationContext.Current = existingOperationContext;
1483             }
1484         }
1485
1486         [MethodImpl(MethodImplOptions.NoInlining)]
1487         void SyncTransactionalMessagePump()
1488         {
1489             for (;;)
1490             {
1491                 bool completedSynchronously;
1492                 if (null == sharedTransactedBatchContext)
1493                 {
1494                     completedSynchronously = TransactedLoop();
1495                 }
1496                 else
1497                 {
1498                     completedSynchronously = TransactedBatchLoop();
1499                 }
1500
1501                 if (!completedSynchronously)
1502                 {
1503                     return;
1504                 }
1505             }
1506         }
1507
// Receives and dispatches requests one at a time, each under a transaction
// obtained from CreateOrGetAttachedTransaction().
//
// Returns:
//   IsOpen - when TryTransactionalReceive reported that nothing was received.
//   false  - when a null request was received, or when HandleRequest or
//            TryAcquirePump returned false.
// OperationContext.Current is restored to its previous value on every exit.
bool TransactedLoop()
{
    try
    {
        this.receiver.WaitForMessage();
    }
    catch (Exception waitError)
    {
        // Fatal errors and errors HandleError does not accept are rethrown;
        // anything else is swallowed and the loop proceeds.
        if (Fx.IsFatal(waitError) || !this.HandleError(waitError))
        {
            throw;
        }
    }

    Transaction currentTransaction = CreateOrGetAttachedTransaction();
    OperationContext previousContext = OperationContext.Current;

    try
    {
        OperationContext loopContext = new OperationContext(this.host);
        OperationContext.Current = loopContext;

        while (true)
        {
            this.requestInfo.Cleanup();

            RequestContext incoming;
            if (!TryTransactionalReceive(currentTransaction, out incoming))
            {
                return IsOpen;
            }

            if (incoming == null)
            {
                return false;
            }

            TransactionMessageProperty.Set(currentTransaction, incoming.RequestMessage);

            // TryAcquirePump is only attempted after a successful HandleRequest.
            if (!HandleRequest(incoming, loopContext) || !TryAcquirePump())
            {
                return false;
            }

            // Obtain the transaction for the next message and reuse the context object.
            currentTransaction = CreateOrGetAttachedTransaction();
            loopContext.Recycle();
        }
    }
    finally
    {
        OperationContext.Current = previousContext;
    }
}
1573
// Receives and dispatches requests under the transaction of the current
// transacted batch context, creating a new batch context from the shared
// one when none is set.
//
// Returns true when the caller's pump should run another pass and false
// when it should stop. OperationContext.Current is restored on every exit.
bool TransactedBatchLoop()
{
    if (this.transactedBatchContext != null)
    {
        // A batch context still flagged as dispatching is rolled back before reuse.
        if (this.transactedBatchContext.InDispatch)
        {
            this.transactedBatchContext.ForceRollback();
            this.transactedBatchContext.InDispatch = false;
        }

        if (!this.transactedBatchContext.IsActive)
        {
            // Only the main batch handler goes on to create a fresh batch context.
            if (!this.isMainTransactedBatchHandler)
            {
                return false;
            }

            this.transactedBatchContext = null;
        }
    }

    if (this.transactedBatchContext == null)
    {
        try
        {
            this.receiver.WaitForMessage();
        }
        catch (Exception waitError)
        {
            // Rethrow fatal errors and errors that HandleError does not accept.
            if (Fx.IsFatal(waitError) || !this.HandleError(waitError))
            {
                throw;
            }
        }

        this.transactedBatchContext = this.sharedTransactedBatchContext.CreateTransactedBatchContext();
    }

    OperationContext previousContext = OperationContext.Current;

    try
    {
        OperationContext batchOperationContext = new OperationContext(this.host);
        OperationContext.Current = batchOperationContext;

        while (this.transactedBatchContext.IsActive)
        {
            this.requestInfo.Cleanup();

            RequestContext incoming;
            if (!TryTransactionalReceive(this.transactedBatchContext.Transaction, out incoming))
            {
                // Nothing received: commit the batch if still open, otherwise roll it back.
                bool stillOpen = this.IsOpen;
                if (stillOpen)
                {
                    this.transactedBatchContext.ForceCommit();
                }
                else
                {
                    this.transactedBatchContext.ForceRollback();
                }

                return stillOpen;
            }

            if (incoming == null)
            {
                this.transactedBatchContext.ForceRollback();
                return false;
            }

            TransactionMessageProperty.Set(this.transactedBatchContext.Transaction, incoming.RequestMessage);

            this.transactedBatchContext.InDispatch = true;
            if (!HandleRequest(incoming, batchOperationContext))
            {
                return false;
            }

            // InDispatch still being set after HandleRequest returned true
            // leads to a rollback and another pass of the caller's pump.
            if (this.transactedBatchContext.InDispatch)
            {
                this.transactedBatchContext.ForceRollback();
                this.transactedBatchContext.InDispatch = false;
                return true;
            }

            if (!TryAcquirePump())
            {
                Fx.Assert("System.ServiceModel.Dispatcher.ChannelHandler.TransactedBatchLoop(): (TryAcquiredPump returned false)");
                return false;
            }

            batchOperationContext.Recycle();
        }

        // The batch context is no longer active.
        return true;
    }
    finally
    {
        OperationContext.Current = previousContext;
    }
}
1679
// Picks the transaction to receive under, in this order:
//   1. the transaction of a pending acceptTransaction, which is cleared
//      once it has been handed out;
//   2. the attached transaction of the InstanceContext, when it has one;
//   3. a new transaction created with the channel dispatcher's isolation
//      level and normalized transaction timeout.
Transaction CreateOrGetAttachedTransaction()
{
    if (this.acceptTransaction != null)
    {
        lock (ThisLock)
        {
            // Checked again under the lock before the field is consumed.
            if (this.acceptTransaction != null)
            {
                Transaction accepted = this.acceptTransaction.Transaction;
                this.acceptTransaction = null;
                return accepted;
            }
        }
    }

    if (this.InstanceContext == null || !this.InstanceContext.HasTransaction)
    {
        return TransactionBehavior.CreateTransaction(
            this.listener.ChannelDispatcher.TransactionIsolationLevel,
            TransactionBehavior.NormalizeTimeout(this.listener.ChannelDispatcher.TransactionTimeout));
    }

    return InstanceContext.Transaction.Attached;
}
1706
// Calls receive on the channel inside a TransactionScope over tx; returns
// false if no message arrived during the "short timeout".
//
// With a shared transacted batch context the receive uses a zero timeout
// under the shared ReceiveLock, and is skipped (returning false) when the
// batch context is about to expire. Otherwise the timeout is the smaller of
// the transaction timeout and the default receive timeout, normalized.
//
// ObjectDisposedException and TransactionException are passed to
// HandleError and reported as "nothing received" with request set to null.
bool TryTransactionalReceive(Transaction tx, out RequestContext request)
{
    request = null;
    bool gotMessage = false;

    try
    {
        using (TransactionScope receiveScope = new TransactionScope(tx))
        {
            if (this.sharedTransactedBatchContext == null)
            {
                TimeSpan receiveTimeout = TimeoutHelper.Min(this.listener.ChannelDispatcher.TransactionTimeout, this.listener.ChannelDispatcher.DefaultCommunicationTimeouts.ReceiveTimeout);
                gotMessage = this.receiver.TryReceive(TransactionBehavior.NormalizeTimeout(receiveTimeout), out request);
            }
            else
            {
                lock (this.sharedTransactedBatchContext.ReceiveLock)
                {
                    // Leaves the scope without calling Complete().
                    if (this.transactedBatchContext.AboutToExpire)
                    {
                        return false;
                    }

                    gotMessage = this.receiver.TryReceive(TimeSpan.Zero, out request);
                }
            }

            receiveScope.Complete();
        }

        if (gotMessage)
        {
            this.HandleReceiveComplete(request);
        }
    }
    catch (ObjectDisposedException disposedError) // thrown from the transaction
    {
        this.HandleError(disposedError);
        request = null;
        return false;
    }
    catch (TransactionException transactionError)
    {
        this.HandleError(transactionError);
        request = null;
        return false;
    }
    catch (Exception receiveError)
    {
        // Rethrow fatal errors and errors that HandleError does not accept.
        if (Fx.IsFatal(receiveError) || !this.HandleError(receiveError))
        {
            throw;
        }
    }

    return gotMessage;
}
1769
// This callback always occurs async and always on a dirty thread.
//
// Takes over the request that was parked in requestWaitingForThrottle,
// records that this handler now owns the call throttle, and carries on with
// instance context retrieval, the instance-context/dynamic throttle, and
// finally dispatch.
internal void ThrottleAcquiredForCall()
{
    RequestContext request = this.requestWaitingForThrottle;
    this.requestWaitingForThrottle = null;

    if (this.requestInfo.ChannelHandlerOwnsCallThrottle)
    {
        Fx.Assert("ChannelHandler.ThrottleAcquiredForCall: this.requestInfo.ChannelHandlerOwnsCallThrottle");
    }

    this.requestInfo.ChannelHandlerOwnsCallThrottle = true;

    if (!this.TryRetrievingInstanceContext(request))
    {
        //Should reply/close request and also close the pump
        this.EnsurePump();
        return;
    }

    this.requestInfo.Channel.CompletedIOOperation();

    // The instance context throttle is requested only when no existing
    // instance context was found for the request.
    if (!this.TryAcquireThrottle(request, (this.requestInfo.ExistingInstanceContext == null)))
    {
        return;
    }

    if (this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle)
    {
        Fx.Assert("ChannelHandler.ThrottleAcquiredForCall: this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle");
    }

    this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle = (this.requestInfo.ExistingInstanceContext == null);

    if (this.DispatchAndReleasePump(request, false, null))
    {
        this.EnsurePump();
    }
}
1804
// Exception-shielding wrapper around TryRetrievingInstanceContextCore.
// A non-fatal exception is traced as handled, the request is closed
// (aborted if closing itself fails with a non-fatal exception), and false
// is returned. Fatal exceptions are rethrown.
bool TryRetrievingInstanceContext(RequestContext request)
{
    bool retrieved;

    try
    {
        retrieved = TryRetrievingInstanceContextCore(request);
    }
    catch (Exception retrievalError)
    {
        if (Fx.IsFatal(retrievalError))
        {
            throw;
        }

        DiagnosticUtility.TraceHandledException(retrievalError, TraceEventType.Error);

        try
        {
            request.Close();
        }
        catch (Exception closeError)
        {
            if (Fx.IsFatal(closeError))
            {
                throw;
            }

            request.Abort();
        }

        retrieved = false;
    }

    return retrieved;
}
1837
// Resolves the channel/endpoint for the request (if not already done) and
// asks the instance context provider for an existing instance context.
//
// Returns: false denotes failure; the caller should discard the request.
//        : true denotes the operation was successful.
//
// The pump is released on every exit except the one where
// GetExistingInstanceContext completed without throwing.
bool TryRetrievingInstanceContextCore(RequestContext request)
{
    bool mustReleasePump = true;

    try
    {
        if (!this.requestInfo.EndpointLookupDone)
        {
            this.EnsureChannelAndEndpoint(request);
        }

        if (this.requestInfo.Channel == null)
        {
            return false;
        }

        if (this.requestInfo.DispatchRuntime == null)
        {
            // This can happen if we are pumping for an async client,
            // and we receive a bogus reply.  In that case, there is no
            // DispatchRuntime, because we are only expecting replies.
            //
            // One possible fix for this would be in DuplexChannelBinder
            // to drop all messages with a RelatesTo that do not match a
            // pending request.
            //
            // However, that would not fix:
            // (a) we could get a valid request message with a
            // RelatesTo that we should try to process.
            // (b) we could get a reply message that does not have
            // a RelatesTo.
            //
            // So we do the null check here.
            //
            // SFx drops a message here
            TraceUtility.TraceDroppedMessage(request.RequestMessage, this.requestInfo.Endpoint);
            request.Close();
            return false;
        }

        IContextChannel contextProxy = this.requestInfo.Channel.Proxy as IContextChannel;

        try
        {
            this.requestInfo.ExistingInstanceContext = this.requestInfo.DispatchRuntime.InstanceContextProvider.GetExistingInstanceContext(request.RequestMessage, contextProxy);
            mustReleasePump = false;
        }
        catch (Exception lookupError)
        {
            if (Fx.IsFatal(lookupError))
            {
                throw;
            }

            // Clear the per-request channel before reporting the error.
            this.requestInfo.Channel = null;
            this.HandleError(lookupError, request, channel);
            return false;
        }

        return true;
    }
    catch (Exception unexpectedError)
    {
        if (Fx.IsFatal(unexpectedError))
        {
            throw;
        }

        this.HandleError(unexpectedError, request, channel);

        return false;
    }
    finally
    {
        if (mustReleasePump)
        {
            this.ReleasePump();
        }
    }
}
1918
// This callback always occurs async and always on a dirty thread.
//
// Takes over the request parked in requestWaitingForThrottle, records
// whether this handler owns the instance context throttle (it does when no
// existing instance context is set), then dispatches the request and
// re-ensures the pump when DispatchAndReleasePump returns true.
internal void ThrottleAcquired()
{
    RequestContext pendingRequest = this.requestWaitingForThrottle;
    this.requestWaitingForThrottle = null;

    if (this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle)
    {
        Fx.Assert("ChannelHandler.ThrottleAcquired: this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle");
    }

    this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle = (this.requestInfo.ExistingInstanceContext == null);

    bool shouldEnsurePump = this.DispatchAndReleasePump(pendingRequest, false, null);
    if (shouldEnsurePump)
    {
        this.EnsurePump();
    }
}
1935
// Tries to acquire the instance-context/dynamic throttle for the request.
// Returns true when there is no active throttle or when the throttle was
// acquired by this call. When false is returned, the request stays parked
// in requestWaitingForThrottle.
bool TryAcquireThrottle(RequestContext request, bool acquireInstanceContextThrottle)
{
    ServiceThrottle serviceThrottle = this.throttle;
    if ((serviceThrottle == null) || !serviceThrottle.IsActive)
    {
        return true;
    }

    // Park the request before asking the throttle; un-park it on success.
    this.requestWaitingForThrottle = request;

    bool acquired = serviceThrottle.AcquireInstanceContextAndDynamic(this, acquireInstanceContextThrottle);
    if (acquired)
    {
        this.requestWaitingForThrottle = null;
    }

    return acquired;
}
1958
// Tries to acquire the call throttle for the request. Returns true when
// there is no active throttle or when the throttle was acquired by this
// call. When false is returned, the request stays parked in
// requestWaitingForThrottle.
bool TryAcquireCallThrottle(RequestContext request)
{
    ServiceThrottle serviceThrottle = this.throttle;
    if ((serviceThrottle == null) || !serviceThrottle.IsActive)
    {
        return true;
    }

    // Park the request before asking the throttle; un-park it on success.
    this.requestWaitingForThrottle = request;

    bool acquired = serviceThrottle.AcquireCall(this);
    if (acquired)
    {
        this.requestWaitingForThrottle = null;
    }

    return acquired;
}
1981
// Claims the pump. When the handler is not concurrent this always succeeds;
// otherwise it atomically changes isPumpAcquired from 0 to 1 and succeeds
// only if this call performed that change.
bool TryAcquirePump()
{
    return !this.isConcurrent
        || Interlocked.CompareExchange(ref this.isPumpAcquired, 1, 0) == 0;
}
1991
// Per-request state kept by the owning ChannelHandler while a request is
// being processed.
struct RequestInfo
{
    public EndpointDispatcher Endpoint;
    public InstanceContext ExistingInstanceContext;
    public ServiceChannel Channel;
    public bool EndpointLookupDone;
    public DispatchRuntime DispatchRuntime;
    public RequestContext RequestContext;
    public ChannelHandler ChannelHandler;
    public bool ChannelHandlerOwnsCallThrottle; // if true, we are responsible for call throttle
    public bool ChannelHandlerOwnsInstanceContextThrottle; // if true, we are responsible for instance/dynamic throttle

    // Starts with every field at its default value (null / false) except
    // the owning handler.
    public RequestInfo(ChannelHandler channelHandler)
    {
        this = default(RequestInfo);
        this.ChannelHandler = channelHandler;
    }

    // Releases the throttles this handler still owns and clears the
    // per-request references. The instance context throttle is deactivated
    // before the fields are cleared; DispatchDone() is called after.
    // DispatchRuntime is left untouched.
    public void Cleanup()
    {
        if (this.ChannelHandlerOwnsInstanceContextThrottle)
        {
            this.ChannelHandler.throttle.DeactivateInstanceContext();
            this.ChannelHandlerOwnsInstanceContextThrottle = false;
        }

        this.Endpoint = null;
        this.ExistingInstanceContext = null;
        this.Channel = null;
        this.EndpointLookupDone = false;
        this.RequestContext = null;

        if (this.ChannelHandlerOwnsCallThrottle)
        {
            this.ChannelHandler.DispatchDone();
            this.ChannelHandlerOwnsCallThrottle = false;
        }
    }
}
2037
// Extracts the event trace activity from the message and, when the
// DispatchMessageStart event is enabled, emits it for that activity.
// Returns the extracted activity, or null when end-to-end activity tracing
// is disabled or the message is null.
EventTraceActivity TraceDispatchMessageStart(Message message)
{
    if (!FxTrace.Trace.IsEnd2EndActivityTracingEnabled || message == null)
    {
        return null;
    }

    EventTraceActivity activity = EventTraceActivityHelper.TryExtractActivity(message);

    if (TD.DispatchMessageStartIsEnabled())
    {
        TD.DispatchMessageStart(activity);
    }

    return activity;
}
2052
/// <summary>
/// Plain field bundle that carries the state needed to continue an
/// asynchronous reply. It declares no behavior of its own.
/// </summary>
struct ContinuationState
{
    // NOTE(review): the field roles below are inferred from names and types
    // only; confirm against the async reply code path before relying on them.

    // Presumably the handler the reply belongs to.
    public ChannelHandler ChannelHandler;

    // Presumably the exception being reported, if any.
    public Exception Exception;

    // Presumably the request being replied to.
    public RequestContext Request;

    // Presumably the reply message.
    public Message Reply;

    // Presumably the service channel associated with the request.
    public ServiceChannel Channel;

    // Presumably the fault information associated with the reply.
    public ErrorHandlerFaultInfo FaultInfo;
}
2065     }
2066 }