2 // ChannelDispatcher.cs
5 // Atsushi Enomoto <atsushi@ximian.com>
7 // Copyright (C) 2005,2009 Novell, Inc. http://www.novell.com
9 // Permission is hereby granted, free of charge, to any person obtaining
10 // a copy of this software and associated documentation files (the
11 // "Software"), to deal in the Software without restriction, including
12 // without limitation the rights to use, copy, modify, merge, publish,
13 // distribute, sublicense, and/or sell copies of the Software, and to
14 // permit persons to whom the Software is furnished to do so, subject to
15 // the following conditions:
17 // The above copyright notice and this permission notice shall be
18 // included in all copies or substantial portions of the Software.
20 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
21 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
22 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
23 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
24 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
25 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
26 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 using System.Collections.Generic;
30 using System.Collections.ObjectModel;
31 using System.Reflection;
32 using System.ServiceModel.Channels;
33 using System.Threading;
34 using System.Transactions;
35 using System.ServiceModel;
36 using System.ServiceModel.Description;
38 namespace System.ServiceModel.Dispatcher
/// <summary>
/// Implemented by channel listeners that want a back-reference to the
/// <see cref="ChannelDispatcher"/> they have been attached to.
/// </summary>
internal interface IChannelDispatcherBoundListener
{
    /// <summary>The dispatcher this listener is bound to.</summary>
    ChannelDispatcher ChannelDispatcher { get; set; }
}
45 public class ChannelDispatcher : ChannelDispatcherBase
/// <summary>
/// Endpoint collection that keeps every <see cref="EndpointDispatcher"/>'s
/// <c>ChannelDispatcher</c> back-reference in sync with its membership:
/// it points at the owning dispatcher while the endpoint is in the
/// collection and is reset to null once the endpoint leaves it.
/// </summary>
class EndpointDispatcherCollection : SynchronizedCollection<EndpointDispatcher>
{
    // Dispatcher that owns this collection; handed to every endpoint that is added.
    ChannelDispatcher owner;

    /// <param name="owner">The dispatcher whose endpoints this collection holds.</param>
    public EndpointDispatcherCollection (ChannelDispatcher owner)
    {
        this.owner = owner;
    }

    /// <summary>Detaches every endpoint from the owner, then empties the collection.</summary>
    protected override void ClearItems ()
    {
        foreach (var ed in this)
            ed.ChannelDispatcher = null;
        base.ClearItems ();
    }

    /// <summary>Attaches <paramref name="item"/> to the owner and inserts it.</summary>
    protected override void InsertItem (int index, EndpointDispatcher item)
    {
        item.ChannelDispatcher = owner;
        base.InsertItem (index, item);
    }

    /// <summary>Detaches the endpoint at <paramref name="index"/> and removes it.</summary>
    protected override void RemoveItem (int index)
    {
        // Only touch the element when the index is valid; an invalid index is
        // left for the base implementation to report.
        if (index >= 0 && index < Count)
            this [index].ChannelDispatcher = null;
        base.RemoveItem (index);
    }

    /// <summary>
    /// Replaces the endpoint at <paramref name="index"/>. The endpoint being
    /// replaced is detached so it does not keep a stale reference to the owner.
    /// </summary>
    protected override void SetItem (int index, EndpointDispatcher item)
    {
        if (index >= 0 && index < Count) {
            var replaced = this [index];
            if (replaced != null && !ReferenceEquals (replaced, item))
                replaced.ChannelDispatcher = null;
        }
        item.ChannelDispatcher = owner;
        base.SetItem (index, item);
    }
}
// Instance state of ChannelDispatcher. Most fields simply back the public
// property of the corresponding name further down in the class.

