2010-07-05 Atsushi Enomoto <atsushi@ximian.com>
[mono.git] / mcs / class / System.ServiceModel / System.ServiceModel.Dispatcher / ChannelDispatcher.cs
1 //
2 // ChannelDispatcher.cs
3 //
4 // Author:
5 //      Atsushi Enomoto <atsushi@ximian.com>
6 //
7 // Copyright (C) 2005,2009 Novell, Inc.  http://www.novell.com
8 //
9 // Permission is hereby granted, free of charge, to any person obtaining
10 // a copy of this software and associated documentation files (the
11 // "Software"), to deal in the Software without restriction, including
12 // without limitation the rights to use, copy, modify, merge, publish,
13 // distribute, sublicense, and/or sell copies of the Software, and to
14 // permit persons to whom the Software is furnished to do so, subject to
15 // the following conditions:
16 // 
17 // The above copyright notice and this permission notice shall be
18 // included in all copies or substantial portions of the Software.
19 // 
20 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
21 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
22 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
23 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
24 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
25 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
26 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
27 //
28 using System;
29 using System.Collections.Generic;
30 using System.Collections.ObjectModel;
31 using System.Reflection;
32 using System.ServiceModel.Channels;
33 using System.Threading;
34 using System.Transactions;
35 using System.ServiceModel;
36 using System.ServiceModel.Description;
37
38 namespace System.ServiceModel.Dispatcher
39 {
40         internal interface IChannelDispatcherBoundListener
41         {
42                 ChannelDispatcher ChannelDispatcher { get; set; }
43         }
44
45         public class ChannelDispatcher : ChannelDispatcherBase
46         {
47                 class EndpointDispatcherCollection : SynchronizedCollection<EndpointDispatcher>
48                 {
49                         public EndpointDispatcherCollection (ChannelDispatcher owner)
50                         {
51                                 this.owner = owner;
52                         }
53
54                         ChannelDispatcher owner;
55
56                         protected override void ClearItems ()
57                         {
58                                 foreach (var ed in this)
59                                         ed.ChannelDispatcher = null;
60                                 base.ClearItems ();
61                         }
62
63                         protected override void InsertItem (int index, EndpointDispatcher item)
64                         {
65                                 item.ChannelDispatcher = owner;
66                                 base.InsertItem (index, item);
67                         }
68
69                         protected override void RemoveItem (int index)
70                         {
71                                 if (index < Count)
72                                         this [index].ChannelDispatcher = null;
73                                 base.RemoveItem (index);
74                         }
75
76                         protected override void SetItem (int index, EndpointDispatcher item)
77                         {
78                                 item.ChannelDispatcher = owner;
79                                 base.SetItem (index, item);
80                         }
81                 }
82
83                 ServiceHostBase host;
84
85                 string binding_name;            
86                 Collection<IErrorHandler> error_handlers
87                         = new Collection<IErrorHandler> ();
88                 IChannelListener listener;
89                 internal IDefaultCommunicationTimeouts timeouts; // FIXME: remove internal
90                 MessageVersion message_version;
91                 bool receive_sync, include_exception_detail_in_faults,
92                         manual_addressing, is_tx_receive;
93                 int max_tx_batch_size;
94                 SynchronizedCollection<IChannelInitializer> initializers
95                         = new SynchronizedCollection<IChannelInitializer> ();
96                 IsolationLevel tx_isolation_level;
97                 TimeSpan tx_timeout;
98                 ServiceThrottle throttle;
99
100                 Guid identifier = Guid.NewGuid ();
101                 ManualResetEvent async_event = new ManualResetEvent (false);
102
103                 ListenerLoopManager loop_manager;
104                 SynchronizedCollection<EndpointDispatcher> endpoints;
105
106                 [MonoTODO ("get binding info from config")]
107                 public ChannelDispatcher (IChannelListener listener)
108                         : this (listener, null)
109                 {
110                 }
111
112                 public ChannelDispatcher (
113                         IChannelListener listener, string bindingName)
114                         : this (listener, bindingName, null)
115                 {
116                 }
117
118                 public ChannelDispatcher (
119                         IChannelListener listener, string bindingName,
120                         IDefaultCommunicationTimeouts timeouts)
121                 {
122                         if (listener == null)
123                                 throw new ArgumentNullException ("listener");
124                         Init (listener, bindingName, timeouts);
125                 }
126
127                 private void Init (IChannelListener listener, string bindingName,
128                         IDefaultCommunicationTimeouts timeouts)
129                 {
130                         this.listener = listener;
131                         this.binding_name = bindingName;
132                         // IChannelListener is often a ChannelListenerBase
133                         // which implements IDefaultCommunicationTimeouts.
134                         this.timeouts = timeouts ?? listener as IDefaultCommunicationTimeouts ?? DefaultCommunicationTimeouts.Instance;
135                         endpoints = new EndpointDispatcherCollection (this);
136                 }
137
138                 internal EndpointDispatcher InitializeServiceEndpoint (Type serviceType, ServiceEndpoint se)
139                 {
140                         //Attach one EndpointDispacher to the ChannelDispatcher
141                         EndpointDispatcher ed = new EndpointDispatcher (se.Address, se.Contract.Name, se.Contract.Namespace);
142                         this.Endpoints.Add (ed);
143                         ed.InitializeServiceEndpoint (false, serviceType, se);
144                         return ed;
145                 }
146
147                 public string BindingName {
148                         get { return binding_name; }
149                 }
150
151                 public SynchronizedCollection<IChannelInitializer> ChannelInitializers {
152                         get { return initializers; }
153                 }
154
155                 protected internal override TimeSpan DefaultCloseTimeout {
156                         get { return timeouts.CloseTimeout; }
157                 }
158
159                 protected internal override TimeSpan DefaultOpenTimeout {
160                         get { return timeouts.OpenTimeout; }
161                 }
162
163                 public Collection<IErrorHandler> ErrorHandlers {
164                         get { return error_handlers; }
165                 }
166
167                 public SynchronizedCollection<EndpointDispatcher> Endpoints {
168                         get { return endpoints; }
169                 }
170
171                 [MonoTODO]
172                 public bool IsTransactedAccept {
173                         get { throw new NotImplementedException (); }
174                 }
175
176                 public bool IsTransactedReceive {
177                         get { return is_tx_receive; }
178                         set { is_tx_receive = value; }
179                 }
180
181                 public bool ManualAddressing {
182                         get { return manual_addressing; }
183                         set { manual_addressing = value; }
184                 }
185
186                 public int MaxTransactedBatchSize {
187                         get { return max_tx_batch_size; }
188                         set { max_tx_batch_size = value; }
189                 }
190
191                 public override ServiceHostBase Host {
192                         get { return host; }
193                 }
194
195                 public override IChannelListener Listener {
196                         get { return listener; }
197                 }
198
199                 public MessageVersion MessageVersion {
200                         get { return message_version; }
201                         set { message_version = value; }
202                 }
203
204                 public bool ReceiveSynchronously {
205                         get { return receive_sync; }
206                         set {
207                                 ThrowIfDisposedOrImmutable ();
208                                 receive_sync = value; 
209                         }
210                 }
211
212                 public bool IncludeExceptionDetailInFaults {
213                         get { return include_exception_detail_in_faults; }
214                         set { include_exception_detail_in_faults = value; }
215                 }
216
217                 public ServiceThrottle ServiceThrottle {
218                         get { return throttle; }
219                         set { throttle = value; }
220                 }
221
222                 public IsolationLevel TransactionIsolationLevel {
223                         get { return tx_isolation_level; }
224                         set { tx_isolation_level = value; }
225                 }
226
227                 public TimeSpan TransactionTimeout {
228                         get { return tx_timeout; }
229                         set { tx_timeout = value; }
230                 }
231
232                 protected internal override void Attach (ServiceHostBase host)
233                 {
234                         this.host = host;
235                         var bl = listener as IChannelDispatcherBoundListener;
236                         if (bl != null)
237                                 bl.ChannelDispatcher = this;
238                 }
239
240                 public override void CloseInput ()
241                 {
242                         if (loop_manager != null)
243                                 loop_manager.CloseInput ();
244                 }
245
246                 protected internal override void Detach (ServiceHostBase host)
247                 {                       
248                         this.host = null;                       
249                 }
250
251                 protected override void OnAbort ()
252                 {
253                         if (loop_manager != null)
254                                 loop_manager.Stop (TimeSpan.FromTicks (1));
255                 }
256
257                 Action<TimeSpan> open_delegate;
258                 Action<TimeSpan> close_delegate;
259
260                 protected override IAsyncResult OnBeginClose (TimeSpan timeout,
261                         AsyncCallback callback, object state)
262                 {
263                         if (close_delegate == null)
264                                 close_delegate = new Action<TimeSpan> (OnClose);
265                         return close_delegate.BeginInvoke (timeout, callback, state);
266                 }
267
268                 protected override IAsyncResult OnBeginOpen (TimeSpan timeout,
269                         AsyncCallback callback, object state)
270                 {
271                         if (open_delegate == null)
272                                 open_delegate = new Action<TimeSpan> (OnOpen);
273                         return open_delegate.BeginInvoke (timeout, callback, state);
274                 }
275
276                 protected override void OnClose (TimeSpan timeout)
277                 {
278                         if (loop_manager != null)
279                                 loop_manager.Stop (timeout);
280                 }
281
282                 protected override void OnClosed ()
283                 {
284                         if (host != null)
285                                 host.ChannelDispatchers.Remove (this);
286                         base.OnClosed ();
287                 }
288
289                 protected override void OnEndClose (IAsyncResult result)
290                 {
291                         close_delegate.EndInvoke (result);
292                 }
293
294                 protected override void OnEndOpen (IAsyncResult result)
295                 {
296                         open_delegate.EndInvoke (result);
297                 }
298
299                 protected override void OnOpen (TimeSpan timeout)
300                 {
301                         if (Host == null || MessageVersion == null)
302                                 throw new InvalidOperationException ("Service host is not attached to this ChannelDispatcher.");
303
304                         loop_manager.Setup (timeout);
305                 }
306
307                 protected override void OnOpening ()
308                 {
309                         base.OnOpening ();
310                         loop_manager = new ListenerLoopManager (this);
311                 }
312
313                 protected override void OnOpened ()
314                 {
315                         base.OnOpened ();
316                         StartLoop ();
317                 }
318
319                 void StartLoop ()
320                 {
321                         // FIXME: not sure if it should be filled here.
322                         if (ServiceThrottle == null)
323                                 ServiceThrottle = new ServiceThrottle (this);
324
325                         loop_manager.Start ();
326                 }
327         }
328
329                 // isolated from ChannelDispatcher
330                 class ListenerLoopManager
331                 {
332                         ChannelDispatcher owner;
333                         AutoResetEvent throttle_wait_handle = new AutoResetEvent (false);
334                         AutoResetEvent creator_handle = new AutoResetEvent (false);
335                         ManualResetEvent stop_handle = new ManualResetEvent (false);
336                         bool loop;
337                         Thread loop_thread;
338                         DateTime close_started;
339                         TimeSpan close_timeout;
340                         Func<IAsyncResult> channel_acceptor;
341                         List<IChannel> channels = new List<IChannel> ();
342                         AddressFilterMode address_filter_mode;
343
344                         public ListenerLoopManager (ChannelDispatcher owner)
345                         {
346                                 this.owner = owner;
347                                 var sba = owner.Host != null ? owner.Host.Description.Behaviors.Find<ServiceBehaviorAttribute> () : null;
348                                 if (sba != null)
349                                         address_filter_mode = sba.AddressFilterMode;
350                         }
351
352                         public void Setup (TimeSpan openTimeout)
353                         {
354                                 if (owner.Listener.State != CommunicationState.Created)
355                                         throw new InvalidOperationException ("Tried to open the channel listener which is bound to ChannelDispatcher, but it is not at Created state");
356                                 owner.Listener.Open (openTimeout);
357
358                                 // It is tested at Open(), but strangely it is not instantiated at this point.
359                                 foreach (var ed in owner.Endpoints)
360                                         if (ed.DispatchRuntime.InstanceContextProvider == null && (ed.DispatchRuntime.Type == null || ed.DispatchRuntime.Type.GetConstructor (Type.EmptyTypes) == null))
361                                                 throw new InvalidOperationException ("There is no default constructor for the service Type in the DispatchRuntime");
362                                 SetupChannelAcceptor ();
363                         }
364
365                         public void Start ()
366                         {
367                                 if (loop_thread == null)
368                                         loop_thread = new Thread (new ThreadStart (Loop));
369                                 loop_thread.Start ();
370                         }
371
372                         Func<IAsyncResult> CreateAcceptor<TChannel> (IChannelListener l) where TChannel : class, IChannel
373                         {
374                                 IChannelListener<TChannel> r = l as IChannelListener<TChannel>;
375                                 if (r == null)
376                                         return null;
377                                 AsyncCallback callback = delegate (IAsyncResult result) {
378                                         try {
379                                                 ChannelAccepted (r.EndAcceptChannel (result));
380                                         } catch (Exception ex) {
381                                                 Console.WriteLine ("Exception during finishing channel acceptance.");
382                                                 Console.WriteLine (ex);
383                                                 creator_handle.Set ();
384                                         }
385                                 };
386                                 return delegate {
387                                         try {
388                                                 return r.BeginAcceptChannel (callback, null);
389                                         } catch (Exception ex) {
390                                                 Console.WriteLine ("Exception during accepting channel.");
391                                                 Console.WriteLine (ex);
392                                                 throw;
393                                         }
394                                 };
395                         }
396
397                         void SetupChannelAcceptor ()
398                         {
399                                 var l = owner.Listener;
400                                 channel_acceptor =
401                                         CreateAcceptor<IReplyChannel> (l) ??
402                                         CreateAcceptor<IReplySessionChannel> (l) ??
403                                         CreateAcceptor<IInputChannel> (l) ??
404                                         CreateAcceptor<IInputSessionChannel> (l) ??
405                                         CreateAcceptor<IDuplexChannel> (l) ??
406                                         CreateAcceptor<IDuplexSessionChannel> (l);
407                                 if (channel_acceptor == null)
408                                         throw new InvalidOperationException (String.Format ("Unrecognized channel listener type: {0}", l.GetType ()));
409                         }
410
411                         public void Stop (TimeSpan timeout)
412                         {
413                                 if (loop_thread == null)
414                                         return;
415
416                                 close_started = DateTime.Now;
417                                 close_timeout = timeout;
418                                 loop = false;
419                                 creator_handle.Set ();
420                                 throttle_wait_handle.Set (); // break primary loop
421                                 if (stop_handle != null) {
422                                         stop_handle.WaitOne (timeout > TimeSpan.Zero ? timeout : TimeSpan.FromTicks (1));
423                                         stop_handle.Close ();
424                                         stop_handle = null;
425                                 }
426                                 if (owner.Listener.State != CommunicationState.Closed) {
427                                         // FIXME: log it
428                                         Console.WriteLine ("Channel listener '{0}' is not closed. Aborting.", owner.Listener.GetType ());
429                                         owner.Listener.Abort ();
430                                 }
431                                 if (loop_thread != null && loop_thread.IsAlive)
432                                         loop_thread.Abort ();
433                                 loop_thread = null;
434                         }
435
436                         public void CloseInput ()
437                         {
438                                 foreach (var ch in channels.ToArray ()) {
439                                         if (ch.State == CommunicationState.Closed)
440                                                 channels.Remove (ch); // zonbie, if exists
441                                         else {
442                                                 try {
443                                                         ch.Close (close_timeout - (DateTime.Now - close_started));
444                                                 } catch (Exception ex) {
445                                                         // FIXME: log it.
446                                                         Console.WriteLine (ex);
447                                                         ch.Abort ();
448                                                 }
449                                         }
450                                 }
451                         }
452
453                         void Loop ()
454                         {
455                                 try {
456                                         LoopCore ();
457                                 } catch (Exception ex) {
458                                         // FIXME: log it
459                                         Console.WriteLine ("ListenerLoopManager caught an exception inside dispatcher loop, which is likely thrown by the channel listener {0}", owner.Listener);
460                                         Console.WriteLine (ex);
461                                 } finally {
462                                         if (stop_handle != null)
463                                                 stop_handle.Set ();
464                                 }
465                         }
466
467                         void LoopCore ()
468                         {
469                                 loop = true;
470
471                                 // FIXME: use WaitForChannel() for (*only* for) transacted channel listeners.
472                                 // http://social.msdn.microsoft.com/Forums/en-US/wcf/thread/3faa4a5e-8602-4dbe-a181-73b3f581835e
473
474                                 while (loop) {
475                                         // FIXME: take MaxConcurrentCalls into consideration appropriately.
476                                         while (loop && channels.Count < Math.Min (owner.ServiceThrottle.MaxConcurrentSessions, owner.ServiceThrottle.MaxConcurrentCalls)) {
477                                                 // FIXME: this should not be required, but saves multi-ChannelDispatcher case (Throttling enabled) for HTTP standalone listener...
478                                                 Thread.Sleep (100);
479                                                 channel_acceptor ();
480                                                 creator_handle.WaitOne (); // released by ChannelAccepted()
481                                         }
482                                         if (!loop)
483                                                 break;
484                                         throttle_wait_handle.WaitOne (); // released by IChannel.Close()
485                                 }
486                                 try {
487                                         owner.Listener.Close ();
488                                 } finally {
489                                         // make sure to close both listener and channels.
490                                         owner.CloseInput ();
491                                 }
492                         }
493
494                         void ChannelAccepted (IChannel ch)
495                         {
496                         try {
497                                 if (ch == null) // could happen when it was aborted
498                                         return;
499                                 if (!loop) {
500                                         var dis = ch as IDisposable;
501                                         if (dis != null)
502                                                 dis.Dispose ();
503                                         return;
504                                 }
505
506                                 lock (channels)
507                                         channels.Add (ch);
508                                 ch.Opened += delegate {
509                                         ch.Faulted += delegate {
510                                                 lock (channels)
511                                                         if (channels.Contains (ch))
512                                                                 channels.Remove (ch);
513                                                 throttle_wait_handle.Set (); // release loop wait lock.
514                                                 };
515                                         ch.Closed += delegate {
516                                                 lock (channels)
517                                                         if (channels.Contains (ch))
518                                                                 channels.Remove (ch);
519                                                 throttle_wait_handle.Set (); // release loop wait lock.
520                                                 };
521                                         };
522                                 ch.Open ();
523                         } finally {
524                                 creator_handle.Set ();
525                         }
526
527                                 ProcessRequestOrInput (ch);
528                         }
529
530                         void ProcessRequestOrInput (IChannel ch)
531                         {
532                                 var reply = ch as IReplyChannel;
533                                 var input = ch as IInputChannel;
534
535                                 if (reply != null) {
536                                         if (owner.ReceiveSynchronously) {
537                                                 RequestContext rc;
538                                                 if (reply.TryReceiveRequest (owner.timeouts.ReceiveTimeout, out rc))
539                                                         ProcessRequest (reply, rc);
540                                         } else {
541                                                 reply.BeginTryReceiveRequest (owner.timeouts.ReceiveTimeout, TryReceiveRequestDone, reply);
542                                         }
543                                 } else if (input != null) {
544                                         if (owner.ReceiveSynchronously) {
545                                                 Message msg;
546                                                 if (input.TryReceive (owner.timeouts.ReceiveTimeout, out msg))
547                                                         ProcessInput (input, msg);
548                                         } else {
549                                                 input.BeginTryReceive (owner.timeouts.ReceiveTimeout, TryReceiveDone, input);
550                                         }
551                                 }
552                         }
553
554                         void TryReceiveRequestDone (IAsyncResult result)
555                         {
556                                 RequestContext rc;
557                                 var reply = (IReplyChannel) result.AsyncState;
558                                 if (reply.EndTryReceiveRequest (result, out rc))
559                                         ProcessRequest (reply, rc);
560                                 else
561                                         reply.Close ();
562                         }
563
564                         void TryReceiveDone (IAsyncResult result)
565                         {
566                                 Message msg;
567                                 var input = (IInputChannel) result.AsyncState;
568                                 if (input.EndTryReceive (result, out msg))
569                                         ProcessInput (input, msg);
570                                 else
571                                         input.Close ();
572                         }
573
574                         void ProcessRequest (IReplyChannel reply, RequestContext rc)
575                         {
576                                 var req = rc.RequestMessage;
577                                 try {
578                                         var ed = FindEndpointDispatcher (req);
579                                         new InputOrReplyRequestProcessor (ed.DispatchRuntime, reply).ProcessReply (rc);
580                                 } catch (Exception ex) {
581                                         // FIXME: log it.
582                                         Console.WriteLine (ex);
583
584                                         var conv = reply.GetProperty<FaultConverter> () ?? FaultConverter.GetDefaultFaultConverter (rc.RequestMessage.Version);
585                                         Message res;
586                                         if (!conv.TryCreateFaultMessage (ex, out res))
587                                                 res = Message.CreateMessage (req.Version, new FaultCode ("Receiver"), ex.Message, req.Version.Addressing.FaultNamespace);
588                                         rc.Reply (res);
589                                 } finally {
590                                         if (rc != null)
591                                                 rc.Close ();
592                                         // unless it is closed by session/call manager, move it back to the loop to receive the next message.
593                                         if (loop && reply.State != CommunicationState.Closed)
594                                                 ProcessRequestOrInput (reply);
595                                 }
596                         }
597
598                         void ProcessInput (IInputChannel input, Message message)
599                         {
600                                 try {
601                                         EndpointDispatcher candidate = null;
602                                         candidate = FindEndpointDispatcher (message);
603                                         new InputOrReplyRequestProcessor (candidate.DispatchRuntime, input).
604                                                 ProcessInput (message);
605                                 }
606                                 catch (Exception ex) {
607                                         // FIXME: log it.
608                                         Console.WriteLine (ex);
609                                 } finally {
610                                         // unless it is closed by session/call manager, move it back to the loop to receive the next message.
611                                         if (loop && input.State != CommunicationState.Closed)
612                                                 ProcessRequestOrInput (input);
613                                 }
614                         }
615
616                         EndpointDispatcher FindEndpointDispatcher (Message message) {
617                                 EndpointDispatcher candidate = null;
618                                 bool hasEndpointMatch = false;
619                                 foreach (var endpoint in owner.Endpoints) {
620                                         if (endpoint.AddressFilter.Match (message)) {
621                                                 hasEndpointMatch = true;
622                                                 if (!endpoint.ContractFilter.Match (message))
623                                                         continue;
624                                                 var newdis = endpoint;
625                                                 if (candidate == null || candidate.FilterPriority < newdis.FilterPriority)
626                                                         candidate = newdis;
627                                                 else if (candidate.FilterPriority == newdis.FilterPriority)
628                                                         throw new MultipleFilterMatchesException ();
629                                         }
630                                 }
631                                 if (candidate == null && !hasEndpointMatch) {
632                                         if (owner.Host != null)
633                                                 owner.Host.OnUnknownMessageReceived (message);
634                                         // we have to return a fault to the client anyways...
635                                         throw new EndpointNotFoundException ();
636                                 }
637                                 else if (candidate == null)
638                                         // FIXME: It is not a good place to check, but anyways detach this error from EndpointNotFoundException.
639                                         throw new ActionNotSupportedException (String.Format ("Action '{0}' did not match any operations in the target contract", message.Headers.Action));
640
641                                 return candidate;
642                         }
643                 }
644 }