2010-03-30 Atsushi Enomoto <atsushi@ximian.com>
[mono.git] / mcs / class / System.ServiceModel / System.ServiceModel.Dispatcher / ChannelDispatcher.cs
1 //
2 // ChannelDispatcher.cs
3 //
4 // Author:
5 //      Atsushi Enomoto <atsushi@ximian.com>
6 //
7 // Copyright (C) 2005,2009 Novell, Inc.  http://www.novell.com
8 //
9 // Permission is hereby granted, free of charge, to any person obtaining
10 // a copy of this software and associated documentation files (the
11 // "Software"), to deal in the Software without restriction, including
12 // without limitation the rights to use, copy, modify, merge, publish,
13 // distribute, sublicense, and/or sell copies of the Software, and to
14 // permit persons to whom the Software is furnished to do so, subject to
15 // the following conditions:
16 // 
17 // The above copyright notice and this permission notice shall be
18 // included in all copies or substantial portions of the Software.
19 // 
20 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
21 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
22 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
23 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
24 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
25 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
26 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
27 //
28 using System;
29 using System.Collections.Generic;
30 using System.Collections.ObjectModel;
31 using System.Reflection;
32 using System.ServiceModel.Channels;
33 using System.Threading;
34 using System.Transactions;
35 using System.ServiceModel;
36 using System.ServiceModel.Description;
37
38 namespace System.ServiceModel.Dispatcher
39 {
40         public class ChannelDispatcher : ChannelDispatcherBase
41         {
42                 class EndpointDispatcherCollection : SynchronizedCollection<EndpointDispatcher>
43                 {
44                         public EndpointDispatcherCollection (ChannelDispatcher owner)
45                         {
46                                 this.owner = owner;
47                         }
48
49                         ChannelDispatcher owner;
50
51                         protected override void ClearItems ()
52                         {
53                                 foreach (var ed in this)
54                                         ed.ChannelDispatcher = null;
55                                 base.ClearItems ();
56                         }
57
58                         protected override void InsertItem (int index, EndpointDispatcher item)
59                         {
60                                 item.ChannelDispatcher = owner;
61                                 base.InsertItem (index, item);
62                         }
63
64                         protected override void RemoveItem (int index)
65                         {
66                                 if (index < Count)
67                                         this [index].ChannelDispatcher = null;
68                                 base.RemoveItem (index);
69                         }
70
71                         protected override void SetItem (int index, EndpointDispatcher item)
72                         {
73                                 item.ChannelDispatcher = owner;
74                                 base.SetItem (index, item);
75                         }
76                 }
77
78                 ServiceHostBase host;
79
80                 string binding_name;            
81                 Collection<IErrorHandler> error_handlers
82                         = new Collection<IErrorHandler> ();
83                 IChannelListener listener;
84                 internal IDefaultCommunicationTimeouts timeouts; // FIXME: remove internal
85                 MessageVersion message_version;
86                 bool receive_sync, include_exception_detail_in_faults,
87                         manual_addressing, is_tx_receive;
88                 int max_tx_batch_size;
89                 SynchronizedCollection<IChannelInitializer> initializers
90                         = new SynchronizedCollection<IChannelInitializer> ();
91                 IsolationLevel tx_isolation_level;
92                 TimeSpan tx_timeout;
93                 ServiceThrottle throttle;
94
95                 Guid identifier = Guid.NewGuid ();
96                 ManualResetEvent async_event = new ManualResetEvent (false);
97
98                 ListenerLoopManager loop_manager;
99                 SynchronizedCollection<EndpointDispatcher> endpoints;
100
101                 [MonoTODO ("get binding info from config")]
102                 public ChannelDispatcher (IChannelListener listener)
103                         : this (listener, null)
104                 {
105                 }
106
107                 public ChannelDispatcher (
108                         IChannelListener listener, string bindingName)
109                         : this (listener, bindingName, null)
110                 {
111                 }
112
113                 public ChannelDispatcher (
114                         IChannelListener listener, string bindingName,
115                         IDefaultCommunicationTimeouts timeouts)
116                 {
117                         if (listener == null)
118                                 throw new ArgumentNullException ("listener");
119                         Init (listener, bindingName, timeouts);
120                 }
121
122                 private void Init (IChannelListener listener, string bindingName,
123                         IDefaultCommunicationTimeouts timeouts)
124                 {
125                         this.listener = listener;
126                         this.binding_name = bindingName;
127                         // IChannelListener is often a ChannelListenerBase
128                         // which implements IDefaultCommunicationTimeouts.
129                         this.timeouts = timeouts ?? listener as IDefaultCommunicationTimeouts ?? DefaultCommunicationTimeouts.Instance;
130                         endpoints = new EndpointDispatcherCollection (this);
131                 }
132
133                 internal EndpointDispatcher InitializeServiceEndpoint (Type serviceType, ServiceEndpoint se)
134                 {
135                         this.MessageVersion = se.Binding.MessageVersion;
136                         if (this.MessageVersion == null)
137                                 this.MessageVersion = MessageVersion.Default;
138
139                         //Attach one EndpointDispacher to the ChannelDispatcher
140                         EndpointDispatcher ed = new EndpointDispatcher (se.Address, se.Contract.Name, se.Contract.Namespace);
141                         this.Endpoints.Add (ed);
142                         ed.InitializeServiceEndpoint (false, serviceType, se);
143                         return ed;
144                 }
145
146                 public string BindingName {
147                         get { return binding_name; }
148                 }
149
150                 public SynchronizedCollection<IChannelInitializer> ChannelInitializers {
151                         get { return initializers; }
152                 }
153
154                 protected internal override TimeSpan DefaultCloseTimeout {
155                         get { return timeouts.CloseTimeout; }
156                 }
157
158                 protected internal override TimeSpan DefaultOpenTimeout {
159                         get { return timeouts.OpenTimeout; }
160                 }
161
162                 public Collection<IErrorHandler> ErrorHandlers {
163                         get { return error_handlers; }
164                 }
165
166                 public SynchronizedCollection<EndpointDispatcher> Endpoints {
167                         get { return endpoints; }
168                 }
169
170                 [MonoTODO]
171                 public bool IsTransactedAccept {
172                         get { throw new NotImplementedException (); }
173                 }
174
175                 public bool IsTransactedReceive {
176                         get { return is_tx_receive; }
177                         set { is_tx_receive = value; }
178                 }
179
180                 public bool ManualAddressing {
181                         get { return manual_addressing; }
182                         set { manual_addressing = value; }
183                 }
184
185                 public int MaxTransactedBatchSize {
186                         get { return max_tx_batch_size; }
187                         set { max_tx_batch_size = value; }
188                 }
189
190                 public override ServiceHostBase Host {
191                         get { return host; }
192                 }
193
194                 public override IChannelListener Listener {
195                         get { return listener; }
196                 }
197
198                 public MessageVersion MessageVersion {
199                         get { return message_version; }
200                         set { message_version = value; }
201                 }
202
203                 public bool ReceiveSynchronously {
204                         get { return receive_sync; }
205                         set {
206                                 ThrowIfDisposedOrImmutable ();
207                                 receive_sync = value; 
208                         }
209                 }
210
211                 public bool IncludeExceptionDetailInFaults {
212                         get { return include_exception_detail_in_faults; }
213                         set { include_exception_detail_in_faults = value; }
214                 }
215
216                 public ServiceThrottle ServiceThrottle {
217                         get { return throttle; }
218                         set { throttle = value; }
219                 }
220
221                 public IsolationLevel TransactionIsolationLevel {
222                         get { return tx_isolation_level; }
223                         set { tx_isolation_level = value; }
224                 }
225
226                 public TimeSpan TransactionTimeout {
227                         get { return tx_timeout; }
228                         set { tx_timeout = value; }
229                 }
230
231                 protected internal override void Attach (ServiceHostBase host)
232                 {
233                         this.host = host;
234                 }
235
236                 public override void CloseInput ()
237                 {
238                         if (loop_manager != null)
239                                 loop_manager.CloseInput ();
240                 }
241
242                 protected internal override void Detach (ServiceHostBase host)
243                 {                       
244                         this.host = null;                       
245                 }
246
247                 protected override void OnAbort ()
248                 {
249                         if (loop_manager != null)
250                                 loop_manager.Stop (TimeSpan.FromTicks (1));
251                 }
252
253                 Action<TimeSpan> open_delegate;
254                 Action<TimeSpan> close_delegate;
255
256                 protected override IAsyncResult OnBeginClose (TimeSpan timeout,
257                         AsyncCallback callback, object state)
258                 {
259                         if (close_delegate == null)
260                                 close_delegate = new Action<TimeSpan> (OnClose);
261                         return close_delegate.BeginInvoke (timeout, callback, state);
262                 }
263
264                 protected override IAsyncResult OnBeginOpen (TimeSpan timeout,
265                         AsyncCallback callback, object state)
266                 {
267                         if (open_delegate == null)
268                                 open_delegate = new Action<TimeSpan> (OnOpen);
269                         return open_delegate.BeginInvoke (timeout, callback, state);
270                 }
271
272                 protected override void OnClose (TimeSpan timeout)
273                 {
274                         if (loop_manager != null)
275                                 loop_manager.Stop (timeout);
276                 }
277
278                 protected override void OnClosed ()
279                 {
280                         if (host != null)
281                                 host.ChannelDispatchers.Remove (this);
282                         base.OnClosed ();
283                 }
284
285                 protected override void OnEndClose (IAsyncResult result)
286                 {
287                         close_delegate.EndInvoke (result);
288                 }
289
290                 protected override void OnEndOpen (IAsyncResult result)
291                 {
292                         open_delegate.EndInvoke (result);
293                 }
294
295                 protected override void OnOpen (TimeSpan timeout)
296                 {
297                         if (Host == null || MessageVersion == null)
298                                 throw new InvalidOperationException ("Service host is not attached to this ChannelDispatcher.");
299
300                         loop_manager.Setup (timeout);
301                 }
302
303                 protected override void OnOpening ()
304                 {
305                         base.OnOpening ();
306                         loop_manager = new ListenerLoopManager (this);
307                 }
308
309                 protected override void OnOpened ()
310                 {
311                         base.OnOpened ();
312                         StartLoop ();
313                 }
314
315                 void StartLoop ()
316                 {
317                         // FIXME: not sure if it should be filled here.
318                         if (ServiceThrottle == null)
319                                 ServiceThrottle = new ServiceThrottle ();
320
321                         loop_manager.Start ();
322                 }
323         }
324
325                 // isolated from ChannelDispatcher
326                 class ListenerLoopManager
327                 {
328                         ChannelDispatcher owner;
329                         AutoResetEvent throttle_wait_handle = new AutoResetEvent (false);
330                         AutoResetEvent creator_handle = new AutoResetEvent (false);
331                         ManualResetEvent stop_handle = new ManualResetEvent (false);
332                         bool loop;
333                         Thread loop_thread;
334                         DateTime close_started;
335                         TimeSpan close_timeout;
336                         Func<IAsyncResult> channel_acceptor;
337                         List<IChannel> channels = new List<IChannel> ();
338                         AddressFilterMode address_filter_mode;
339
340                         public ListenerLoopManager (ChannelDispatcher owner)
341                         {
342                                 this.owner = owner;
343                                 var sba = owner.Host != null ? owner.Host.Description.Behaviors.Find<ServiceBehaviorAttribute> () : null;
344                                 if (sba != null)
345                                         address_filter_mode = sba.AddressFilterMode;
346                         }
347
348                         public void Setup (TimeSpan openTimeout)
349                         {
350                                 if (owner.Listener.State != CommunicationState.Opened)
351                                         owner.Listener.Open (openTimeout);
352
353                                 // It is tested at Open(), but strangely it is not instantiated at this point.
354                                 foreach (var ed in owner.Endpoints)
355                                         if (ed.DispatchRuntime.InstanceContextProvider == null && (ed.DispatchRuntime.Type == null || ed.DispatchRuntime.Type.GetConstructor (Type.EmptyTypes) == null))
356                                                 throw new InvalidOperationException ("There is no default constructor for the service Type in the DispatchRuntime");
357                                 SetupChannelAcceptor ();
358                         }
359
360                         public void Start ()
361                         {
362                                 if (loop_thread == null)
363                                         loop_thread = new Thread (new ThreadStart (Loop));
364                                 loop_thread.Start ();
365                         }
366
367                         Func<IAsyncResult> CreateAcceptor<TChannel> (IChannelListener l) where TChannel : class, IChannel
368                         {
369                                 IChannelListener<TChannel> r = l as IChannelListener<TChannel>;
370                                 if (r == null)
371                                         return null;
372                                 AsyncCallback callback = delegate (IAsyncResult result) {
373                                         try {
374                                                 ChannelAccepted (r.EndAcceptChannel (result));
375                                         } catch (Exception ex) {
376                                                 Console.WriteLine ("Exception during finishing channel acceptance.");
377                                                 Console.WriteLine (ex);
378                                                 creator_handle.Set ();
379                                         }
380                                 };
381                                 return delegate {
382                                         try {
383                                                 return r.BeginAcceptChannel (callback, null);
384                                         } catch (Exception ex) {
385                                                 Console.WriteLine ("Exception during accepting channel.");
386                                                 Console.WriteLine (ex);
387                                                 throw;
388                                         }
389                                 };
390                         }
391
392                         void SetupChannelAcceptor ()
393                         {
394                                 var l = owner.Listener;
395                                 channel_acceptor =
396                                         CreateAcceptor<IReplyChannel> (l) ??
397                                         CreateAcceptor<IReplySessionChannel> (l) ??
398                                         CreateAcceptor<IInputChannel> (l) ??
399                                         CreateAcceptor<IInputSessionChannel> (l) ??
400                                         CreateAcceptor<IDuplexChannel> (l) ??
401                                         CreateAcceptor<IDuplexSessionChannel> (l);
402                                 if (channel_acceptor == null)
403                                         throw new InvalidOperationException (String.Format ("Unrecognized channel listener type: {0}", l.GetType ()));
404                         }
405
406                         public void Stop (TimeSpan timeout)
407                         {
408                                 if (loop_thread == null)
409                                         return;
410
411                                 close_started = DateTime.Now;
412                                 close_timeout = timeout;
413                                 loop = false;
414                                 creator_handle.Set ();
415                                 throttle_wait_handle.Set (); // break primary loop
416                                 if (stop_handle != null) {
417                                         stop_handle.WaitOne (timeout > TimeSpan.Zero ? timeout : TimeSpan.FromTicks (1));
418                                         stop_handle.Close ();
419                                         stop_handle = null;
420                                 }
421                                 if (owner.Listener.State != CommunicationState.Closed) {
422                                         // FIXME: log it
423                                         Console.WriteLine ("Channel listener '{0}' is not closed. Aborting.", owner.Listener.GetType ());
424                                         owner.Listener.Abort ();
425                                 }
426                                 if (loop_thread != null && loop_thread.IsAlive)
427                                         loop_thread.Abort ();
428                                 loop_thread = null;
429                         }
430
431                         public void CloseInput ()
432                         {
433                                 foreach (var ch in channels.ToArray ()) {
434                                         if (ch.State == CommunicationState.Closed)
435                                                 channels.Remove (ch); // zonbie, if exists
436                                         else {
437                                                 try {
438                                                         ch.Close (close_timeout - (DateTime.Now - close_started));
439                                                 } catch (Exception ex) {
440                                                         // FIXME: log it.
441                                                         Console.WriteLine (ex);
442                                                         ch.Abort ();
443                                                 }
444                                         }
445                                 }
446                         }
447
448                         void Loop ()
449                         {
450                                 try {
451                                         LoopCore ();
452                                 } catch (Exception ex) {
453                                         // FIXME: log it
454                                         Console.WriteLine ("ListenerLoopManager caught an exception inside dispatcher loop, which is likely thrown by the channel listener {0}", owner.Listener);
455                                         Console.WriteLine (ex);
456                                 } finally {
457                                         if (stop_handle != null)
458                                                 stop_handle.Set ();
459                                 }
460                         }
461
462                         void LoopCore ()
463                         {
464                                 loop = true;
465
466                                 // FIXME: use WaitForChannel() for (*only* for) transacted channel listeners.
467                                 // http://social.msdn.microsoft.com/Forums/en-US/wcf/thread/3faa4a5e-8602-4dbe-a181-73b3f581835e
468
469                                 while (loop) {
470                                         // FIXME: enable throttling and allow more than one connection to process at a time.
471                                         while (loop && channels.Count < 1) {
472 //                                      while (loop && channels.Count < owner.ServiceThrottle.MaxConcurrentSessions) {
473                                                 channel_acceptor ();
474                                                 creator_handle.WaitOne (); // released by ChannelAccepted()
475                                         }
476                                         if (!loop)
477                                                 break;
478                                         throttle_wait_handle.WaitOne (); // released by IChannel.Close()
479                                 }
480                                 try {
481                                         owner.Listener.Close ();
482                                 } finally {
483                                         // make sure to close both listener and channels.
484                                         owner.CloseInput ();
485                                 }
486                         }
487
488                         void ChannelAccepted (IChannel ch)
489                         {
490                         try {
491                                 if (ch == null) // could happen when it was aborted
492                                         return;
493                                 if (!loop) {
494                                         var dis = ch as IDisposable;
495                                         if (dis != null)
496                                                 dis.Dispose ();
497                                         return;
498                                 }
499
500                                 lock (channels)
501                                         channels.Add (ch);
502                                 ch.Opened += delegate {
503                                         ch.Faulted += delegate {
504                                                 lock (channels)
505                                                         if (channels.Contains (ch))
506                                                                 channels.Remove (ch);
507                                                 throttle_wait_handle.Set (); // release loop wait lock.
508                                                 };
509                                         ch.Closed += delegate {
510                                                 lock (channels)
511                                                         if (channels.Contains (ch))
512                                                                 channels.Remove (ch);
513                                                 throttle_wait_handle.Set (); // release loop wait lock.
514                                                 };
515                                         };
516                                 ch.Open ();
517                         } finally {
518                                 creator_handle.Set ();
519                         }
520
521                                 ProcessRequestOrInput (ch);
522                         }
523
524                         void ProcessRequestOrInput (IChannel ch)
525                         {
526                                 var reply = ch as IReplyChannel;
527                                 var input = ch as IInputChannel;
528
529                                 if (reply != null) {
530                                         if (owner.ReceiveSynchronously) {
531                                                 RequestContext rc;
532                                                 if (reply.TryReceiveRequest (owner.timeouts.ReceiveTimeout, out rc))
533                                                         ProcessRequest (reply, rc);
534                                         } else {
535                                                 reply.BeginTryReceiveRequest (owner.timeouts.ReceiveTimeout, TryReceiveRequestDone, reply);
536                                         }
537                                 } else if (input != null) {
538                                         if (owner.ReceiveSynchronously) {
539                                                 Message msg;
540                                                 if (input.TryReceive (owner.timeouts.ReceiveTimeout, out msg))
541                                                         ProcessInput (input, msg);
542                                         } else {
543                                                 input.BeginTryReceive (owner.timeouts.ReceiveTimeout, TryReceiveDone, input);
544                                         }
545                                 }
546                         }
547
548                         void TryReceiveRequestDone (IAsyncResult result)
549                         {
550                                 RequestContext rc;
551                                 var reply = (IReplyChannel) result.AsyncState;
552                                 if (reply.EndTryReceiveRequest (result, out rc))
553                                         ProcessRequest (reply, rc);
554                                 else
555                                         reply.Close ();
556                         }
557
558                         void TryReceiveDone (IAsyncResult result)
559                         {
560                                 Message msg;
561                                 var input = (IInputChannel) result.AsyncState;
562                                 if (input.EndTryReceive (result, out msg))
563                                         ProcessInput (input, msg);
564                                 else
565                                         input.Close ();
566                         }
567
568                         void ProcessRequest (IReplyChannel reply, RequestContext rc)
569                         {
570                                 var req = rc.RequestMessage;
571                                 try {
572                                         var ed = FindEndpointDispatcher (req);
573                                         new InputOrReplyRequestProcessor (ed.DispatchRuntime, reply).ProcessReply (rc);
574                                 } catch (Exception ex) {
575                                         // FIXME: log it.
576                                         Console.WriteLine (ex);
577
578                                         var conv = reply.GetProperty<FaultConverter> () ?? FaultConverter.GetDefaultFaultConverter (rc.RequestMessage.Version);
579                                         Message res;
580                                         if (!conv.TryCreateFaultMessage (ex, out res))
581                                                 res = Message.CreateMessage (req.Version, new FaultCode ("Receiver"), ex.Message, req.Version.Addressing.FaultNamespace);
582                                         rc.Reply (res);
583                                 } finally {
584                                         if (rc != null)
585                                                 rc.Close ();
586                                         // unless it is closed by session/call manager, move it back to the loop to receive the next message.
587                                         if (loop && reply.State != CommunicationState.Closed)
588                                                 ProcessRequestOrInput (reply);
589                                 }
590                         }
591
592                         void ProcessInput (IInputChannel input, Message message)
593                         {
594                                 try {
595                                         EndpointDispatcher candidate = null;
596                                         candidate = FindEndpointDispatcher (message);
597                                         new InputOrReplyRequestProcessor (candidate.DispatchRuntime, input).
598                                                 ProcessInput (message);
599                                 }
600                                 catch (Exception ex) {
601                                         // FIXME: log it.
602                                         Console.WriteLine (ex);
603                                 } finally {
604                                         // unless it is closed by session/call manager, move it back to the loop to receive the next message.
605                                         if (loop && input.State != CommunicationState.Closed)
606                                                 ProcessRequestOrInput (input);
607                                 }
608                         }
609
610                         EndpointDispatcher FindEndpointDispatcher (Message message) {
611                                 EndpointDispatcher candidate = null;
612                                 bool hasEndpointMatch = false;
613                                 foreach (var endpoint in owner.Endpoints) {
614                                         if (endpoint.AddressFilter.Match (message)) {
615                                                 hasEndpointMatch = true;
616                                                 if (!endpoint.ContractFilter.Match (message))
617                                                         continue;
618                                                 var newdis = endpoint;
619                                                 if (candidate == null || candidate.FilterPriority < newdis.FilterPriority)
620                                                         candidate = newdis;
621                                                 else if (candidate.FilterPriority == newdis.FilterPriority)
622                                                         throw new MultipleFilterMatchesException ();
623                                         }
624                                 }
625                                 if (candidate == null && !hasEndpointMatch) {
626                                         if (owner.Host != null)
627                                                 owner.Host.OnUnknownMessageReceived (message);
628                                         // we have to return a fault to the client anyways...
629                                         throw new EndpointNotFoundException ();
630                                 }
631                                 else if (candidate == null)
632                                         // FIXME: It is not a good place to check, but anyways detach this error from EndpointNotFoundException.
633                                         throw new ActionNotSupportedException (String.Format ("Action '{0}' did not match any operations in the target contract", message.Headers.Action));
634
635                                 return candidate;
636                         }
637                 }
638 }