// Returned as-is by the ErrorHandlers property.
86 Collection<IErrorHandler> error_handlers
87 = new Collection<IErrorHandler> ();
// Listener supplied at construction; exposed through the Listener property.
88 IChannelListener listener;
// Source of the default open/close timeouts; also read by ListenerLoopManager
// for receive timeouts, which is why it is internal for now.
89 internal IDefaultCommunicationTimeouts timeouts; // FIXME: remove internal
90 MessageVersion message_version;
91 bool receive_sync, include_exception_detail_in_faults,
92 manual_addressing, is_tx_receive;
93 int max_tx_batch_size;
// Returned as-is by the ChannelInitializers property.
94 SynchronizedCollection<IChannelInitializer> initializers
95 = new SynchronizedCollection<IChannelInitializer> ();
96 IsolationLevel tx_isolation_level;
98 ServiceThrottle throttle;
// NOTE(review): identifier and async_event are not referenced anywhere in the
// visible part of the file; confirm whether they are still needed.
100 Guid identifier = Guid.NewGuid ();
101 ManualResetEvent async_event = new ManualResetEvent (false);
// Created in OnOpening; null until then, hence the null checks in CloseInput/OnAbort/OnClose.
103 ListenerLoopManager loop_manager;
// Assigned in Init to an EndpointDispatcherCollection owned by this dispatcher.
104 SynchronizedCollection<EndpointDispatcher> endpoints;
/// <summary>
/// Creates a dispatcher for <paramref name="listener"/> with no binding name
/// and no explicit timeouts.
/// </summary>
[MonoTODO ("get binding info from config")]
public ChannelDispatcher (IChannelListener listener)
    : this (listener, null)
{
}

/// <summary>
/// Creates a dispatcher for <paramref name="listener"/> with the given
/// binding name and no explicit timeouts.
/// </summary>
public ChannelDispatcher (IChannelListener listener, string bindingName)
    : this (listener, bindingName, null)
{
}

