2 // ChannelDispatcher.cs
5 // Atsushi Enomoto <atsushi@ximian.com>
7 // Copyright (C) 2005,2009 Novell, Inc. http://www.novell.com
9 // Permission is hereby granted, free of charge, to any person obtaining
10 // a copy of this software and associated documentation files (the
11 // "Software"), to deal in the Software without restriction, including
12 // without limitation the rights to use, copy, modify, merge, publish,
13 // distribute, sublicense, and/or sell copies of the Software, and to
14 // permit persons to whom the Software is furnished to do so, subject to
15 // the following conditions:
17 // The above copyright notice and this permission notice shall be
18 // included in all copies or substantial portions of the Software.
20 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
21 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
22 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
23 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
24 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
25 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
26 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 using System.Collections.Generic;
30 using System.Collections.ObjectModel;
31 using System.Reflection;
32 using System.ServiceModel.Channels;
33 using System.Threading;
34 using System.Transactions;
35 using System.ServiceModel;
36 using System.ServiceModel.Description;
38 namespace System.ServiceModel.Dispatcher
40 public class ChannelDispatcher : ChannelDispatcherBase
45 Collection<IErrorHandler> error_handlers
46 = new Collection<IErrorHandler> ();
47 IChannelListener listener;
48 internal IDefaultCommunicationTimeouts timeouts; // FIXME: remove internal
49 MessageVersion message_version;
50 bool receive_sync, include_exception_detail_in_faults,
51 manual_addressing, is_tx_receive;
52 int max_tx_batch_size;
53 SynchronizedCollection<IChannelInitializer> initializers
54 = new SynchronizedCollection<IChannelInitializer> ();
55 IsolationLevel tx_isolation_level;
57 ServiceThrottle throttle;
59 Guid identifier = Guid.NewGuid ();
60 ManualResetEvent async_event = new ManualResetEvent (false);
62 ListenerLoopManager loop_manager;
63 SynchronizedCollection<EndpointDispatcher> endpoints;
65 [MonoTODO ("get binding info from config")]
66 public ChannelDispatcher (IChannelListener listener)
67 : this (listener, null)
71 public ChannelDispatcher (
72 IChannelListener listener, string bindingName)
73 : this (listener, bindingName, null)
77 public ChannelDispatcher (
78 IChannelListener listener, string bindingName,
79 IDefaultCommunicationTimeouts timeouts)
82 throw new ArgumentNullException ("listener");
83 Init (listener, bindingName, timeouts);
// Shared constructor logic: records the listener and binding name,
// picks the timeouts source and creates the endpoint collection.
private void Init (IChannelListener listener, string bindingName,
	IDefaultCommunicationTimeouts timeouts)
{
	this.listener = listener;
	this.binding_name = bindingName;

	// Timeouts precedence: the explicit argument first, then the
	// listener itself when it implements IDefaultCommunicationTimeouts
	// (IChannelListener is often a ChannelListenerBase, which does),
	// and finally DefaultCommunicationTimeouts.Instance.
	IDefaultCommunicationTimeouts resolved = timeouts;
	if (resolved == null)
		resolved = listener as IDefaultCommunicationTimeouts;
	if (resolved == null)
		resolved = DefaultCommunicationTimeouts.Instance;
	this.timeouts = resolved;

	endpoints = new SynchronizedCollection<EndpointDispatcher> ();
}
97 public string BindingName {
98 get { return binding_name; }
101 public SynchronizedCollection<IChannelInitializer> ChannelInitializers {
102 get { return initializers; }
105 protected internal override TimeSpan DefaultCloseTimeout {
106 get { return timeouts.CloseTimeout; }
109 protected internal override TimeSpan DefaultOpenTimeout {
110 get { return timeouts.OpenTimeout; }
113 public Collection<IErrorHandler> ErrorHandlers {
114 get { return error_handlers; }
117 public SynchronizedCollection<EndpointDispatcher> Endpoints {
118 get { return endpoints; }
122 public bool IsTransactedAccept {
123 get { throw new NotImplementedException (); }
126 public bool IsTransactedReceive {
127 get { return is_tx_receive; }
128 set { is_tx_receive = value; }
131 public bool ManualAddressing {
132 get { return manual_addressing; }
133 set { manual_addressing = value; }
136 public int MaxTransactedBatchSize {
137 get { return max_tx_batch_size; }
138 set { max_tx_batch_size = value; }
141 public override ServiceHostBase Host {
145 public override IChannelListener Listener {
146 get { return listener; }
149 public MessageVersion MessageVersion {
150 get { return message_version; }
151 set { message_version = value; }
154 public bool ReceiveSynchronously {
155 get { return receive_sync; }
157 ThrowIfDisposedOrImmutable ();
158 receive_sync = value;
162 public bool IncludeExceptionDetailInFaults {
163 get { return include_exception_detail_in_faults; }
164 set { include_exception_detail_in_faults = value; }
167 public ServiceThrottle ServiceThrottle {
168 get { return throttle; }
169 set { throttle = value; }
172 public IsolationLevel TransactionIsolationLevel {
173 get { return tx_isolation_level; }
174 set { tx_isolation_level = value; }
177 public TimeSpan TransactionTimeout {
178 get { return tx_timeout; }
179 set { tx_timeout = value; }
182 protected internal override void Attach (ServiceHostBase host)
// Forwards the close-input request to the listener loop manager.
// Does nothing when no loop manager has been created yet.
public override void CloseInput ()
{
	ListenerLoopManager manager = loop_manager;
	if (manager == null)
		return;
	manager.CloseInput ();
}
193 protected internal override void Detach (ServiceHostBase host)
// Abort path: stops the listener loop (if one exists) with the
// smallest positive timeout, a single tick.
protected override void OnAbort ()
{
	ListenerLoopManager manager = loop_manager;
	if (manager == null)
		return;
	manager.Stop (TimeSpan.FromTicks (1));
}
204 Action<TimeSpan> open_delegate;
205 Action<TimeSpan> close_delegate;
// Starts an asynchronous close by running OnClose through a delegate.
// The delegate is created on first use and kept in close_delegate,
// because OnEndClose completes the call through the same field.
protected override IAsyncResult OnBeginClose (TimeSpan timeout,
	AsyncCallback callback, object state)
{
	Action<TimeSpan> closer = close_delegate;
	if (closer == null) {
		closer = new Action<TimeSpan> (OnClose);
		close_delegate = closer;
	}
	return closer.BeginInvoke (timeout, callback, state);
}
// Starts an asynchronous open by running OnOpen through a delegate.
// The delegate is created on first use and kept in open_delegate,
// because OnEndOpen completes the call through the same field.
protected override IAsyncResult OnBeginOpen (TimeSpan timeout,
	AsyncCallback callback, object state)
{
	// The delegate must target OnOpen. It previously targeted OnClose,
	// so an asynchronous open ran the close path and never created
	// the listener loop manager.
	if (open_delegate == null)
		open_delegate = new Action<TimeSpan> (OnOpen);
	return open_delegate.BeginInvoke (timeout, callback, state);
}
// Synchronous close: stops the listener loop within the given timeout.
// Does nothing when no loop manager has been created.
protected override void OnClose (TimeSpan timeout)
{
	ListenerLoopManager manager = loop_manager;
	if (manager == null)
		return;
	manager.Stop (timeout);
}
229 protected override void OnClosed ()
232 host.ChannelDispatchers.Remove (this);
236 protected override void OnEndClose (IAsyncResult result)
238 close_delegate.EndInvoke (result);
241 protected override void OnEndOpen (IAsyncResult result)
243 open_delegate.EndInvoke (result);
// Synchronous open: validates the required state and creates the
// listener loop manager with the given open timeout.
// Throws InvalidOperationException when Host or MessageVersion is null.
protected override void OnOpen (TimeSpan timeout)
{
	// The two preconditions are reported separately so that a missing
	// MessageVersion is not described as a missing host.
	if (Host == null)
		throw new InvalidOperationException ("Service host is not attached to this ChannelDispatcher.");
	if (MessageVersion == null)
		throw new InvalidOperationException ("MessageVersion is not set on this ChannelDispatcher.");

	loop_manager = new ListenerLoopManager (this, timeout);
}
254 [MonoTODO ("what to do here?")]
255 protected override void OnOpening ()
259 protected override void OnOpened ()
261 loop_manager.Setup ();
// Ensures a ServiceThrottle exists and then starts the listener loop.
internal void StartLoop ()
{
	// FIXME: not sure if it should be filled here.
	bool throttle_missing = ServiceThrottle == null;
	if (throttle_missing) {
		ServiceThrottle = new ServiceThrottle ();
	}

	loop_manager.Start ();
}
273 bool IsMessageMatchesEndpointDispatcher (Message req, EndpointDispatcher endpoint)
275 Uri to = req.Headers.To;
278 if (to.AbsoluteUri == Constants.WsaAnonymousUri)
280 return endpoint.AddressFilter.Match (req) && endpoint.ContractFilter.Match (req);
283 void HandleError (Exception ex)
285 foreach (IErrorHandler handler in ErrorHandlers)
286 if (handler.HandleError (ex))
290 class ListenerLoopManager
292 ChannelDispatcher owner;
293 AutoResetEvent handle = new AutoResetEvent (false);
294 AutoResetEvent creator_handle = new AutoResetEvent (false);
295 ManualResetEvent stop_handle = new ManualResetEvent (false);
298 TimeSpan open_timeout;
299 Func<IAsyncResult> channel_acceptor;
300 List<IChannel> channels = new List<IChannel> ();
302 public ListenerLoopManager (ChannelDispatcher owner, TimeSpan openTimeout)
305 open_timeout = openTimeout;
310 if (owner.Listener.State != CommunicationState.Opened)
311 owner.Listener.Open (open_timeout);
313 // It is tested at Open(), but strangely it is not instantiated at this point.
314 foreach (var ed in owner.Endpoints)
315 if (ed.DispatchRuntime.InstanceContextProvider == null && (ed.DispatchRuntime.Type == null || ed.DispatchRuntime.Type.GetConstructor (Type.EmptyTypes) == null))
316 throw new InvalidOperationException ("There is no default constructor for the service Type in the DispatchRuntime");
317 SetupChannelAcceptor ();
322 foreach (var ed in owner.Endpoints)
323 if (ed.DispatchRuntime.InstanceContextProvider == null)
324 ed.DispatchRuntime.InstanceContextProvider = new DefaultInstanceContextProvider ();
326 if (loop_thread == null)
327 loop_thread = new Thread (new ThreadStart (Loop));
328 loop_thread.Start ();
331 Func<IAsyncResult> CreateAcceptor<TChannel> (IChannelListener l) where TChannel : class, IChannel
333 IChannelListener<TChannel> r = l as IChannelListener<TChannel>;
336 AsyncCallback callback = delegate (IAsyncResult result) {
338 ChannelAccepted (r.EndAcceptChannel (result));
339 } catch (Exception ex) {
340 Console.WriteLine ("Exception during finishing channel acceptance.");
341 Console.WriteLine (ex);
346 return r.BeginAcceptChannel (callback, null);
347 } catch (Exception ex) {
348 Console.WriteLine ("Exception during accepting channel.");
349 Console.WriteLine (ex);
355 void SetupChannelAcceptor ()
357 var l = owner.Listener;
359 CreateAcceptor<IReplyChannel> (l) ??
360 CreateAcceptor<IReplySessionChannel> (l) ??
361 CreateAcceptor<IInputChannel> (l) ??
362 CreateAcceptor<IInputSessionChannel> (l) ??
363 CreateAcceptor<IDuplexChannel> (l) ??
364 CreateAcceptor<IDuplexSessionChannel> (l);
365 if (channel_acceptor == null)
366 throw new InvalidOperationException (String.Format ("Unrecognized channel listener type: {0}", l.GetType ()));
369 public void Stop (TimeSpan timeout)
372 creator_handle.Set ();
374 if (stop_handle != null) {
375 stop_handle.WaitOne (timeout > TimeSpan.Zero ? timeout : TimeSpan.FromTicks (1));
376 stop_handle.Close ();
379 if (owner.Listener.State != CommunicationState.Closed)
380 owner.Listener.Abort ();
381 if (loop_thread != null && loop_thread.IsAlive)
382 loop_thread.Abort ();
386 public void CloseInput ()
388 foreach (var ch in channels.ToArray ()) {
389 if (ch.State == CommunicationState.Closed)
390 channels.Remove (ch); // zonbie, if exists
400 } catch (Exception ex) {
402 Console.WriteLine ("ChannelDispatcher caught an exception inside dispatcher loop, which is likely thrown by the channel listener {0}", owner.Listener);
403 Console.WriteLine (ex);
405 if (stop_handle != null)
414 // FIXME: use WaitForChannel() for (*only* for) transacted channel listeners.
415 // http://social.msdn.microsoft.com/Forums/en-US/wcf/thread/3faa4a5e-8602-4dbe-a181-73b3f581835e
417 //FIXME: The logic here should be somewhat different as follows:
419 //2. Get the appropriate EndPointDispatcher that can handle the message
420 // which is done using the filters (AddressFilter, ContractFilter).
421 //3. Let the appropriate endpoint handle the request.
424 while (loop && channels.Count < owner.ServiceThrottle.MaxConcurrentSessions) {
426 creator_handle.WaitOne (); // released by ChannelAccepted()
427 creator_handle.Reset ();
431 handle.WaitOne (); // released by IChannel.Close()
434 owner.Listener.Close ();
438 void ChannelAccepted (IChannel ch)
441 if (ch == null) // could happen when it was aborted
444 var dis = ch as IDisposable;
451 ch.Opened += delegate {
452 ch.Faulted += delegate {
453 if (channels.Contains (ch))
454 channels.Remove (ch);
455 handle.Set (); // release loop wait lock.
457 ch.Closed += delegate {
458 if (channels.Contains (ch))
459 channels.Remove (ch);
460 handle.Set (); // release loop wait lock.
464 creator_handle.Set ();
468 ProcessRequestOrInput (ch);
471 void ProcessRequestOrInput (IChannel ch)
473 var reply = ch as IReplyChannel;
474 var input = ch as IInputChannel;
477 if (owner.ReceiveSynchronously) {
479 if (reply.TryReceiveRequest (owner.timeouts.ReceiveTimeout, out rc))
480 ProcessRequest (reply, rc);
482 reply.BeginTryReceiveRequest (owner.timeouts.ReceiveTimeout, TryReceiveRequestDone, reply);
484 } else if (input != null) {
485 if (owner.ReceiveSynchronously) {
487 if (input.TryReceive (owner.timeouts.ReceiveTimeout, out msg))
488 ProcessInput (input, msg);
490 input.BeginTryReceive (owner.timeouts.ReceiveTimeout, TryReceiveDone, input);
495 void TryReceiveRequestDone (IAsyncResult result)
498 var reply = (IReplyChannel) result.AsyncState;
499 if (reply.EndTryReceiveRequest (result, out rc))
500 ProcessRequest (reply, rc);
503 void TryReceiveDone (IAsyncResult result)
506 var input = (IInputChannel) result.AsyncState;
507 if (input.EndTryReceive (result, out msg))
508 ProcessInput (input, msg);
511 void SendEndpointNotFound (RequestContext rc, EndpointNotFoundException ex)
515 MessageVersion version = rc.RequestMessage.Version;
516 FaultCode fc = new FaultCode ("DestinationUnreachable", version.Addressing.Namespace);
517 Message res = Message.CreateMessage (version, fc, "error occured", rc.RequestMessage.Headers.Action);
520 catch (Exception e) { }
523 void ProcessRequest (IReplyChannel reply, RequestContext rc)
526 EndpointDispatcher candidate = FindEndpointDispatcher (rc.RequestMessage);
527 new InputOrReplyRequestProcessor (candidate.DispatchRuntime, reply).
529 } catch (EndpointNotFoundException ex) {
530 SendEndpointNotFound (rc, ex);
531 } catch (Exception ex) {
533 Console.WriteLine (ex);
535 // unless it is closed by session/call manager, move it back to the loop to receive the next message.
536 if (reply.State != CommunicationState.Closed)
537 ProcessRequestOrInput (reply);
541 void ProcessInput (IInputChannel input, Message message)
544 EndpointDispatcher candidate = null;
545 candidate = FindEndpointDispatcher (message);
546 new InputOrReplyRequestProcessor (candidate.DispatchRuntime, input).
547 ProcessInput (message);
549 catch (Exception ex) {
551 Console.WriteLine (ex);
553 // unless it is closed by session/call manager, move it back to the loop to receive the next message.
554 if (input.State != CommunicationState.Closed)
555 ProcessRequestOrInput (input);
559 EndpointDispatcher FindEndpointDispatcher (Message message) {
560 EndpointDispatcher candidate = null;
561 for (int i = 0; i < owner.Endpoints.Count; i++) {
562 if (owner.IsMessageMatchesEndpointDispatcher (message, owner.Endpoints [i])) {
563 candidate = owner.Endpoints [i];
567 if (candidate == null)
568 throw new EndpointNotFoundException (String.Format ("The request message has the target '{0}' which is not reachable in this service contract", message.Headers.To));