2 // ChannelDispatcher.cs
5 // Atsushi Enomoto <atsushi@ximian.com>
7 // Copyright (C) 2005,2009 Novell, Inc. http://www.novell.com
9 // Permission is hereby granted, free of charge, to any person obtaining
10 // a copy of this software and associated documentation files (the
11 // "Software"), to deal in the Software without restriction, including
12 // without limitation the rights to use, copy, modify, merge, publish,
13 // distribute, sublicense, and/or sell copies of the Software, and to
14 // permit persons to whom the Software is furnished to do so, subject to
15 // the following conditions:
17 // The above copyright notice and this permission notice shall be
18 // included in all copies or substantial portions of the Software.
20 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
21 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
22 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
23 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
24 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
25 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
26 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 using System.Collections.Generic;
30 using System.Collections.ObjectModel;
31 using System.Reflection;
32 using System.ServiceModel.Channels;
33 using System.Threading;
34 using System.Transactions;
35 using System.ServiceModel;
36 using System.ServiceModel.Description;
38 namespace System.ServiceModel.Dispatcher
40 public class ChannelDispatcher : ChannelDispatcherBase
42 class EndpointDispatcherCollection : SynchronizedCollection<EndpointDispatcher>
44 public EndpointDispatcherCollection (ChannelDispatcher owner)
49 ChannelDispatcher owner;
51 protected override void ClearItems ()
53 foreach (var ed in this)
54 ed.ChannelDispatcher = null;
58 protected override void InsertItem (int index, EndpointDispatcher item)
60 item.ChannelDispatcher = owner;
61 base.InsertItem (index, item);
64 protected virtual void RemoveItem (int index)
67 this [index].ChannelDispatcher = null;
68 base.RemoveItem (index);
71 protected virtual void SetItem (int index, EndpointDispatcher item)
73 item.ChannelDispatcher = owner;
74 base.SetItem (index, item);
81 Collection<IErrorHandler> error_handlers
82 = new Collection<IErrorHandler> ();
83 IChannelListener listener;
84 internal IDefaultCommunicationTimeouts timeouts; // FIXME: remove internal
85 MessageVersion message_version;
86 bool receive_sync, include_exception_detail_in_faults,
87 manual_addressing, is_tx_receive;
88 int max_tx_batch_size;
89 SynchronizedCollection<IChannelInitializer> initializers
90 = new SynchronizedCollection<IChannelInitializer> ();
91 IsolationLevel tx_isolation_level;
93 ServiceThrottle throttle;
95 Guid identifier = Guid.NewGuid ();
96 ManualResetEvent async_event = new ManualResetEvent (false);
98 ListenerLoopManager loop_manager;
99 SynchronizedCollection<EndpointDispatcher> endpoints;
101 [MonoTODO ("get binding info from config")]
102 public ChannelDispatcher (IChannelListener listener)
103 : this (listener, null)
107 public ChannelDispatcher (
108 IChannelListener listener, string bindingName)
109 : this (listener, bindingName, null)
113 public ChannelDispatcher (
114 IChannelListener listener, string bindingName,
115 IDefaultCommunicationTimeouts timeouts)
117 if (listener == null)
118 throw new ArgumentNullException ("listener");
119 Init (listener, bindingName, timeouts);
122 private void Init (IChannelListener listener, string bindingName,
123 IDefaultCommunicationTimeouts timeouts)
125 this.listener = listener;
126 this.binding_name = bindingName;
127 // IChannelListener is often a ChannelListenerBase
128 // which implements IDefaultCommunicationTimeouts.
129 this.timeouts = timeouts ?? listener as IDefaultCommunicationTimeouts ?? DefaultCommunicationTimeouts.Instance;
130 endpoints = new EndpointDispatcherCollection (this);
133 internal void InitializeServiceEndpoint (Type serviceType, ServiceEndpoint se)
135 this.MessageVersion = se.Binding.MessageVersion;
136 if (this.MessageVersion == null)
137 this.MessageVersion = MessageVersion.Default;
139 //Attach one EndpointDispacher to the ChannelDispatcher
140 EndpointDispatcher ed = new EndpointDispatcher (se.Address, se.Contract.Name, se.Contract.Namespace);
141 this.Endpoints.Add (ed);
142 ed.InitializeServiceEndpoint (false, serviceType, se);
145 public string BindingName {
146 get { return binding_name; }
149 public SynchronizedCollection<IChannelInitializer> ChannelInitializers {
150 get { return initializers; }
153 protected internal override TimeSpan DefaultCloseTimeout {
154 get { return timeouts.CloseTimeout; }
157 protected internal override TimeSpan DefaultOpenTimeout {
158 get { return timeouts.OpenTimeout; }
161 public Collection<IErrorHandler> ErrorHandlers {
162 get { return error_handlers; }
165 public SynchronizedCollection<EndpointDispatcher> Endpoints {
166 get { return endpoints; }
170 public bool IsTransactedAccept {
171 get { throw new NotImplementedException (); }
174 public bool IsTransactedReceive {
175 get { return is_tx_receive; }
176 set { is_tx_receive = value; }
179 public bool ManualAddressing {
180 get { return manual_addressing; }
181 set { manual_addressing = value; }
184 public int MaxTransactedBatchSize {
185 get { return max_tx_batch_size; }
186 set { max_tx_batch_size = value; }
189 public override ServiceHostBase Host {
193 public override IChannelListener Listener {
194 get { return listener; }
197 public MessageVersion MessageVersion {
198 get { return message_version; }
199 set { message_version = value; }
202 public bool ReceiveSynchronously {
203 get { return receive_sync; }
205 ThrowIfDisposedOrImmutable ();
206 receive_sync = value;
210 public bool IncludeExceptionDetailInFaults {
211 get { return include_exception_detail_in_faults; }
212 set { include_exception_detail_in_faults = value; }
215 public ServiceThrottle ServiceThrottle {
216 get { return throttle; }
217 set { throttle = value; }
220 public IsolationLevel TransactionIsolationLevel {
221 get { return tx_isolation_level; }
222 set { tx_isolation_level = value; }
225 public TimeSpan TransactionTimeout {
226 get { return tx_timeout; }
227 set { tx_timeout = value; }
230 protected internal override void Attach (ServiceHostBase host)
235 public override void CloseInput ()
237 if (loop_manager != null)
238 loop_manager.CloseInput ();
241 protected internal override void Detach (ServiceHostBase host)
246 protected override void OnAbort ()
248 if (loop_manager != null)
249 loop_manager.Stop (TimeSpan.FromTicks (1));
252 Action<TimeSpan> open_delegate;
253 Action<TimeSpan> close_delegate;
255 protected override IAsyncResult OnBeginClose (TimeSpan timeout,
256 AsyncCallback callback, object state)
258 if (close_delegate == null)
259 close_delegate = new Action<TimeSpan> (OnClose);
260 return close_delegate.BeginInvoke (timeout, callback, state);
263 protected override IAsyncResult OnBeginOpen (TimeSpan timeout,
264 AsyncCallback callback, object state)
266 if (open_delegate == null)
267 open_delegate = new Action<TimeSpan> (OnClose);
268 return open_delegate.BeginInvoke (timeout, callback, state);
271 protected override void OnClose (TimeSpan timeout)
273 if (loop_manager != null)
274 loop_manager.Stop (timeout);
277 protected override void OnClosed ()
280 host.ChannelDispatchers.Remove (this);
284 protected override void OnEndClose (IAsyncResult result)
286 close_delegate.EndInvoke (result);
289 protected override void OnEndOpen (IAsyncResult result)
291 open_delegate.EndInvoke (result);
294 protected override void OnOpen (TimeSpan timeout)
296 if (Host == null || MessageVersion == null)
297 throw new InvalidOperationException ("Service host is not attached to this ChannelDispatcher.");
299 loop_manager = new ListenerLoopManager (this, timeout);
302 [MonoTODO ("what to do here?")]
303 protected override void OnOpening ()
307 protected override void OnOpened ()
309 loop_manager.Setup ();
312 internal void StartLoop ()
314 // FIXME: not sure if it should be filled here.
315 if (ServiceThrottle == null)
316 ServiceThrottle = new ServiceThrottle ();
318 loop_manager.Start ();
322 // isolated from ChannelDispatcher
323 class ListenerLoopManager
325 ChannelDispatcher owner;
326 AutoResetEvent throttle_wait_handle = new AutoResetEvent (false);
327 AutoResetEvent creator_handle = new AutoResetEvent (false);
328 ManualResetEvent stop_handle = new ManualResetEvent (false);
331 DateTime close_started;
332 TimeSpan open_timeout, close_timeout;
333 Func<IAsyncResult> channel_acceptor;
334 List<IChannel> channels = new List<IChannel> ();
336 public ListenerLoopManager (ChannelDispatcher owner, TimeSpan openTimeout)
339 open_timeout = openTimeout;
344 if (owner.Listener.State != CommunicationState.Opened)
345 owner.Listener.Open (open_timeout);
347 // It is tested at Open(), but strangely it is not instantiated at this point.
348 foreach (var ed in owner.Endpoints)
349 if (ed.DispatchRuntime.InstanceContextProvider == null && (ed.DispatchRuntime.Type == null || ed.DispatchRuntime.Type.GetConstructor (Type.EmptyTypes) == null))
350 throw new InvalidOperationException ("There is no default constructor for the service Type in the DispatchRuntime");
351 SetupChannelAcceptor ();
356 foreach (var ed in owner.Endpoints)
357 if (ed.DispatchRuntime.InstanceContextProvider == null)
358 ed.DispatchRuntime.InstanceContextProvider = new DefaultInstanceContextProvider ();
360 if (loop_thread == null)
361 loop_thread = new Thread (new ThreadStart (Loop));
362 loop_thread.Start ();
365 Func<IAsyncResult> CreateAcceptor<TChannel> (IChannelListener l) where TChannel : class, IChannel
367 IChannelListener<TChannel> r = l as IChannelListener<TChannel>;
370 AsyncCallback callback = delegate (IAsyncResult result) {
372 ChannelAccepted (r.EndAcceptChannel (result));
373 } catch (Exception ex) {
374 Console.WriteLine ("Exception during finishing channel acceptance.");
375 Console.WriteLine (ex);
380 return r.BeginAcceptChannel (callback, null);
381 } catch (Exception ex) {
382 Console.WriteLine ("Exception during accepting channel.");
383 Console.WriteLine (ex);
389 void SetupChannelAcceptor ()
391 var l = owner.Listener;
393 CreateAcceptor<IReplyChannel> (l) ??
394 CreateAcceptor<IReplySessionChannel> (l) ??
395 CreateAcceptor<IInputChannel> (l) ??
396 CreateAcceptor<IInputSessionChannel> (l) ??
397 CreateAcceptor<IDuplexChannel> (l) ??
398 CreateAcceptor<IDuplexSessionChannel> (l);
399 if (channel_acceptor == null)
400 throw new InvalidOperationException (String.Format ("Unrecognized channel listener type: {0}", l.GetType ()));
403 public void Stop (TimeSpan timeout)
405 if (loop_thread == null)
408 close_started = DateTime.Now;
409 close_timeout = timeout;
411 creator_handle.Set ();
412 throttle_wait_handle.Set (); // break primary loop
413 if (stop_handle != null) {
414 stop_handle.WaitOne (timeout > TimeSpan.Zero ? timeout : TimeSpan.FromTicks (1));
415 stop_handle.Close ();
418 if (owner.Listener.State != CommunicationState.Closed)
419 owner.Listener.Abort ();
420 if (loop_thread != null && loop_thread.IsAlive)
421 loop_thread.Abort ();
425 public void CloseInput ()
427 foreach (var ch in channels.ToArray ()) {
428 if (ch.State == CommunicationState.Closed)
429 channels.Remove (ch); // zonbie, if exists
432 ch.Close (close_timeout - (DateTime.Now - close_started));
433 } catch (Exception ex) {
435 Console.WriteLine (ex);
446 } catch (Exception ex) {
448 Console.WriteLine ("ChannelDispatcher caught an exception inside dispatcher loop, which is likely thrown by the channel listener {0}", owner.Listener);
449 Console.WriteLine (ex);
451 if (stop_handle != null)
460 // FIXME: use WaitForChannel() for (*only* for) transacted channel listeners.
461 // http://social.msdn.microsoft.com/Forums/en-US/wcf/thread/3faa4a5e-8602-4dbe-a181-73b3f581835e
463 //FIXME: The logic here should be somewhat different as follows:
465 //2. Get the appropriate EndPointDispatcher that can handle the message
466 // which is done using the filters (AddressFilter, ContractFilter).
467 //3. Let the appropriate endpoint handle the request.
470 while (loop && channels.Count < owner.ServiceThrottle.MaxConcurrentSessions) {
472 creator_handle.WaitOne (); // released by ChannelAccepted()
476 throttle_wait_handle.WaitOne (); // released by IChannel.Close()
479 owner.Listener.Close ();
481 // make sure to close both listener and channels.
486 void ChannelAccepted (IChannel ch)
489 if (ch == null) // could happen when it was aborted
492 var dis = ch as IDisposable;
499 ch.Opened += delegate {
500 ch.Faulted += delegate {
501 if (channels.Contains (ch))
502 channels.Remove (ch);
503 throttle_wait_handle.Set (); // release loop wait lock.
505 ch.Closed += delegate {
506 if (channels.Contains (ch))
507 channels.Remove (ch);
508 throttle_wait_handle.Set (); // release loop wait lock.
513 creator_handle.Set ();
516 ProcessRequestOrInput (ch);
519 void ProcessRequestOrInput (IChannel ch)
521 var reply = ch as IReplyChannel;
522 var input = ch as IInputChannel;
525 if (owner.ReceiveSynchronously) {
527 if (reply.TryReceiveRequest (owner.timeouts.ReceiveTimeout, out rc))
528 ProcessRequest (reply, rc);
530 reply.BeginTryReceiveRequest (owner.timeouts.ReceiveTimeout, TryReceiveRequestDone, reply);
532 } else if (input != null) {
533 if (owner.ReceiveSynchronously) {
535 if (input.TryReceive (owner.timeouts.ReceiveTimeout, out msg))
536 ProcessInput (input, msg);
538 input.BeginTryReceive (owner.timeouts.ReceiveTimeout, TryReceiveDone, input);
543 void TryReceiveRequestDone (IAsyncResult result)
546 var reply = (IReplyChannel) result.AsyncState;
547 if (reply.EndTryReceiveRequest (result, out rc))
548 ProcessRequest (reply, rc);
551 void TryReceiveDone (IAsyncResult result)
554 var input = (IInputChannel) result.AsyncState;
555 if (input.EndTryReceive (result, out msg))
556 ProcessInput (input, msg);
559 void SendEndpointNotFound (RequestContext rc, EndpointNotFoundException ex)
563 MessageVersion version = rc.RequestMessage.Version;
564 FaultCode fc = new FaultCode ("DestinationUnreachable", version.Addressing.Namespace);
565 Message res = Message.CreateMessage (version, fc, "error occured", rc.RequestMessage.Headers.Action);
568 catch (Exception e) { }
571 void ProcessRequest (IReplyChannel reply, RequestContext rc)
574 EndpointDispatcher candidate = FindEndpointDispatcher (rc.RequestMessage);
575 new InputOrReplyRequestProcessor (candidate.DispatchRuntime, reply).
577 } catch (EndpointNotFoundException ex) {
578 SendEndpointNotFound (rc, ex);
579 } catch (Exception ex) {
581 Console.WriteLine (ex);
585 // unless it is closed by session/call manager, move it back to the loop to receive the next message.
586 if (reply.State != CommunicationState.Closed)
587 ProcessRequestOrInput (reply);
591 void ProcessInput (IInputChannel input, Message message)
594 EndpointDispatcher candidate = null;
595 candidate = FindEndpointDispatcher (message);
596 new InputOrReplyRequestProcessor (candidate.DispatchRuntime, input).
597 ProcessInput (message);
599 catch (Exception ex) {
601 Console.WriteLine (ex);
603 // unless it is closed by session/call manager, move it back to the loop to receive the next message.
604 if (input.State != CommunicationState.Closed)
605 ProcessRequestOrInput (input);
609 EndpointDispatcher FindEndpointDispatcher (Message message) {
610 EndpointDispatcher candidate = null;
611 for (int i = 0; i < owner.Endpoints.Count; i++) {
612 if (MessageMatchesEndpointDispatcher (message, owner.Endpoints [i])) {
613 var newdis = owner.Endpoints [i];
614 if (candidate == null || candidate.FilterPriority < newdis.FilterPriority)
616 else if (candidate.FilterPriority == newdis.FilterPriority)
617 throw new MultipleFilterMatchesException ();
620 if (candidate == null && owner.Host != null)
621 owner.Host.OnUnknownMessageReceived (message);
625 bool MessageMatchesEndpointDispatcher (Message req, EndpointDispatcher endpoint)
627 Uri to = req.Headers.To;
630 if (to.AbsoluteUri == Constants.WsaAnonymousUri)
632 return endpoint.AddressFilter.Match (req) && endpoint.ContractFilter.Match (req);