2 // ChannelDispatcher.cs
5 // Atsushi Enomoto <atsushi@ximian.com>
7 // Copyright (C) 2005,2009 Novell, Inc. http://www.novell.com
9 // Permission is hereby granted, free of charge, to any person obtaining
10 // a copy of this software and associated documentation files (the
11 // "Software"), to deal in the Software without restriction, including
12 // without limitation the rights to use, copy, modify, merge, publish,
13 // distribute, sublicense, and/or sell copies of the Software, and to
14 // permit persons to whom the Software is furnished to do so, subject to
15 // the following conditions:
17 // The above copyright notice and this permission notice shall be
18 // included in all copies or substantial portions of the Software.
20 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
21 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
22 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
23 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
24 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
25 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
26 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 using System.Collections.Generic;
30 using System.Collections.ObjectModel;
32 using System.Reflection;
33 using System.ServiceModel.Channels;
34 using System.Threading;
35 using System.Transactions;
36 using System.ServiceModel;
37 using System.ServiceModel.Description;
39 namespace System.ServiceModel.Dispatcher
// Dispatcher built around a single IChannelListener. It exposes that listener,
// a collection of EndpointDispatcher objects and assorted dispatch settings,
// and derives from ChannelDispatcherBase.
41 public class ChannelDispatcher : ChannelDispatcherBase
// SynchronizedCollection of EndpointDispatcher that updates each item's
// ChannelDispatcher back-reference as items enter and leave the collection.
43 class EndpointDispatcherCollection : SynchronizedCollection<EndpointDispatcher>
// Takes the ChannelDispatcher that inserted items get associated with.
// NOTE(review): the constructor body is not visible in this excerpt.
45 public EndpointDispatcherCollection (ChannelDispatcher owner)
50 ChannelDispatcher owner;
// Detaches every contained endpoint dispatcher from its ChannelDispatcher.
52 protected override void ClearItems ()
54 foreach (var ed in this)
55 ed.ChannelDispatcher = null;
// Associates the new item with the owner, then delegates to the base insert.
59 protected override void InsertItem (int index, EndpointDispatcher item)
61 item.ChannelDispatcher = owner;
62 base.InsertItem (index, item);
// Clears the outgoing item's ChannelDispatcher, then delegates to the base removal.
65 protected override void RemoveItem (int index)
68 this [index].ChannelDispatcher = null;
69 base.RemoveItem (index);
// Associates the replacement item with the owner, then delegates to the base.
// NOTE(review): unlike RemoveItem, the item being replaced keeps its
// ChannelDispatcher reference - confirm whether that is intended.
72 protected override void SetItem (int index, EndpointDispatcher item)
74 item.ChannelDispatcher = owner;
75 base.SetItem (index, item);
// Error handlers exposed through the ErrorHandlers property and consulted by
// the loop manager's ProcessErrorWithHandlers.
82 Collection<IErrorHandler> error_handlers
83 = new Collection<IErrorHandler> ();
// Channel listener passed to the constructor and stored by Init.
84 IChannelListener listener;
// Source of the default open/close/receive timeouts; resolved in Init.
85 internal IDefaultCommunicationTimeouts timeouts; // FIXME: remove internal
86 MessageVersion message_version;
// Backing fields for the boolean properties declared further down.
87 bool receive_sync, include_exception_detail_in_faults,
88 manual_addressing, is_tx_receive;
89 int max_tx_batch_size;
// Backing collection for the ChannelInitializers property.
90 SynchronizedCollection<IChannelInitializer> initializers
91 = new SynchronizedCollection<IChannelInitializer> ();
92 IsolationLevel tx_isolation_level;
94 ServiceThrottle throttle;
// NOTE(review): identifier and async_event are not referenced anywhere in the
// visible part of this file - confirm whether they are still needed.
96 Guid identifier = Guid.NewGuid ();
97 ManualResetEvent async_event = new ManualResetEvent (false);
// Created in OnOpening; owns the thread that runs the listener loop.
99 ListenerLoopManager loop_manager;
// Assigned an EndpointDispatcherCollection in Init.
100 SynchronizedCollection<EndpointDispatcher> endpoints;
// Creates a dispatcher for the listener without a binding name; delegates to
// the two-argument overload with a null bindingName.
102 [MonoTODO ("get binding info from config")]
103 public ChannelDispatcher (IChannelListener listener)
104 : this (listener, null)
// Creates a dispatcher for the listener and binding name; delegates to the
// three-argument overload with null timeouts, so Init picks the fallback.
108 public ChannelDispatcher (
109 IChannelListener listener, string bindingName)
110 : this (listener, bindingName, null)
// Main constructor. Throws ArgumentNullException when listener is null;
// otherwise hands all three arguments to Init. bindingName and timeouts may
// be null (the shorter overloads pass null for them).
114 public ChannelDispatcher (
115 IChannelListener listener, string bindingName,
116 IDefaultCommunicationTimeouts timeouts)
118 if (listener == null)
119 throw new ArgumentNullException ("listener");
120 Init (listener, bindingName, timeouts);
// Stores the listener and binding name, resolves the timeouts source and
// creates the endpoint collection owned by this dispatcher.
123 private void Init (IChannelListener listener, string bindingName,
124 IDefaultCommunicationTimeouts timeouts)
126 this.listener = listener;
127 this.binding_name = bindingName;
128 // IChannelListener is often a ChannelListenerBase
129 // which implements IDefaultCommunicationTimeouts.
// Precedence: explicit argument, then the listener itself when it implements
// IDefaultCommunicationTimeouts, then DefaultCommunicationTimeouts.Instance.
130 this.timeouts = timeouts ?? listener as IDefaultCommunicationTimeouts ?? DefaultCommunicationTimeouts.Instance;
131 endpoints = new EndpointDispatcherCollection (this);
// Builds an EndpointDispatcher from the endpoint's address and its contract
// name/namespace, adds it to Endpoints, then initializes it from the
// ServiceEndpoint.
// NOTE(review): the return statement is not visible in this excerpt.
134 internal EndpointDispatcher InitializeServiceEndpoint (Type serviceType, ServiceEndpoint se)
136 //Attach one EndpointDispatcher to the ChannelDispatcher
137 EndpointDispatcher ed = new EndpointDispatcher (se.Address, se.Contract.Name, se.Contract.Namespace);
// Adding to Endpoints sets ed.ChannelDispatcher (see EndpointDispatcherCollection.InsertItem).
138 this.Endpoints.Add (ed);
139 ed.InitializeServiceEndpoint (false, serviceType, se);
// Binding name stored by Init; null when the one-argument constructor was used.
143 public string BindingName {
144 get { return binding_name; }
// Channel initializer collection; created empty together with the dispatcher.
147 public SynchronizedCollection<IChannelInitializer> ChannelInitializers {
148 get { return initializers; }
// Close timeout taken from the timeouts source resolved in Init.
151 protected internal override TimeSpan DefaultCloseTimeout {
152 get { return timeouts.CloseTimeout; }
// Open timeout taken from the timeouts source resolved in Init.
155 protected internal override TimeSpan DefaultOpenTimeout {
156 get { return timeouts.OpenTimeout; }
// Error handlers consulted when request/input processing throws.
159 public Collection<IErrorHandler> ErrorHandlers {
160 get { return error_handlers; }
// Endpoint dispatchers attached to this channel dispatcher (the collection
// created in Init).
163 public SynchronizedCollection<EndpointDispatcher> Endpoints {
164 get { return endpoints; }
// Not implemented: reading this property always throws NotImplementedException.
168 public bool IsTransactedAccept {
169 get { throw new NotImplementedException (); }
// Plain get/set flag backed by is_tx_receive; it is not read elsewhere in the
// visible part of this file.
172 public bool IsTransactedReceive {
173 get { return is_tx_receive; }
174 set { is_tx_receive = value; }
// Plain get/set flag backed by manual_addressing; it is not read elsewhere in
// the visible part of this file.
177 public bool ManualAddressing {
178 get { return manual_addressing; }
179 set { manual_addressing = value; }
// Plain get/set value backed by max_tx_batch_size; no validation is applied
// by the visible setter.
182 public int MaxTransactedBatchSize {
183 get { return max_tx_batch_size; }
184 set { max_tx_batch_size = value; }
// Service host this dispatcher belongs to; OnOpen requires it to be non-null.
// NOTE(review): the accessor body is not visible in this excerpt.
187 public override ServiceHostBase Host {
// The channel listener supplied at construction.
191 public override IChannelListener Listener {
192 get { return listener; }
// Message version used when building fault messages; must be set before
// opening, because OnOpen throws while it is null.
195 public MessageVersion MessageVersion {
196 get { return message_version; }
197 set { message_version = value; }
// Selects blocking TryReceive*/TryReceiveRequest calls over the Begin*/End*
// pattern in ProcessRequestOrInput.
200 public bool ReceiveSynchronously {
201 get { return receive_sync; }
// NOTE(review): the two statements below are presumably the setter body (the
// `set` line itself is not visible here). The state check runs before the
// value is stored.
203 ThrowIfDisposedOrImmutable ();
204 receive_sync = value;
// Plain get/set flag backed by include_exception_detail_in_faults; it is not
// read elsewhere in the visible part of this file.
208 public bool IncludeExceptionDetailInFaults {
209 get { return include_exception_detail_in_faults; }
210 set { include_exception_detail_in_faults = value; }
// Throttle whose MaxConcurrentSessions/MaxConcurrentCalls bound the listener
// loop; OnOpened creates one when this is still null.
213 public ServiceThrottle ServiceThrottle {
214 get { return throttle; }
215 set { throttle = value; }
// Plain get/set value backed by tx_isolation_level; it is not read elsewhere
// in the visible part of this file.
218 public IsolationLevel TransactionIsolationLevel {
219 get { return tx_isolation_level; }
220 set { tx_isolation_level = value; }
// Plain get/set value backed by tx_timeout.
// NOTE(review): the tx_timeout field declaration is not visible in this excerpt.
223 public TimeSpan TransactionTimeout {
224 get { return tx_timeout; }
225 set { tx_timeout = value; }
// Called with the service host this dispatcher is being attached to. The
// visible statements hand this dispatcher to the listener when the listener
// is an IChannelDispatcherBoundListener.
// NOTE(review): intermediate lines (what is done with `host`, and any null
// check on `bl` before the assignment) are not visible here - confirm that
// listeners not implementing the interface are guarded.
228 protected internal override void Attach (ServiceHostBase host)
231 var bl = listener as IChannelDispatcherBoundListener;
233 bl.ChannelDispatcher = this;
// Forwards to the loop manager's CloseInput; does nothing when no loop manager
// exists yet (it is created in OnOpening).
236 public override void CloseInput ()
238 if (loop_manager != null)
239 loop_manager.CloseInput ();
// Counterpart of Attach for the given host.
// NOTE(review): the method body is not visible in this excerpt.
242 protected internal override void Detach (ServiceHostBase host)
// Abort path: stops the loop manager, if any, with a one-tick timeout so that
// Stop effectively does not wait.
247 protected override void OnAbort ()
249 if (loop_manager != null)
250 loop_manager.Stop (TimeSpan.FromTicks (1));
// Delegates wrapping OnOpen/OnClose; created on first use by OnBeginOpen and
// OnBeginClose and reused by the matching OnEnd* methods.
253 Action<TimeSpan> open_delegate;
254 Action<TimeSpan> close_delegate;
// Asynchronous close: runs OnClose through a cached Action<TimeSpan> delegate
// using BeginInvoke and returns its IAsyncResult.
256 protected override IAsyncResult OnBeginClose (TimeSpan timeout,
257 AsyncCallback callback, object state)
259 if (close_delegate == null)
260 close_delegate = new Action<TimeSpan> (OnClose);
261 return close_delegate.BeginInvoke (timeout, callback, state);
// Asynchronous open: runs OnOpen through a cached Action<TimeSpan> delegate
// using BeginInvoke and returns its IAsyncResult.
264 protected override IAsyncResult OnBeginOpen (TimeSpan timeout,
265 AsyncCallback callback, object state)
267 if (open_delegate == null)
268 open_delegate = new Action<TimeSpan> (OnOpen);
269 return open_delegate.BeginInvoke (timeout, callback, state);
// Synchronous close: stops the loop manager, if any, passing the timeout on.
272 protected override void OnClose (TimeSpan timeout)
274 if (loop_manager != null)
275 loop_manager.Stop (timeout);
// After closing, removes this dispatcher from the host's ChannelDispatchers.
// NOTE(review): the lines around the removal (any null check on `host`, any
// base call) are not visible in this excerpt.
278 protected override void OnClosed ()
281 host.ChannelDispatchers.Remove (this);
// Completes the asynchronous close. In the visible code close_delegate is only
// assigned by OnBeginClose, so that method must have been called first.
285 protected override void OnEndClose (IAsyncResult result)
287 close_delegate.EndInvoke (result);
// Completes the asynchronous open. In the visible code open_delegate is only
// assigned by OnBeginOpen, so that method must have been called first.
290 protected override void OnEndOpen (IAsyncResult result)
292 open_delegate.EndInvoke (result);
// Synchronous open: requires Host and MessageVersion to be set, then lets the
// loop manager open the listener and prepare the channel acceptor.
295 protected override void OnOpen (TimeSpan timeout)
// NOTE(review): the message below only mentions the host, but the exception is
// also raised when MessageVersion is null.
297 if (Host == null || MessageVersion == null)
298 throw new InvalidOperationException ("Service host is not attached to this ChannelDispatcher.");
300 loop_manager.Setup (timeout);
// Creates the ListenerLoopManager used by OnOpen/OnOpened and the close paths.
// NOTE(review): other statements of this method are not visible here.
303 protected override void OnOpening ()
306 loop_manager = new ListenerLoopManager (this);
// After opening: makes sure a ServiceThrottle exists (the loop reads its
// limits) and starts the loop manager.
// NOTE(review): several lines at the top of this method are not visible here.
309 protected override void OnOpened ()
317 // FIXME: not sure if it should be filled here.
318 if (ServiceThrottle == null)
319 ServiceThrottle = new ServiceThrottle (this);
321 loop_manager.Start ();
325 // isolated from ChannelDispatcher
// Owns the loop thread for the dispatcher's listener, tracks the channels and
// sessions accepted through it, and routes received messages to the matching
// EndpointDispatcher.
326 class ListenerLoopManager
// Dispatcher whose listener, endpoints, throttle and error handlers are used.
328 ChannelDispatcher owner;
// Waited on by the loop; set when a channel faults or closes, and by Stop.
329 AutoResetEvent throttle_wait_handle = new AutoResetEvent (false);
// Waited on by the loop; set once an accept attempt has finished, and by Stop.
330 AutoResetEvent creator_handle = new AutoResetEvent (false);
// Waited on (then closed) by Stop.
// NOTE(review): the line that signals it is not visible in this excerpt.
331 ManualResetEvent stop_handle = new ManualResetEvent (false);
// Recorded by Stop; CloseInput uses them to compute the remaining close time.
334 DateTime close_started;
335 TimeSpan close_timeout;
// Checked for null in SetupChannelAcceptor after the acceptor candidates were
// tried; the assignment line itself is not visible in this excerpt.
336 Func<IAsyncResult> channel_acceptor;
// Channels currently tracked; the loop compares Count against the throttle.
337 List<IChannel> channels = new List<IChannel> ();
// Copied from the host's ServiceBehaviorAttribute in the constructor.
338 AddressFilterMode address_filter_mode;
// Sessions of tracked session channels (see AddChannel/RemoveChannel).
339 List<ISession> sessions = new List<ISession> ();
// Looks up the ServiceBehaviorAttribute on the owner's host description and
// copies its AddressFilterMode.
// NOTE(review): `sba` is null when owner.Host is null; whether a null guard
// precedes the assignment cannot be seen in this excerpt - confirm. The
// statement storing `owner` in the field is not visible either.
341 public ListenerLoopManager (ChannelDispatcher owner)
344 var sba = owner.Host != null ? owner.Host.Description.Behaviors.Find<ServiceBehaviorAttribute> () : null;
346 address_filter_mode = sba.AddressFilterMode;
// Opens the owner's listener, validates the endpoints and prepares the
// channel acceptor.
// Throws InvalidOperationException when the listener is not in the Created
// state, or when an endpoint cannot instantiate its service.
349 public void Setup (TimeSpan openTimeout)
351 if (owner.Listener.State != CommunicationState.Created)
352 throw new InvalidOperationException ("Tried to open the channel listener which is bound to ChannelDispatcher, but it is not at Created state");
353 owner.Listener.Open (openTimeout);
355 // It is tested at Open(), but strangely it is not instantiated at this point.
// An endpoint is rejected when it has no InstanceContextProvider and its
// DispatchRuntime.Type is either null or has no parameterless constructor
// that GetConstructor (Type.EmptyTypes) can find.
356 foreach (var ed in owner.Endpoints)
357 if (ed.DispatchRuntime.InstanceContextProvider == null && (ed.DispatchRuntime.Type == null || ed.DispatchRuntime.Type.GetConstructor (Type.EmptyTypes) == null))
358 throw new InvalidOperationException ("There is no default constructor for the service Type in the DispatchRuntime");
359 SetupChannelAcceptor ();
// NOTE(review): the method header is not visible in this excerpt; this is
// presumably the body of Start (), which OnOpened calls on the loop manager.
// Creates the loop thread on first use, with Loop as its entry point, and
// starts it.
364 if (loop_thread == null)
365 loop_thread = new Thread (new ThreadStart (Loop));
366 loop_thread.Start ();
// Builds a function that begins an asynchronous accept of a TChannel on the
// given listener.
// NOTE(review): the `??` chain in SetupChannelAcceptor relies on this returning
// null when the listener is not an IChannelListener<TChannel>; that check and
// the final return are not visible in this excerpt - confirm.
369 Func<IAsyncResult> CreateAcceptor<TChannel> (IChannelListener l) where TChannel : class, IChannel
371 IChannelListener<TChannel> r = l as IChannelListener<TChannel>;
// Completion callback: passes the accepted channel to ChannelAccepted. On
// failure it logs and signals creator_handle, which the loop waits on.
374 AsyncCallback callback = delegate (IAsyncResult result) {
376 ChannelAccepted (r.EndAcceptChannel (result));
377 } catch (Exception ex) {
378 Logger.Error ("Exception during finishing channel acceptance.", ex);
379 creator_handle.Set ();
// Starting the accept can itself fail; that failure is logged as well.
384 return r.BeginAcceptChannel (callback, null);
385 } catch (Exception ex) {
386 Logger.Error ("Exception during accepting channel.", ex);
// Tries each supported channel shape in the order listed and keeps the first
// non-null acceptor. Throws InvalidOperationException when channel_acceptor is
// still null afterwards.
// NOTE(review): the line that receives the result of the `??` chain
// (presumably an assignment to channel_acceptor) is not visible here.
392 void SetupChannelAcceptor ()
394 var l = owner.Listener;
396 CreateAcceptor<IReplyChannel> (l) ??
397 CreateAcceptor<IReplySessionChannel> (l) ??
398 CreateAcceptor<IInputChannel> (l) ??
399 CreateAcceptor<IInputSessionChannel> (l) ??
400 CreateAcceptor<IDuplexChannel> (l) ??
401 CreateAcceptor<IDuplexSessionChannel> (l);
402 if (channel_acceptor == null)
403 throw new InvalidOperationException (String.Format ("Unrecognized channel listener type: {0}", l.GetType ()));
// Stops the loop: records when the close started and its timeout, wakes the
// loop, waits for stop_handle, then aborts the listener and the loop thread
// if they are still alive.
// NOTE(review): some lines of this method are not visible in this excerpt.
406 public void Stop (TimeSpan timeout)
// NOTE(review): the statement guarded by this check is not visible here.
408 if (loop_thread == null)
// Read later by CloseInput to compute how much of the timeout remains.
411 close_started = DateTime.Now;
412 close_timeout = timeout;
// Release both waits the loop can be blocked on.
414 creator_handle.Set ();
415 throttle_wait_handle.Set (); // break primary loop
416 if (stop_handle != null) {
// A non-positive timeout is replaced by a single tick for the wait.
417 stop_handle.WaitOne (timeout > TimeSpan.Zero ? timeout : TimeSpan.FromTicks (1));
418 stop_handle.Close ();
// Fallback when the listener has not reached the Closed state by now.
421 if (owner.Listener.State != CommunicationState.Closed) {
422 Logger.Warning (String.Format ("Channel listener '{0}' is not closed. Aborting.", owner.Listener.GetType ()));
423 owner.Listener.Abort ();
// Last resort: abort the loop thread if it is still running.
425 if (loop_thread != null && loop_thread.IsAlive)
426 loop_thread.Abort ();
// Registers an accepted channel; for session channels carrying an
// IInputSession it also looks the session up by Id in `sessions`.
// NOTE(review): several lines are not visible here (including the condition
// guarding the Add below and any addition to `channels`).
430 void AddChannel (IChannel ch)
433 var ich = ch as ISessionChannel<IInputSession>;
434 if (ich != null && ich.Session != null) {
436 var session = sessions.FirstOrDefault (s => s.Id == ich.Session.Id);
// NOTE(review): this adds the lookup result `session`, not `ich.Session`. If
// the non-visible guard is "session == null", a null entry is added - confirm.
438 sessions.Add (session);
// Stops tracking a channel and, for session channels carrying an
// IInputSession, removes that session from `sessions`.
443 void RemoveChannel (IChannel ch)
445 channels.Remove (ch); // zombie, if exists
446 var ich = ch as ISessionChannel<IInputSession>;
448 if (ich != null && ich.Session != null)
449 sessions.Remove (ich.Session);
// Closes the tracked channels, logging (not rethrowing) close failures.
// NOTE(review): the lines between the state check and the Close call are not
// visible in this excerpt.
452 public void CloseInput ()
// Iterates over a ToArray () copy, presumably because the Closed handlers
// remove channels from the list while this loop runs.
454 foreach (var ch in channels.ToArray ()) {
455 if (ch.State == CommunicationState.Closed) {
// Remaining budget = close_timeout minus the time elapsed since close_started.
// NOTE(review): both fields are only assigned in Stop. When this method is
// reached through ChannelDispatcher.CloseInput without a prior Stop, they hold
// default values and the computed TimeSpan is negative - confirm intent.
462 ch.Close (close_timeout - (DateTime.Now - close_started));
463 } catch (Exception ex) {
465 Logger.Error (String.Format ("Exception on closing channel ({0})", ch.GetType ()), ex);
// NOTE(review): fragment of the listener loop. The method header(s) and many
// statements are not visible in this excerpt, so the comments below describe
// only the visible lines. Loop is the entry point of loop_thread.
// Exceptions escaping the loop body are logged, not propagated.
476 } catch (Exception ex) {
477 Logger.Error (String.Format ("ListenerLoopManager caught an exception inside dispatcher loop, which is likely thrown by the channel listener {0}", owner.Listener), ex);
479 if (stop_handle != null)
488 // FIXME: use WaitForChannel() for (*only* for) transacted channel listeners.
489 // http://social.msdn.microsoft.com/Forums/en-US/wcf/thread/3faa4a5e-8602-4dbe-a181-73b3f581835e
492 // FIXME: take MaxConcurrentCalls into consideration appropriately.
// Keep accepting while the loop flag is set and the number of tracked channels
// is below the smaller of MaxConcurrentSessions and MaxConcurrentCalls.
493 while (loop && channels.Count < Math.Min (owner.ServiceThrottle.MaxConcurrentSessions, owner.ServiceThrottle.MaxConcurrentCalls)) {
494 // FIXME: this should not be required, but saves multi-ChannelDispatcher case (Throttling enabled) for HTTP standalone listener...
497 creator_handle.WaitOne (); // released by ChannelAccepted()
501 throttle_wait_handle.WaitOne (); // released by IChannel.Close()
// Closing the listener is best-effort: failures are logged.
504 owner.Listener.Close ();
505 } catch (Exception ex) {
506 Logger.Error (String.Format ("Exception while closing IChannelListener ({0})", owner.Listener.GetType ()), ex);
508 // make sure to close both listener and channels.
// Handles a channel handed over by the accept callback: wires its lifecycle
// events, signals creator_handle and starts receiving on it.
// NOTE(review): several lines (the null-channel branch body, the use of
// `dis`, the statements guarded by the Contains checks, the opening of the
// channel) are not visible in this excerpt.
513 void ChannelAccepted (IChannel ch)
516 if (ch == null) // could happen when it was aborted
519 var dis = ch as IDisposable;
// Fault/close handling: check whether the channel is still tracked, then wake
// the loop, which waits on throttle_wait_handle.
527 ch.Opened += delegate {
528 ch.Faulted += delegate {
530 if (channels.Contains (ch))
532 throttle_wait_handle.Set (); // release loop wait lock.
534 ch.Closed += delegate {
536 if (channels.Contains (ch))
538 throttle_wait_handle.Set (); // release loop wait lock.
// Lets the loop (waiting on creator_handle) proceed.
543 creator_handle.Set ();
546 ProcessRequestOrInput (ch);
// Receives the next message on the channel, as a reply channel or as an input
// channel, either synchronously or through Begin*/End* depending on
// owner.ReceiveSynchronously. The receive timeout comes from owner.timeouts.
// NOTE(review): the branch line testing `reply` and the local declarations of
// `rc` and `msg` are not visible in this excerpt.
549 void ProcessRequestOrInput (IChannel ch)
551 var reply = ch as IReplyChannel;
552 var input = ch as IInputChannel;
// Reply-channel path.
555 if (owner.ReceiveSynchronously) {
557 if (reply.TryReceiveRequest (owner.timeouts.ReceiveTimeout, out rc))
558 ProcessRequest (reply, rc);
// Asynchronous variant: completion is handled by TryReceiveRequestDone.
560 reply.BeginTryReceiveRequest (owner.timeouts.ReceiveTimeout, TryReceiveRequestDone, reply);
// Input-channel path.
562 } else if (input != null) {
563 if (owner.ReceiveSynchronously) {
565 if (input.TryReceive (owner.timeouts.ReceiveTimeout, out msg))
566 ProcessInput (input, msg);
// Asynchronous variant: completion is handled by TryReceiveDone.
568 input.BeginTryReceive (owner.timeouts.ReceiveTimeout, TryReceiveDone, input);
// Completion callback for BeginTryReceiveRequest. The reply channel travels
// in AsyncState; a successfully received request goes to ProcessRequest.
// NOTE(review): the declaration of `rc` and any surrounding error handling
// are not visible in this excerpt.
573 void TryReceiveRequestDone (IAsyncResult result)
576 var reply = (IReplyChannel) result.AsyncState;
577 if (reply.EndTryReceiveRequest (result, out rc))
578 ProcessRequest (reply, rc);
// Completion callback for BeginTryReceive. The input channel travels in
// AsyncState; a successfully received message goes to ProcessInput.
// NOTE(review): the declaration of `msg` and any surrounding error handling
// are not visible in this excerpt.
583 void TryReceiveDone (IAsyncResult result)
586 var input = (IInputChannel) result.AsyncState;
587 if (input.EndTryReceive (result, out msg))
588 ProcessInput (input, msg);
// Dispatches one request received on a reply channel to the matching endpoint
// and, unless the channel got closed, queues the next receive.
// NOTE(review): the `try` line, the declaration of `res` and the statements
// between the error-handler call and reply.Close are not visible here.
593 void ProcessRequest (IReplyChannel reply, RequestContext rc)
596 var req = rc.RequestMessage;
// FindEndpointDispatcher throws when no endpoint matches; that lands in the
// catch block below.
597 var ed = FindEndpointDispatcher (req);
598 new InputOrReplyRequestProcessor (ed.DispatchRuntime, reply).ProcessReply (rc);
599 } catch (Exception ex) {
601 if (ProcessErrorWithHandlers (reply, ex, out res))
606 reply.Close (owner.DefaultCloseTimeout); // close the channel
610 // unless it is closed by session/call manager, move it back to the loop to receive the next message.
611 if (loop && reply.State != CommunicationState.Closed)
612 ProcessRequestOrInput (reply);
// Runs the owner's error handlers for an exception raised during dispatch.
// Returns true as soon as one handler's HandleError reports the error as
// handled. Otherwise the error is logged and a fault message is produced in
// `res`.
// NOTE(review): the initial assignment of `res`, the condition guarding the
// FaultConverter fallback and the final return are not visible here.
616 bool ProcessErrorWithHandlers (IChannel ch, Exception ex, out Message res)
620 foreach (var eh in owner.ErrorHandlers)
621 if (eh.HandleError (ex))
622 return true; // error is handled appropriately.
624 Logger.Error ("An error occured, to be handled", ex);
// Every handler gets the chance to supply or replace the fault message.
626 foreach (var eh in owner.ErrorHandlers)
627 eh.ProvideFault (ex, owner.MessageVersion, ref res);
// Fault conversion: the channel's FaultConverter if it exposes one, else the
// default converter for the message version. When conversion fails, a generic
// "Receiver" fault carrying ex.Message is created.
629 var conv = ch.GetProperty<FaultConverter> () ?? FaultConverter.GetDefaultFaultConverter (owner.MessageVersion);
630 if (!conv.TryCreateFaultMessage (ex, out res))
631 res = Message.CreateMessage (owner.MessageVersion, new FaultCode ("Receiver"), ex.Message, owner.MessageVersion.Addressing.FaultNamespace);
// Dispatches one message received on an input channel to the matching
// endpoint and, unless the channel got closed, queues the next receive.
// Errors go through the error handlers; the fault message they produce is
// received into `dummy` and not used in the visible code.
// NOTE(review): the `try` line and the declaration of `dummy` are not
// visible in this excerpt.
637 void ProcessInput (IInputChannel input, Message message)
640 EndpointDispatcher candidate = null;
641 candidate = FindEndpointDispatcher (message);
642 new InputOrReplyRequestProcessor (candidate.DispatchRuntime, input).ProcessInput (message);
644 catch (Exception ex) {
646 ProcessErrorWithHandlers (input, ex, out dummy);
648 // unless it is closed by session/call manager, move it back to the loop to receive the next message.
649 if (loop && input.State != CommunicationState.Closed)
650 ProcessRequestOrInput (input);
// Selects the endpoint dispatcher for a message from owner.Endpoints using
// each endpoint's AddressFilter, ContractFilter and FilterPriority.
// Throws MultipleFilterMatchesException when two candidates have equal
// priority, EndpointNotFoundException when no address filter matched, and
// ActionNotSupportedException when an address matched but no contract did.
// NOTE(review): the statements executed when the contract filter fails and
// when a better candidate is found, as well as the final return, are not
// visible in this excerpt.
654 EndpointDispatcher FindEndpointDispatcher (Message message) {
655 EndpointDispatcher candidate = null;
// Tracks whether any endpoint matched by address, so the two failure cases
// below can be told apart.
656 bool hasEndpointMatch = false;
657 foreach (var endpoint in owner.Endpoints) {
658 if (endpoint.AddressFilter.Match (message)) {
659 hasEndpointMatch = true;
660 if (!endpoint.ContractFilter.Match (message))
662 var newdis = endpoint;
663 if (candidate == null || candidate.FilterPriority < newdis.FilterPriority)
665 else if (candidate.FilterPriority == newdis.FilterPriority)
666 throw new MultipleFilterMatchesException ();
// No endpoint matched the address: notify the host (if any), then fail.
669 if (candidate == null && !hasEndpointMatch) {
670 if (owner.Host != null)
671 owner.Host.OnUnknownMessageReceived (message);
672 // we have to return a fault to the client anyways...
673 throw new EndpointNotFoundException ();
// An address matched, but no contract filter accepted the message.
675 else if (candidate == null)
676 // FIXME: It is not a good place to check, but anyways detach this error from EndpointNotFoundException.
677 throw new ActionNotSupportedException (String.Format ("Action '{0}' did not match any operations in the target contract", message.Headers.Action));