2010-05-21 Atsushi Enomoto <atsushi@ximian.com>
[mono.git] / mcs / class / System.ServiceModel / System.ServiceModel.Dispatcher / ChannelDispatcher.cs
1 //
2 // ChannelDispatcher.cs
3 //
4 // Author:
5 //      Atsushi Enomoto <atsushi@ximian.com>
6 //
7 // Copyright (C) 2005,2009 Novell, Inc.  http://www.novell.com
8 //
9 // Permission is hereby granted, free of charge, to any person obtaining
10 // a copy of this software and associated documentation files (the
11 // "Software"), to deal in the Software without restriction, including
12 // without limitation the rights to use, copy, modify, merge, publish,
13 // distribute, sublicense, and/or sell copies of the Software, and to
14 // permit persons to whom the Software is furnished to do so, subject to
15 // the following conditions:
16 // 
17 // The above copyright notice and this permission notice shall be
18 // included in all copies or substantial portions of the Software.
19 // 
20 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
21 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
22 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
23 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
24 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
25 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
26 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
27 //
28 using System;
29 using System.Collections.Generic;
30 using System.Collections.ObjectModel;
31 using System.Reflection;
32 using System.ServiceModel.Channels;
33 using System.Threading;
34 using System.Transactions;
35 using System.ServiceModel;
36 using System.ServiceModel.Description;
37
38 namespace System.ServiceModel.Dispatcher
39 {
// Dispatches messages arriving on the channels of one IChannelListener to
// the EndpointDispatchers registered in Endpoints. The accept/receive work
// itself is delegated to a ListenerLoopManager that is created in
// OnOpening(), set up in OnOpen() and started in OnOpened().
public class ChannelDispatcher : ChannelDispatcherBase
{
        // Collection behind the Endpoints property. It keeps the
        // EndpointDispatcher.ChannelDispatcher back-reference of every item
        // in step with the item's membership in this collection.
        class EndpointDispatcherCollection : SynchronizedCollection<EndpointDispatcher>
        {
                ChannelDispatcher owner;

                public EndpointDispatcherCollection (ChannelDispatcher owner)
                {
                        this.owner = owner;
                }

                // Detaches every endpoint from the owner, then empties the collection.
                protected override void ClearItems ()
                {
                        foreach (var ed in this)
                                ed.ChannelDispatcher = null;
                        base.ClearItems ();
                }

                // Inserts the endpoint and attaches it to the owner.
                // Throws ArgumentNullException when item is null.
                protected override void InsertItem (int index, EndpointDispatcher item)
                {
                        if (item == null)
                                throw new ArgumentNullException ("item");
                        // Insert first: when the base class rejects the call the
                        // endpoint must not be left marked as owned by this dispatcher.
                        base.InsertItem (index, item);
                        item.ChannelDispatcher = owner;
                }

                // Detaches the endpoint at index (when there is one) and removes it.
                protected override void RemoveItem (int index)
                {
                        // Touch the item only for an index inside the collection;
                        // an invalid index is left for base.RemoveItem() to deal with.
                        if (index >= 0 && index < Count)
                                this [index].ChannelDispatcher = null;
                        base.RemoveItem (index);
                }

                // Replaces the endpoint at index: attaches the new endpoint and
                // detaches the one that was replaced.
                // Throws ArgumentNullException when item is null.
                protected override void SetItem (int index, EndpointDispatcher item)
                {
                        if (item == null)
                                throw new ArgumentNullException ("item");
                        // Remember the previous occupant before it is overwritten.
                        var replaced = this [index];
                        base.SetItem (index, item);
                        item.ChannelDispatcher = owner;
                        // The replaced endpoint no longer belongs to the owner, unless
                        // the very same instance is still stored in another slot
                        // (or it is the item that was just stored).
                        if (replaced != null && !Contains (replaced))
                                replaced.ChannelDispatcher = null;
                }
        }

        // Host this dispatcher is attached to; null while detached.
        ServiceHostBase host;

        string binding_name;
        Collection<IErrorHandler> error_handlers
                = new Collection<IErrorHandler> ();
        IChannelListener listener;
        internal IDefaultCommunicationTimeouts timeouts; // FIXME: remove internal
        MessageVersion message_version;
        bool receive_sync, include_exception_detail_in_faults,
                manual_addressing, is_tx_receive;
        int max_tx_batch_size;
        SynchronizedCollection<IChannelInitializer> initializers
                = new SynchronizedCollection<IChannelInitializer> ();
        IsolationLevel tx_isolation_level;
        TimeSpan tx_timeout;
        ServiceThrottle throttle;

        // Not read anywhere in this class.
        Guid identifier = Guid.NewGuid ();

        // Created in OnOpening(); null before the dispatcher starts opening.
        ListenerLoopManager loop_manager;
        SynchronizedCollection<EndpointDispatcher> endpoints;

        [MonoTODO ("get binding info from config")]
        public ChannelDispatcher (IChannelListener listener)
                : this (listener, null)
        {
        }

        public ChannelDispatcher (
                IChannelListener listener, string bindingName)
                : this (listener, bindingName, null)
        {
        }

        // Throws ArgumentNullException when listener is null. A null timeouts
        // argument falls back to the listener (when it implements
        // IDefaultCommunicationTimeouts) and then to the shared defaults.
        public ChannelDispatcher (
                IChannelListener listener, string bindingName,
                IDefaultCommunicationTimeouts timeouts)
        {
                if (listener == null)
                        throw new ArgumentNullException ("listener");
                Init (listener, bindingName, timeouts);
        }

        private void Init (IChannelListener listener, string bindingName,
                IDefaultCommunicationTimeouts timeouts)
        {
                this.listener = listener;
                this.binding_name = bindingName;
                // IChannelListener is often a ChannelListenerBase
                // which implements IDefaultCommunicationTimeouts.
                this.timeouts = timeouts ?? listener as IDefaultCommunicationTimeouts ?? DefaultCommunicationTimeouts.Instance;
                endpoints = new EndpointDispatcherCollection (this);
        }

        // Takes the message version from the endpoint's binding (falling back
        // to MessageVersion.Default), then creates, registers and initializes
        // one EndpointDispatcher for the endpoint and returns it.
        internal EndpointDispatcher InitializeServiceEndpoint (Type serviceType, ServiceEndpoint se)
        {
                this.MessageVersion = se.Binding.MessageVersion ?? MessageVersion.Default;

                // Attach one EndpointDispatcher to the ChannelDispatcher
                EndpointDispatcher ed = new EndpointDispatcher (se.Address, se.Contract.Name, se.Contract.Namespace);
                this.Endpoints.Add (ed);
                ed.InitializeServiceEndpoint (false, serviceType, se);
                return ed;
        }

        public string BindingName {
                get { return binding_name; }
        }

        public SynchronizedCollection<IChannelInitializer> ChannelInitializers {
                get { return initializers; }
        }

        protected internal override TimeSpan DefaultCloseTimeout {
                get { return timeouts.CloseTimeout; }
        }

        protected internal override TimeSpan DefaultOpenTimeout {
                get { return timeouts.OpenTimeout; }
        }

        public Collection<IErrorHandler> ErrorHandlers {
                get { return error_handlers; }
        }

        public SynchronizedCollection<EndpointDispatcher> Endpoints {
                get { return endpoints; }
        }

        [MonoTODO]
        public bool IsTransactedAccept {
                get { throw new NotImplementedException (); }
        }

        public bool IsTransactedReceive {
                get { return is_tx_receive; }
                set { is_tx_receive = value; }
        }

        public bool ManualAddressing {
                get { return manual_addressing; }
                set { manual_addressing = value; }
        }

        public int MaxTransactedBatchSize {
                get { return max_tx_batch_size; }
                set { max_tx_batch_size = value; }
        }

        public override ServiceHostBase Host {
                get { return host; }
        }

        public override IChannelListener Listener {
                get { return listener; }
        }

        public MessageVersion MessageVersion {
                get { return message_version; }
                set { message_version = value; }
        }

        // The setter is rejected by ThrowIfDisposedOrImmutable() before the
        // value is stored.
        public bool ReceiveSynchronously {
                get { return receive_sync; }
                set {
                        ThrowIfDisposedOrImmutable ();
                        receive_sync = value;
                }
        }

        public bool IncludeExceptionDetailInFaults {
                get { return include_exception_detail_in_faults; }
                set { include_exception_detail_in_faults = value; }
        }

        // Left null until set explicitly; StartLoop() fills in a default
        // instance when the dispatcher has been opened without one.
        public ServiceThrottle ServiceThrottle {
                get { return throttle; }
                set { throttle = value; }
        }

        public IsolationLevel TransactionIsolationLevel {
                get { return tx_isolation_level; }
                set { tx_isolation_level = value; }
        }

        public TimeSpan TransactionTimeout {
                get { return tx_timeout; }
                set { tx_timeout = value; }
        }

        // Remembers the host and, when the listener is an
        // IChannelDispatcherBoundListener, hands this dispatcher to it.
        protected internal override void Attach (ServiceHostBase host)
        {
                this.host = host;
                var bl = listener as IChannelDispatcherBoundListener;
                if (bl != null)
                        bl.ChannelDispatcher = this;
        }

        // Closes the channels tracked by the loop manager, if there is one.
        public override void CloseInput ()
        {
                if (loop_manager != null)
                        loop_manager.CloseInput ();
        }

        // Forgets the current host; the argument is not consulted.
        protected internal override void Detach (ServiceHostBase host)
        {
                this.host = null;
        }

        protected override void OnAbort ()
        {
                // One tick: stop the loop without really waiting for it.
                if (loop_manager != null)
                        loop_manager.Stop (TimeSpan.FromTicks (1));
        }

        // Delegates used to run OnOpen()/OnClose() asynchronously;
        // created on first use by OnBeginOpen()/OnBeginClose().
        Action<TimeSpan> open_delegate;
        Action<TimeSpan> close_delegate;

        protected override IAsyncResult OnBeginClose (TimeSpan timeout,
                AsyncCallback callback, object state)
        {
                if (close_delegate == null)
                        close_delegate = new Action<TimeSpan> (OnClose);
                return close_delegate.BeginInvoke (timeout, callback, state);
        }

        protected override IAsyncResult OnBeginOpen (TimeSpan timeout,
                AsyncCallback callback, object state)
        {
                if (open_delegate == null)
                        open_delegate = new Action<TimeSpan> (OnOpen);
                return open_delegate.BeginInvoke (timeout, callback, state);
        }

        protected override void OnClose (TimeSpan timeout)
        {
                if (loop_manager != null)
                        loop_manager.Stop (timeout);
        }

        // Unregisters this dispatcher from the host it is still attached to.
        protected override void OnClosed ()
        {
                if (host != null)
                        host.ChannelDispatchers.Remove (this);
                base.OnClosed ();
        }

        // Throws InvalidOperationException when no OnBeginClose() preceded it.
        protected override void OnEndClose (IAsyncResult result)
        {
                if (close_delegate == null)
                        throw new InvalidOperationException ("OnEndClose was called without a matching OnBeginClose.");
                close_delegate.EndInvoke (result);
        }

        // Throws InvalidOperationException when no OnBeginOpen() preceded it.
        protected override void OnEndOpen (IAsyncResult result)
        {
                if (open_delegate == null)
                        throw new InvalidOperationException ("OnEndOpen was called without a matching OnBeginOpen.");
                open_delegate.EndInvoke (result);
        }

        // Validates the dispatcher and lets the loop manager open the
        // listener and prepare channel acceptance.
        // Throws InvalidOperationException when no host is attached or no
        // MessageVersion has been set.
        protected override void OnOpen (TimeSpan timeout)
        {
                if (Host == null)
                        throw new InvalidOperationException ("Service host is not attached to this ChannelDispatcher.");
                if (MessageVersion == null)
                        throw new InvalidOperationException ("MessageVersion is not set on this ChannelDispatcher.");

                loop_manager.Setup (timeout);
        }

        protected override void OnOpening ()
        {
                base.OnOpening ();
                loop_manager = new ListenerLoopManager (this);
        }

        protected override void OnOpened ()
        {
                base.OnOpened ();
                StartLoop ();
        }

        void StartLoop ()
        {
                // FIXME: not sure if it should be filled here.
                if (ServiceThrottle == null)
                        ServiceThrottle = new ServiceThrottle ();

                loop_manager.Start ();
        }
}
327
328                 // isolated from ChannelDispatcher
// Runs the accept loop of one ChannelDispatcher on a dedicated thread:
// it accepts channels from the owner's listener (up to the throttle's
// MaxConcurrentSessions), receives requests/messages on them and hands
// each one to the matching EndpointDispatcher.
class ListenerLoopManager
{
        ChannelDispatcher owner;
        // Set when a channel is closed or faulted, and by Stop(), to wake a throttled loop.
        AutoResetEvent throttle_wait_handle = new AutoResetEvent (false);
        // Set when a pending accept has completed, and by Stop().
        AutoResetEvent creator_handle = new AutoResetEvent (false);
        // Set by the loop thread when it ends; Stop() waits on it and then closes it.
        ManualResetEvent stop_handle = new ManualResetEvent (false);
        // volatile: written by Start()/Stop() and read by the loop thread
        // and by channel callbacks running on other threads.
        volatile bool loop;
        Thread loop_thread;
        // UTC time at which Stop() was called; DateTime.MinValue until then.
        DateTime close_started;
        TimeSpan close_timeout;
        Func<IAsyncResult> channel_acceptor;
        // Channels accepted so far. Always accessed under lock (channels).
        List<IChannel> channels = new List<IChannel> ();
        // Taken from the host's ServiceBehaviorAttribute; not consulted elsewhere in this class.
        AddressFilterMode address_filter_mode;

        public ListenerLoopManager (ChannelDispatcher owner)
        {
                this.owner = owner;
                var sba = owner.Host != null ? owner.Host.Description.Behaviors.Find<ServiceBehaviorAttribute> () : null;
                if (sba != null)
                        address_filter_mode = sba.AddressFilterMode;
        }

        // Opens the listener (unless it is already opened), verifies that
        // every endpoint can create a service instance and selects the
        // accept routine that matches the listener's channel type.
        public void Setup (TimeSpan openTimeout)
        {
                if (owner.Listener.State != CommunicationState.Opened)
                        owner.Listener.Open (openTimeout);

                // It is tested at Open(), but strangely it is not instantiated at this point.
                foreach (var ed in owner.Endpoints) {
                        if (ed.DispatchRuntime.InstanceContextProvider == null && (ed.DispatchRuntime.Type == null || ed.DispatchRuntime.Type.GetConstructor (Type.EmptyTypes) == null))
                                throw new InvalidOperationException ("There is no default constructor for the service Type in the DispatchRuntime");
                }
                SetupChannelAcceptor ();
        }

        // Starts the loop thread.
        public void Start ()
        {
                if (loop_thread == null)
                        loop_thread = new Thread (new ThreadStart (Loop));
                // Raise the flag before the thread exists as a running thread,
                // so that a Stop() racing with thread start-up cannot have its
                // "loop = false" overwritten by the loop thread.
                loop = true;
                loop_thread.Start ();
        }

        // Returns a routine that begins accepting one TChannel from the
        // listener, or null when the listener does not produce TChannel.
        Func<IAsyncResult> CreateAcceptor<TChannel> (IChannelListener l) where TChannel : class, IChannel
        {
                IChannelListener<TChannel> r = l as IChannelListener<TChannel>;
                if (r == null)
                        return null;
                AsyncCallback callback = delegate (IAsyncResult result) {
                        TChannel accepted;
                        try {
                                accepted = r.EndAcceptChannel (result);
                        } catch (Exception ex) {
                                Console.WriteLine ("Exception during finishing channel acceptance.");
                                Console.WriteLine (ex);
                                // ChannelAccepted() never ran, so release the loop here.
                                creator_handle.Set ();
                                return;
                        }
                        try {
                                ChannelAccepted (accepted);
                        } catch (Exception ex) {
                                // ChannelAccepted() has already released the loop in
                                // its finally block; setting the handle again would
                                // let the loop skip its next wait.
                                Console.WriteLine ("Exception during finishing channel acceptance.");
                                Console.WriteLine (ex);
                        }
                };
                return delegate {
                        try {
                                return r.BeginAcceptChannel (callback, null);
                        } catch (Exception ex) {
                                Console.WriteLine ("Exception during accepting channel.");
                                Console.WriteLine (ex);
                                throw;
                        }
                };
        }

        // Picks the acceptor for the first channel shape the listener supports.
        // Throws InvalidOperationException when none of them matches.
        void SetupChannelAcceptor ()
        {
                var l = owner.Listener;
                channel_acceptor =
                        CreateAcceptor<IReplyChannel> (l) ??
                        CreateAcceptor<IReplySessionChannel> (l) ??
                        CreateAcceptor<IInputChannel> (l) ??
                        CreateAcceptor<IInputSessionChannel> (l) ??
                        CreateAcceptor<IDuplexChannel> (l) ??
                        CreateAcceptor<IDuplexSessionChannel> (l);
                if (channel_acceptor == null)
                        throw new InvalidOperationException (String.Format ("Unrecognized channel listener type: {0}", l.GetType ()));
        }

        // Asks the loop to end, waits up to timeout for it, then aborts
        // whatever is still running (listener and loop thread).
        // Does nothing when the loop thread was never created.
        public void Stop (TimeSpan timeout)
        {
                if (loop_thread == null)
                        return;

                // Written before the volatile write to 'loop' so that the loop
                // thread sees them once it observes loop == false.
                close_started = DateTime.UtcNow;
                close_timeout = timeout;
                loop = false;
                creator_handle.Set ();
                throttle_wait_handle.Set (); // break primary loop
                var handle = stop_handle;
                if (handle != null) {
                        handle.WaitOne (timeout > TimeSpan.Zero ? timeout : TimeSpan.FromTicks (1));
                        // Unpublish the handle before closing it; Loop() copes
                        // with losing this race (see its finally block).
                        stop_handle = null;
                        handle.Close ();
                }
                if (owner.Listener.State != CommunicationState.Closed) {
                        // FIXME: log it
                        Console.WriteLine ("Channel listener '{0}' is not closed. Aborting.", owner.Listener.GetType ());
                        owner.Listener.Abort ();
                }
                if (loop_thread != null && loop_thread.IsAlive)
                        loop_thread.Abort ();
                loop_thread = null;
        }

        // Closes every tracked channel (aborting those that fail to close)
        // and drops the ones that are already closed.
        public void CloseInput ()
        {
                IChannel [] snapshot;
                lock (channels)
                        snapshot = channels.ToArray ();
                // The lock is not held while closing: the Closed handlers
                // installed by ChannelAccepted() take it themselves.
                foreach (var ch in snapshot) {
                        if (ch.State == CommunicationState.Closed) {
                                lock (channels)
                                        channels.Remove (ch); // zombie, if exists
                        } else {
                                try {
                                        ch.Close (GetRemainingCloseTimeout ());
                                } catch (Exception ex) {
                                        // FIXME: log it.
                                        Console.WriteLine (ex);
                                        ch.Abort ();
                                }
                        }
                }
        }

        // Time left for closing channels. Without a preceding Stop() there
        // is no deadline, so the owner's default close timeout is used; once
        // the deadline has passed a minimal positive value is returned
        // instead of a negative one.
        TimeSpan GetRemainingCloseTimeout ()
        {
                if (close_started == DateTime.MinValue)
                        return owner.timeouts.CloseTimeout;
                var remaining = close_timeout - (DateTime.UtcNow - close_started);
                return remaining > TimeSpan.Zero ? remaining : TimeSpan.FromTicks (1);
        }

        // Number of tracked channels, read under the list's lock.
        int GetChannelCount ()
        {
                lock (channels)
                        return channels.Count;
        }

        // Thread entry point: runs the loop, logs whatever escapes from it
        // and finally tells Stop() that the thread is done.
        void Loop ()
        {
                try {
                        LoopCore ();
                } catch (Exception ex) {
                        // FIXME: log it
                        Console.WriteLine ("ListenerLoopManager caught an exception inside dispatcher loop, which is likely thrown by the channel listener {0}", owner.Listener);
                        Console.WriteLine (ex);
                } finally {
                        // Stop() may null and close the handle at any moment
                        // after its wait timed out.
                        var handle = stop_handle;
                        if (handle != null) {
                                try {
                                        handle.Set ();
                                } catch (ObjectDisposedException) {
                                        // Stop() already gave up waiting; nobody is listening.
                                }
                        }
                }
        }

        void LoopCore ()
        {
                // 'loop' has been raised by Start().

                // FIXME: use WaitForChannel() for (*only* for) transacted channel listeners.
                // http://social.msdn.microsoft.com/Forums/en-US/wcf/thread/3faa4a5e-8602-4dbe-a181-73b3f581835e

                while (loop) {
                        // FIXME: take MaxConcurrentCalls into consideration too.
                        while (loop && GetChannelCount () < owner.ServiceThrottle.MaxConcurrentSessions) {
                                channel_acceptor ();
                                creator_handle.WaitOne (); // released by ChannelAccepted()
                        }
                        if (!loop)
                                break;
                        throttle_wait_handle.WaitOne (); // released by IChannel.Close()
                }
                try {
                        owner.Listener.Close ();
                } finally {
                        // make sure to close both listener and channels.
                        owner.CloseInput ();
                }
        }

        // Registers and opens a freshly accepted channel, releases the
        // accept loop and starts receiving on the channel.
        void ChannelAccepted (IChannel ch)
        {
                bool registered = false;
                try {
                        if (ch == null) // could happen when it was aborted
                                return;
                        if (!loop) {
                                var dis = ch as IDisposable;
                                if (dis != null)
                                        dis.Dispose ();
                                return;
                        }

                        lock (channels)
                                channels.Add (ch);
                        registered = true;
                        ch.Opened += delegate {
                                ch.Faulted += delegate {
                                        lock (channels)
                                                if (channels.Contains (ch))
                                                        channels.Remove (ch);
                                        throttle_wait_handle.Set (); // release loop wait lock.
                                        };
                                ch.Closed += delegate {
                                        lock (channels)
                                                if (channels.Contains (ch))
                                                        channels.Remove (ch);
                                        throttle_wait_handle.Set (); // release loop wait lock.
                                        };
                                };
                        ch.Open ();
                } catch {
                        // The removal handlers above are attached only once the
                        // channel has opened, so a channel that failed before
                        // that would occupy a throttle slot forever.
                        if (registered) {
                                lock (channels)
                                        channels.Remove (ch);
                                ch.Abort ();
                        }
                        throw;
                } finally {
                        creator_handle.Set ();
                }

                ProcessRequestOrInput (ch);
        }

        // Starts receiving on the channel, synchronously or asynchronously
        // depending on the owner's ReceiveSynchronously setting. Channels
        // that are neither IReplyChannel nor IInputChannel are ignored.
        void ProcessRequestOrInput (IChannel ch)
        {
                var reply = ch as IReplyChannel;
                var input = ch as IInputChannel;

                if (reply != null) {
                        if (owner.ReceiveSynchronously)
                                ReceiveRequestsSynchronously (reply);
                        else
                                reply.BeginTryReceiveRequest (owner.timeouts.ReceiveTimeout, TryReceiveRequestDone, reply);
                } else if (input != null) {
                        if (owner.ReceiveSynchronously)
                                ReceiveInputsSynchronously (input);
                        else
                                input.BeginTryReceive (owner.timeouts.ReceiveTimeout, TryReceiveDone, input);
                }
        }

        // Synchronous receive loop for a reply channel. It iterates instead
        // of re-entering ProcessRequestOrInput() per request, so the stack
        // does not grow with the number of requests, and it closes the
        // channel on a receive timeout exactly like the asynchronous path.
        void ReceiveRequestsSynchronously (IReplyChannel reply)
        {
                do {
                        RequestContext rc;
                        if (!reply.TryReceiveRequest (owner.timeouts.ReceiveTimeout, out rc)) {
                                reply.Close ();
                                return;
                        }
                        ProcessRequest (reply, rc);
                        // unless it is closed by session/call manager, keep receiving.
                } while (loop && reply.State != CommunicationState.Closed);
        }

        // Synchronous receive loop for an input channel; see
        // ReceiveRequestsSynchronously().
        void ReceiveInputsSynchronously (IInputChannel input)
        {
                do {
                        Message msg;
                        if (!input.TryReceive (owner.timeouts.ReceiveTimeout, out msg)) {
                                input.Close ();
                                return;
                        }
                        ProcessInput (input, msg);
                        // unless it is closed by session/call manager, keep receiving.
                } while (loop && input.State != CommunicationState.Closed);
        }

        void TryReceiveRequestDone (IAsyncResult result)
        {
                RequestContext rc;
                var reply = (IReplyChannel) result.AsyncState;
                if (reply.EndTryReceiveRequest (result, out rc)) {
                        ProcessRequest (reply, rc);
                        // unless it is closed by session/call manager, move it back to the loop to receive the next message.
                        if (loop && reply.State != CommunicationState.Closed)
                                ProcessRequestOrInput (reply);
                } else
                        reply.Close ();
        }

        void TryReceiveDone (IAsyncResult result)
        {
                Message msg;
                var input = (IInputChannel) result.AsyncState;
                if (input.EndTryReceive (result, out msg)) {
                        ProcessInput (input, msg);
                        // unless it is closed by session/call manager, move it back to the loop to receive the next message.
                        if (loop && input.State != CommunicationState.Closed)
                                ProcessRequestOrInput (input);
                } else
                        input.Close ();
        }

        // Dispatches one request and always closes its context. A failure
        // during dispatch is answered with a fault message. Re-arming the
        // receive is the caller's job.
        void ProcessRequest (IReplyChannel reply, RequestContext rc)
        {
                if (rc == null) {
                        // A successful receive without a context carries nothing to
                        // dispatch. NOTE(review): it is treated as end of input and
                        // the channel is closed so that the receive loop cannot
                        // spin on it - confirm against the listener implementations.
                        if (reply.State != CommunicationState.Closed)
                                reply.Close ();
                        return;
                }

                var req = rc.RequestMessage;
                try {
                        var ed = FindEndpointDispatcher (req);
                        new InputOrReplyRequestProcessor (ed.DispatchRuntime, reply).ProcessReply (rc);
                } catch (Exception ex) {
                        // FIXME: log it.
                        Console.WriteLine (ex);

                        try {
                                var conv = reply.GetProperty<FaultConverter> () ?? FaultConverter.GetDefaultFaultConverter (req.Version);
                                Message res;
                                if (!conv.TryCreateFaultMessage (ex, out res))
                                        res = Message.CreateMessage (req.Version, new FaultCode ("Receiver"), ex.Message, req.Version.Addressing.FaultNamespace);
                                rc.Reply (res);
                        } catch (Exception replyError) {
                                // The fault could not be delivered (e.g. the client is
                                // gone). FIXME: log it.
                                Console.WriteLine (replyError);
                        }
                } finally {
                        rc.Close ();
                }
        }

        // Dispatches one one-way message; failures are logged only.
        // Re-arming the receive is the caller's job.
        void ProcessInput (IInputChannel input, Message message)
        {
                if (message == null) {
                        // Same reasoning as for a missing request context in
                        // ProcessRequest(): nothing to dispatch, stop receiving.
                        if (input.State != CommunicationState.Closed)
                                input.Close ();
                        return;
                }

                try {
                        EndpointDispatcher candidate = FindEndpointDispatcher (message);
                        new InputOrReplyRequestProcessor (candidate.DispatchRuntime, input).
                                ProcessInput (message);
                } catch (Exception ex) {
                        // FIXME: log it.
                        Console.WriteLine (ex);
                }
        }

        // Returns the endpoint whose address and contract filters both match
        // the message, preferring the highest FilterPriority.
        // Throws MultipleFilterMatchesException when a later match has the
        // same priority as the current candidate, EndpointNotFoundException
        // when no address filter matches (after notifying the host), and
        // ActionNotSupportedException when an address matches but no contract does.
        EndpointDispatcher FindEndpointDispatcher (Message message) {
                EndpointDispatcher candidate = null;
                bool hasEndpointMatch = false;
                foreach (var endpoint in owner.Endpoints) {
                        if (!endpoint.AddressFilter.Match (message))
                                continue;
                        hasEndpointMatch = true;
                        if (!endpoint.ContractFilter.Match (message))
                                continue;
                        if (candidate == null || candidate.FilterPriority < endpoint.FilterPriority)
                                candidate = endpoint;
                        else if (candidate.FilterPriority == endpoint.FilterPriority)
                                throw new MultipleFilterMatchesException ();
                }
                if (candidate != null)
                        return candidate;

                if (!hasEndpointMatch) {
                        if (owner.Host != null)
                                owner.Host.OnUnknownMessageReceived (message);
                        // we have to return a fault to the client anyways...
                        throw new EndpointNotFoundException ();
                }
                // FIXME: It is not a good place to check, but anyways detach this error from EndpointNotFoundException.
                throw new ActionNotSupportedException (String.Format ("Action '{0}' did not match any operations in the target contract", message.Headers.Action));
        }
}
640 }