/// <summary>
/// Creates a dispatcher for <paramref name="listener"/>.
/// </summary>
/// <param name="listener">The channel listener to dispatch for; must not be null.</param>
/// <param name="bindingName">Binding name reported by <c>BindingName</c>; may be null.</param>
/// <param name="timeouts">Default timeouts; may be null, in which case Init picks a fallback.</param>
/// <exception cref="ArgumentNullException"><paramref name="listener"/> is null.</exception>
public ChannelDispatcher (IChannelListener listener, string bindingName, IDefaultCommunicationTimeouts timeouts)
{
    if (listener == null) {
        throw new ArgumentNullException ("listener");
    }

    Init (listener, bindingName, timeouts);
}
/// <summary>
/// Stores the constructor arguments and creates the endpoint collection.
/// </summary>
/// <param name="listener">The channel listener this dispatcher serves.</param>
/// <param name="bindingName">Binding name; may be null.</param>
/// <param name="timeouts">Explicit default timeouts, or null to fall back.</param>
private void Init (IChannelListener listener, string bindingName,
                   IDefaultCommunicationTimeouts timeouts)
{
    this.listener = listener;
    this.binding_name = bindingName;

    // Fallback order: explicit argument, then the listener itself (an
    // IChannelListener is often a ChannelListenerBase, which implements
    // IDefaultCommunicationTimeouts), then the shared default instance.
    IDefaultCommunicationTimeouts effective = timeouts;
    if (effective == null)
        effective = listener as IDefaultCommunicationTimeouts;
    if (effective == null)
        effective = DefaultCommunicationTimeouts.Instance;
    this.timeouts = effective;

    endpoints = new EndpointDispatcherCollection (this);
}
// Builds an EndpointDispatcher from the endpoint's address, contract name and
// contract namespace, adds it to this dispatcher's Endpoints collection and then
// lets the new dispatcher initialize itself from the service type and endpoint.
// NOTE(review): the method is declared to return an EndpointDispatcher but no
// return statement is visible in this span; presumably `ed` is returned - confirm.
138 internal EndpointDispatcher InitializeServiceEndpoint (Type serviceType, ServiceEndpoint se)
140 // Attach one EndpointDispatcher to the ChannelDispatcher
141 EndpointDispatcher ed = new EndpointDispatcher (se.Address, se.Contract.Name, se.Contract.Namespace);
142 this.Endpoints.Add (ed);
143 ed.InitializeServiceEndpoint (false, serviceType, se);
// Binding name given at construction time; null when the single-argument
// constructor was used.
147 public string BindingName {
148 get { return binding_name; }
// The live collection of channel initializers registered on this dispatcher.
151 public SynchronizedCollection<IChannelInitializer> ChannelInitializers {
152 get { return initializers; }
// Default close timeout, read from the timeouts object selected in Init.
155 protected internal override TimeSpan DefaultCloseTimeout {
156 get { return timeouts.CloseTimeout; }
// Default open timeout, read from the timeouts object selected in Init.
159 protected internal override TimeSpan DefaultOpenTimeout {
160 get { return timeouts.OpenTimeout; }
// The live collection of error handlers; the listener loop consults these when
// processing a request throws.
163 public Collection<IErrorHandler> ErrorHandlers {
164 get { return error_handlers; }
// The endpoint dispatchers owned by this channel dispatcher.
167 public SynchronizedCollection<EndpointDispatcher> Endpoints {
168 get { return endpoints; }
// Not implemented: the getter always throws NotImplementedException.
172 public bool IsTransactedAccept {
173 get { throw new NotImplementedException (); }
// Plain get/set over is_tx_receive; no validation is performed.
176 public bool IsTransactedReceive {
177 get { return is_tx_receive; }
178 set { is_tx_receive = value; }
// Plain get/set over manual_addressing; no validation is performed.
181 public bool ManualAddressing {
182 get { return manual_addressing; }
183 set { manual_addressing = value; }
/// <summary>
/// Gets or sets the maximum size of a transacted batch.
/// </summary>
/// <exception cref="ArgumentOutOfRangeException">
/// The value being set is less than zero.
/// </exception>
public int MaxTransactedBatchSize {
    get { return max_tx_batch_size; }
    set {
        // A negative batch size is meaningless; reject it instead of storing it silently.
        if (value < 0)
            throw new ArgumentOutOfRangeException ("value", value, "MaxTransactedBatchSize must not be negative.");
        max_tx_batch_size = value;
    }
}
// NOTE(review): the accessor body of Host is not visible in this span. OnOpen
// treats a null Host as "not attached" - confirm the getter returns the attached host.
191 public override ServiceHostBase Host {
// The channel listener passed to the constructor.
195 public override IChannelListener Listener {
196 get { return listener; }
// Plain get/set over message_version. OnOpen throws while this is still null.
199 public MessageVersion MessageVersion {
200 get { return message_version; }
201 set { message_version = value; }
// Selects between the blocking TryReceive*/TryReceiveRequest calls and their
// Begin* counterparts in the listener loop. The visible setter statements call
// ThrowIfDisposedOrImmutable () before storing the value.
// NOTE(review): the `set {` line itself is not visible in this span.
204 public bool ReceiveSynchronously {
205 get { return receive_sync; }
207 ThrowIfDisposedOrImmutable ();
208 receive_sync = value;
// Plain get/set over include_exception_detail_in_faults; no validation is performed.
212 public bool IncludeExceptionDetailInFaults {
213 get { return include_exception_detail_in_faults; }
214 set { include_exception_detail_in_faults = value; }
// Plain get/set over throttle. OnOpened installs a new ServiceThrottle when
// this is still null at that point.
217 public ServiceThrottle ServiceThrottle {
218 get { return throttle; }
219 set { throttle = value; }
// Plain get/set over tx_isolation_level; no validation is performed.
222 public IsolationLevel TransactionIsolationLevel {
223 get { return tx_isolation_level; }
224 set { tx_isolation_level = value; }
// Plain get/set over tx_timeout; no validation is performed.
// NOTE(review): the declaration of tx_timeout is not visible in this span.
227 public TimeSpan TransactionTimeout {
228 get { return tx_timeout; }
229 set { tx_timeout = value; }
/// <summary>
/// Associates this dispatcher with <paramref name="host"/> and, when the
/// listener asks for it, gives the listener a back-reference to this dispatcher.
/// </summary>
/// <param name="host">The service host this dispatcher is being attached to.</param>
protected internal override void Attach (ServiceHostBase host)
{
    // Record the host: OnClosed removes this dispatcher from
    // host.ChannelDispatchers, and OnOpen refuses to run without a host.
    this.host = host;

    // Only listeners implementing IChannelDispatcherBoundListener want the
    // back-reference; `as` yields null for all others, so guard the dereference.
    var bl = listener as IChannelDispatcherBoundListener;
    if (bl != null)
        bl.ChannelDispatcher = this;
}
/// <summary>
/// Forwards to the listener loop manager's CloseInput; does nothing when the
/// loop manager has not been created yet.
/// </summary>
public override void CloseInput ()
{
    if (loop_manager == null)
        return;

    loop_manager.CloseInput ();
}
// Override taking the host this dispatcher is being detached from.
// NOTE(review): the body of this method is not visible in this span; presumably
// it undoes what Attach set up - confirm against the full file.
246 protected internal override void Detach (ServiceHostBase host)
/// <summary>
/// Stops the listener loop, if one exists, with the smallest positive timeout.
/// </summary>
protected override void OnAbort ()
{
    if (loop_manager == null)
        return;

    // A single tick: Stop still takes a timeout, but aborting should not wait.
    TimeSpan minimalWait = TimeSpan.FromTicks (1);
    loop_manager.Stop (minimalWait);
}
// Cached delegates that run the synchronous OnOpen/OnClose asynchronously;
// each is created on first use by the matching OnBegin* method.
Action<TimeSpan> open_delegate;
Action<TimeSpan> close_delegate;

/// <summary>
/// Starts an asynchronous close by invoking <c>OnClose</c> through a delegate.
/// </summary>
/// <param name="timeout">Timeout forwarded to <c>OnClose</c>.</param>
/// <param name="callback">Completion callback; may be null.</param>
/// <param name="state">Caller state carried by the returned IAsyncResult.</param>
protected override IAsyncResult OnBeginClose (TimeSpan timeout,
    AsyncCallback callback, object state)
{
    close_delegate = close_delegate ?? new Action<TimeSpan> (OnClose);
    return close_delegate.BeginInvoke (timeout, callback, state);
}
/// <summary>
/// Starts an asynchronous open by invoking <c>OnOpen</c> through a delegate.
/// </summary>
/// <param name="timeout">Timeout forwarded to <c>OnOpen</c>.</param>
/// <param name="callback">Completion callback; may be null.</param>
/// <param name="state">Caller state carried by the returned IAsyncResult.</param>
protected override IAsyncResult OnBeginOpen (TimeSpan timeout,
    AsyncCallback callback, object state)
{
    open_delegate = open_delegate ?? new Action<TimeSpan> (OnOpen);
    return open_delegate.BeginInvoke (timeout, callback, state);
}
/// <summary>
/// Stops the listener loop, if one exists, waiting up to <paramref name="timeout"/>.
/// </summary>
protected override void OnClose (TimeSpan timeout)
{
    if (loop_manager == null)
        return;

    loop_manager.Stop (timeout);
}
// OnClosed override: the visible statement removes this dispatcher from the
// host's ChannelDispatchers collection.
// NOTE(review): lines are missing from this span. `host` is dereferenced without
// a visible null check, and no call to base.OnClosed () is visible - confirm
// both against the full file.
282 protected override void OnClosed ()
285 host.ChannelDispatchers.Remove (this);
/// <summary>
/// Completes the asynchronous close started by <c>OnBeginClose</c>.
/// </summary>
/// <param name="result">The IAsyncResult returned by <c>OnBeginClose</c>.</param>
protected override void OnEndClose (IAsyncResult result)
{
    // close_delegate was created by OnBeginClose; EndInvoke rethrows anything OnClose threw.
    close_delegate.EndInvoke (result);
}
/// <summary>
/// Completes the asynchronous open started by <c>OnBeginOpen</c>.
/// </summary>
/// <param name="result">The IAsyncResult returned by <c>OnBeginOpen</c>.</param>
protected override void OnEndOpen (IAsyncResult result)
{
    // open_delegate was created by OnBeginOpen; EndInvoke rethrows anything OnOpen threw.
    open_delegate.EndInvoke (result);
}
/// <summary>
/// Validates that the dispatcher is ready to run and lets the listener loop
/// manager open the channel listener.
/// </summary>
/// <param name="timeout">Timeout forwarded to the loop manager's Setup.</param>
/// <exception cref="InvalidOperationException">
/// No service host is attached, or <c>MessageVersion</c> has not been set.
/// </exception>
protected override void OnOpen (TimeSpan timeout)
{
    // Report the two preconditions separately so the message names the one
    // that actually failed; the exception type is the same in both cases.
    if (Host == null)
        throw new InvalidOperationException ("Service host is not attached to this ChannelDispatcher.");
    if (MessageVersion == null)
        throw new InvalidOperationException ("MessageVersion is not set on this ChannelDispatcher.");

    loop_manager.Setup (timeout);
}
// OnOpening override: creates the ListenerLoopManager that OnOpen, OnOpened,
// CloseInput, OnClose and OnAbort later use.
// NOTE(review): a line between the header and this statement is missing from
// this span (possibly a base.OnOpening () call) - confirm against the full file.
307 protected override void OnOpening ()
310 loop_manager = new ListenerLoopManager (this);
// OnOpened override: installs a default ServiceThrottle when none was supplied,
// then starts the listener loop.
// NOTE(review): several lines between the header and the FIXME below are missing
// from this span - confirm what runs before the throttle check.
313 protected override void OnOpened ()
321 // FIXME: not sure if it should be filled here.
322 if (ServiceThrottle == null)
323 ServiceThrottle = new ServiceThrottle (this);
325 loop_manager.Start ();
// Helper that runs the accept/receive loop for the owning ChannelDispatcher;
// kept as a separate class so the loop state is isolated from ChannelDispatcher.
// NOTE(review): `loop_thread` and `loop` are used by the methods below but their
// declarations are not visible in this span.
329 // isolated from ChannelDispatcher
330 class ListenerLoopManager
332 ChannelDispatcher owner;
// Signalled when a channel closes or faults, and by Stop, to wake the loop.
333 AutoResetEvent throttle_wait_handle = new AutoResetEvent (false);
// Signalled by ChannelAccepted (and by Stop / a failed accept) to wake the loop.
334 AutoResetEvent creator_handle = new AutoResetEvent (false);
// Stop waits on this handle for the loop to finish, then closes it.
335 ManualResetEvent stop_handle = new ManualResetEvent (false);
// Both set by Stop; CloseInput uses them to compute the remaining close time.
338 DateTime close_started;
339 TimeSpan close_timeout;
340 Func<IAsyncResult> channel_acceptor;
// Channels tracked by the loop; entries are removed when a channel closes or faults.
341 List<IChannel> channels = new List<IChannel> ();
// Taken from the host's ServiceBehaviorAttribute in the constructor.
342 AddressFilterMode address_filter_mode;
/// <summary>
/// Creates the loop manager for <paramref name="owner"/> and picks up the
/// address filter mode from the host's <c>ServiceBehaviorAttribute</c>, if any.
/// </summary>
/// <param name="owner">The dispatcher whose listener this manager drives.</param>
public ListenerLoopManager (ChannelDispatcher owner)
{
    // Every other method reads the listener, endpoints and throttle through this field.
    this.owner = owner;

    // sba is null when no host is attached or the host has no
    // ServiceBehaviorAttribute; in that case address_filter_mode keeps its default.
    var sba = owner.Host != null ? owner.Host.Description.Behaviors.Find<ServiceBehaviorAttribute> () : null;
    if (sba != null)
        address_filter_mode = sba.AddressFilterMode;
}
/// <summary>
/// Opens the owner's channel listener, validates every endpoint's dispatch
/// runtime and prepares the channel acceptor.
/// </summary>
/// <param name="openTimeout">Timeout passed to the listener's Open.</param>
/// <exception cref="InvalidOperationException">
/// The listener is not in the Created state, or an endpoint has neither an
/// instance context provider nor a service type with a default constructor.
/// </exception>
public void Setup (TimeSpan openTimeout)
{
    bool listenerIsFresh = owner.Listener.State == CommunicationState.Created;
    if (!listenerIsFresh)
        throw new InvalidOperationException ("Tried to open the channel listener which is bound to ChannelDispatcher, but it is not at Created state");
    owner.Listener.Open (openTimeout);

    // The service type is checked at Open() time although, oddly, no instance
    // is created at this point.
    foreach (var endpoint in owner.Endpoints) {
        if (endpoint.DispatchRuntime.InstanceContextProvider != null)
            continue;

        Type serviceType = endpoint.DispatchRuntime.Type;
        if (serviceType == null || serviceType.GetConstructor (Type.EmptyTypes) == null)
            throw new InvalidOperationException ("There is no default constructor for the service Type in the DispatchRuntime");
    }

    SetupChannelAcceptor ();
}
// NOTE(review): the header of the method these statements belong to is not
// visible in this span (OnOpened calls loop_manager.Start (), so presumably Start).
// Creates the thread that runs Loop if none exists yet; loop_thread.Start () follows.
367 if (loop_thread == null)
368 loop_thread = new Thread (new ThreadStart (Loop));
369 loop_thread.Start ();
// Tries to view the listener as an IChannelListener<TChannel> and builds the
// accept callback and BeginAcceptChannel call for it. SetupChannelAcceptor chains
// the results with ??, so a null result means "this channel shape does not apply".
// NOTE(review): several lines are missing from this span (the handling of a
// null `r`, the `try` openers and the closing of the delegates) - confirm
// against the full file before changing anything here.
372 Func<IAsyncResult> CreateAcceptor<TChannel> (IChannelListener l) where TChannel : class, IChannel
374 IChannelListener<TChannel> r = l as IChannelListener<TChannel>;
// Completion callback: finishes the accept and hands the channel to ChannelAccepted.
377 AsyncCallback callback = delegate (IAsyncResult result) {
379 ChannelAccepted (r.EndAcceptChannel (result));
380 } catch (Exception ex) {
381 Console.WriteLine ("Exception during finishing channel acceptance.");
382 Console.WriteLine (ex);
// Wake the loop even though the accept failed, so it does not wait forever.
383 creator_handle.Set ();
388 return r.BeginAcceptChannel (callback, null);
389 } catch (Exception ex) {
390 Console.WriteLine ("Exception during accepting channel.");
391 Console.WriteLine (ex);
// Picks the first channel shape the owner's listener supports, trying reply,
// reply-session, input, input-session, duplex and duplex-session in that order,
// and throws InvalidOperationException when none of them matches.
// NOTE(review): the line that starts the assignment (presumably
// `channel_acceptor =`) is missing from this span.
397 void SetupChannelAcceptor ()
399 var l = owner.Listener;
401 CreateAcceptor<IReplyChannel> (l) ??
402 CreateAcceptor<IReplySessionChannel> (l) ??
403 CreateAcceptor<IInputChannel> (l) ??
404 CreateAcceptor<IInputSessionChannel> (l) ??
405 CreateAcceptor<IDuplexChannel> (l) ??
406 CreateAcceptor<IDuplexSessionChannel> (l);
407 if (channel_acceptor == null)
408 throw new InvalidOperationException (String.Format ("Unrecognized channel listener type: {0}", l.GetType ()));
// Asks the loop to stop and waits up to `timeout` for it to finish. Visible steps:
// record when the close started and its timeout, wake the loop through both wait
// handles, wait on stop_handle (at least one tick) and close it, abort the
// listener if it is still not Closed, and abort the loop thread if still alive.
// NOTE(review): lines are missing from this span (the statement under the first
// `if`, and whatever clears the loop flag) - confirm against the full file.
// NOTE(review): close_started uses DateTime.Now (local wall-clock time); a clock
// adjustment during close would skew the remaining-time computation in CloseInput.
411 public void Stop (TimeSpan timeout)
413 if (loop_thread == null)
416 close_started = DateTime.Now;
417 close_timeout = timeout;
419 creator_handle.Set ();
420 throttle_wait_handle.Set (); // break primary loop
421 if (stop_handle != null) {
// WaitOne needs a positive timeout, so a zero or negative one becomes a single tick.
422 stop_handle.WaitOne (timeout > TimeSpan.Zero ? timeout : TimeSpan.FromTicks (1));
423 stop_handle.Close ();
426 if (owner.Listener.State != CommunicationState.Closed) {
428 Console.WriteLine ("Channel listener '{0}' is not closed. Aborting.", owner.Listener.GetType ());
429 owner.Listener.Abort ();
// Last resort: the loop did not exit on its own within the timeout.
431 if (loop_thread != null && loop_thread.IsAlive)
432 loop_thread.Abort ();
// Walks a snapshot of the tracked channels: channels already Closed are dropped
// from the list, the others are closed with the time remaining from the close
// timeout; exceptions from Close are written to the console.
// NOTE(review): lines are missing from this span (the `else`/`try` openers).
// NOTE(review): close_timeout and close_started are only assigned in Stop in the
// visible code; if this runs before Stop the remaining time is computed from
// their default values - confirm that is intended.
436 public void CloseInput ()
// ToArray () takes a copy so the list can be modified while iterating.
438 foreach (var ch in channels.ToArray ()) {
439 if (ch.State == CommunicationState.Closed)
440 channels.Remove (ch); // zombie, if exists
443 ch.Close (close_timeout - (DateTime.Now - close_started));
444 } catch (Exception ex) {
446 Console.WriteLine (ex);
// NOTE(review): the headers of the loop method(s) these lines belong to, and
// many of their statements, are not visible in this span. What is visible: an
// exception escaping the dispatcher loop is logged to the console; the loop keeps
// accepting while it is enabled and the number of tracked channels is below the
// smaller of the throttle's MaxConcurrentSessions and MaxConcurrentCalls, waiting
// on creator_handle / throttle_wait_handle in between; afterwards the listener
// is closed.
457 } catch (Exception ex) {
459 Console.WriteLine ("ListenerLoopManager caught an exception inside dispatcher loop, which is likely thrown by the channel listener {0}", owner.Listener);
460 Console.WriteLine (ex);
462 if (stop_handle != null)
471 // FIXME: use WaitForChannel() for (*only* for) transacted channel listeners.
472 // http://social.msdn.microsoft.com/Forums/en-US/wcf/thread/3faa4a5e-8602-4dbe-a181-73b3f581835e
475 // FIXME: take MaxConcurrentCalls into consideration appropriately.
476 while (loop && channels.Count < Math.Min (owner.ServiceThrottle.MaxConcurrentSessions, owner.ServiceThrottle.MaxConcurrentCalls)) {
477 // FIXME: this should not be required, but saves multi-ChannelDispatcher case (Throttling enabled) for HTTP standalone listener...
480 creator_handle.WaitOne (); // released by ChannelAccepted()
484 throttle_wait_handle.WaitOne (); // released by IChannel.Close()
487 owner.Listener.Close ();
489 // make sure to close both listener and channels.
// Called with the channel returned by EndAcceptChannel. Visible behaviour: hooks
// the channel's Opened, Faulted and Closed events (Faulted and Closed drop the
// channel from the tracked list and wake the throttled loop), signals
// creator_handle so the loop can accept the next channel, and starts receiving
// on the channel through ProcessRequestOrInput.
// NOTE(review): many lines are missing from this span (what happens for a null
// channel, how `dis` is used, the body of the Opened handler, where the channel
// is added to `channels` and opened) - confirm against the full file.
494 void ChannelAccepted (IChannel ch)
497 if (ch == null) // could happen when it was aborted
500 var dis = ch as IDisposable;
508 ch.Opened += delegate {
509 ch.Faulted += delegate {
511 if (channels.Contains (ch))
512 channels.Remove (ch);
513 throttle_wait_handle.Set (); // release loop wait lock.
515 ch.Closed += delegate {
517 if (channels.Contains (ch))
518 channels.Remove (ch);
519 throttle_wait_handle.Set (); // release loop wait lock.
524 creator_handle.Set ();
527 ProcessRequestOrInput (ch);
// Starts one receive on the channel, using the owner's ReceiveTimeout. Reply
// channels use TryReceiveRequest / BeginTryReceiveRequest and input channels use
// TryReceive / BeginTryReceive; the owner's ReceiveSynchronously flag selects the
// blocking or the Begin* variant. A received item goes to ProcessRequest or
// ProcessInput respectively.
// NOTE(review): lines are missing from this span (the `if (reply != null)`
// opener, the declarations of `rc` and `msg`, and the `else` branches).
530 void ProcessRequestOrInput (IChannel ch)
532 var reply = ch as IReplyChannel;
533 var input = ch as IInputChannel;
536 if (owner.ReceiveSynchronously) {
538 if (reply.TryReceiveRequest (owner.timeouts.ReceiveTimeout, out rc))
539 ProcessRequest (reply, rc);
541 reply.BeginTryReceiveRequest (owner.timeouts.ReceiveTimeout, TryReceiveRequestDone, reply);
543 } else if (input != null) {
544 if (owner.ReceiveSynchronously) {
546 if (input.TryReceive (owner.timeouts.ReceiveTimeout, out msg))
547 ProcessInput (input, msg);
549 input.BeginTryReceive (owner.timeouts.ReceiveTimeout, TryReceiveDone, input);
// Completion callback for BeginTryReceiveRequest: the reply channel travels in
// AsyncState; when a request was received it is handed to ProcessRequest.
// NOTE(review): lines are missing from this span (the declaration of `rc` and
// any surrounding error handling) - confirm against the full file.
554 void TryReceiveRequestDone (IAsyncResult result)
557 var reply = (IReplyChannel) result.AsyncState;
558 if (reply.EndTryReceiveRequest (result, out rc))
559 ProcessRequest (reply, rc);
// Completion callback for BeginTryReceive: the input channel travels in
// AsyncState; when a message was received it is handed to ProcessInput.
// NOTE(review): lines are missing from this span (the declaration of `msg` and
// any surrounding error handling) - confirm against the full file.
564 void TryReceiveDone (IAsyncResult result)
567 var input = (IInputChannel) result.AsyncState;
568 if (input.EndTryReceive (result, out msg))
569 ProcessInput (input, msg);
// Dispatches one request received on a reply channel: finds the matching
// endpoint dispatcher and runs an InputOrReplyRequestProcessor over the request
// context. On an exception the owner's error handlers get the first chance
// (HandleError returning true ends processing); otherwise the exception is
// logged and a fault message is built, via ProvideFault on the error handlers
// and the channel's (or the default) FaultConverter, falling back to a plain
// "Receiver" fault. Finally the channel is put back into the receive loop
// unless the loop has stopped or the channel is Closed.
// NOTE(review): lines are missing from this span (the `try` opener, the
// declaration of `res`, the conditions around the fault-building steps and the
// code that sends the fault) - confirm against the full file.
574 void ProcessRequest (IReplyChannel reply, RequestContext rc)
576 var req = rc.RequestMessage;
578 var ed = FindEndpointDispatcher (req);
579 new InputOrReplyRequestProcessor (ed.DispatchRuntime, reply).ProcessReply (rc);
580 } catch (Exception ex) {
581 foreach (var eh in owner.ErrorHandlers)
582 if (eh.HandleError (ex))
583 return; // error is handled appropriately.
586 Console.WriteLine (ex);
589 foreach (var eh in owner.ErrorHandlers)
590 eh.ProvideFault (ex, owner.MessageVersion, ref res);
592 var conv = reply.GetProperty<FaultConverter> () ?? FaultConverter.GetDefaultFaultConverter (rc.RequestMessage.Version);
593 if (!conv.TryCreateFaultMessage (ex, out res))
594 res = Message.CreateMessage (req.Version, new FaultCode ("Receiver"), ex.Message, req.Version.Addressing.FaultNamespace);
601 // unless it is closed by session/call manager, move it back to the loop to receive the next message.
602 if (loop && reply.State != CommunicationState.Closed)
603 ProcessRequestOrInput (reply);
// Dispatches one message received on an input channel: finds the matching
// endpoint dispatcher and runs an InputOrReplyRequestProcessor over the message.
// Exceptions are written to the console. Afterwards the channel is put back
// into the receive loop unless the loop has stopped or the channel is Closed.
// NOTE(review): lines are missing from this span (the `try` opener and the
// block that encloses the re-queue step) - confirm against the full file.
607 void ProcessInput (IInputChannel input, Message message)
610 EndpointDispatcher candidate = null;
611 candidate = FindEndpointDispatcher (message);
612 new InputOrReplyRequestProcessor (candidate.DispatchRuntime, input).
613 ProcessInput (message);
615 catch (Exception ex) {
617 Console.WriteLine (ex);
619 // unless it is closed by session/call manager, move it back to the loop to receive the next message.
620 if (loop && input.State != CommunicationState.Closed)
621 ProcessRequestOrInput (input);
// Selects the endpoint dispatcher for a message. An endpoint qualifies when both
// its AddressFilter and its ContractFilter match; among qualifying endpoints the
// FilterPriority values are compared, and two candidates with equal priority
// raise MultipleFilterMatchesException. When no address filter matched at all,
// the host is notified through OnUnknownMessageReceived and
// EndpointNotFoundException is thrown; when an address matched but no contract
// did, ActionNotSupportedException is thrown instead.
// NOTE(review): lines are missing from this span (the statement under the
// ContractFilter check, the assignment of the new candidate, and the final
// return) - confirm against the full file.
625 EndpointDispatcher FindEndpointDispatcher (Message message) {
626 EndpointDispatcher candidate = null;
627 bool hasEndpointMatch = false;
628 foreach (var endpoint in owner.Endpoints) {
629 if (endpoint.AddressFilter.Match (message)) {
630 hasEndpointMatch = true;
631 if (!endpoint.ContractFilter.Match (message))
633 var newdis = endpoint;
634 if (candidate == null || candidate.FilterPriority < newdis.FilterPriority)
636 else if (candidate.FilterPriority == newdis.FilterPriority)
637 throw new MultipleFilterMatchesException ();
640 if (candidate == null && !hasEndpointMatch) {
641 if (owner.Host != null)
642 owner.Host.OnUnknownMessageReceived (message);
643 // we have to return a fault to the client anyway...
644 throw new EndpointNotFoundException ();
646 else if (candidate == null)
647 // FIXME: This is not a good place for the check, but at least it keeps this error distinct from EndpointNotFoundException.
648 throw new ActionNotSupportedException (String.Format ("Action '{0}' did not match any operations in the target contract", message.Headers.Action));