2009-08-21 Atsushi Enomoto <atsushi@ximian.com>
[mono.git] / mcs / class / System.ServiceModel / System.ServiceModel.Dispatcher / ChannelDispatcher.cs
1 //
2 // ChannelDispatcher.cs
3 //
4 // Author:
5 //      Atsushi Enomoto <atsushi@ximian.com>
6 //
7 // Copyright (C) 2005,2009 Novell, Inc.  http://www.novell.com
8 //
9 // Permission is hereby granted, free of charge, to any person obtaining
10 // a copy of this software and associated documentation files (the
11 // "Software"), to deal in the Software without restriction, including
12 // without limitation the rights to use, copy, modify, merge, publish,
13 // distribute, sublicense, and/or sell copies of the Software, and to
14 // permit persons to whom the Software is furnished to do so, subject to
15 // the following conditions:
16 // 
17 // The above copyright notice and this permission notice shall be
18 // included in all copies or substantial portions of the Software.
19 // 
20 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
21 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
22 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
23 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
24 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
25 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
26 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
27 //
28 using System;
29 using System.Collections.Generic;
30 using System.Collections.ObjectModel;
31 using System.Reflection;
32 using System.ServiceModel.Channels;
33 using System.Threading;
34 using System.Transactions;
35 using System.ServiceModel;
36 using System.ServiceModel.Description;
37
38 namespace System.ServiceModel.Dispatcher
39 {
40         public class ChannelDispatcher : ChannelDispatcherBase
41         {
42                 ServiceHostBase host;
43
44                 string binding_name;            
45                 Collection<IErrorHandler> error_handlers
46                         = new Collection<IErrorHandler> ();
47                 IChannelListener listener;
48                 internal IDefaultCommunicationTimeouts timeouts; // FIXME: remove internal
49                 MessageVersion message_version;
50                 bool receive_sync, include_exception_detail_in_faults,
51                         manual_addressing, is_tx_receive;
52                 int max_tx_batch_size;
53                 SynchronizedCollection<IChannelInitializer> initializers
54                         = new SynchronizedCollection<IChannelInitializer> ();
55                 IsolationLevel tx_isolation_level;
56                 TimeSpan tx_timeout;
57                 ServiceThrottle throttle;
58
59                 Guid identifier = Guid.NewGuid ();
60                 ManualResetEvent async_event = new ManualResetEvent (false);
61
62                 ListenerLoopManager loop_manager;
63                 SynchronizedCollection<EndpointDispatcher> endpoints;
64
65                 [MonoTODO ("get binding info from config")]
66                 public ChannelDispatcher (IChannelListener listener)
67                         : this (listener, null)
68                 {
69                 }
70
71                 public ChannelDispatcher (
72                         IChannelListener listener, string bindingName)
73                         : this (listener, bindingName, null)
74                 {
75                 }
76
77                 public ChannelDispatcher (
78                         IChannelListener listener, string bindingName,
79                         IDefaultCommunicationTimeouts timeouts)
80                 {
81                         if (listener == null)
82                                 throw new ArgumentNullException ("listener");
83                         Init (listener, bindingName, timeouts);
84                 }
85
86                 private void Init (IChannelListener listener, string bindingName,
87                         IDefaultCommunicationTimeouts timeouts)
88                 {
89                         this.listener = listener;
90                         this.binding_name = bindingName;
91                         // IChannelListener is often a ChannelListenerBase
92                         // which implements IDefaultCommunicationTimeouts.
93                         this.timeouts = timeouts ?? listener as IDefaultCommunicationTimeouts ?? DefaultCommunicationTimeouts.Instance;
94                         endpoints = new SynchronizedCollection<EndpointDispatcher> ();
95                 }
96
97                 internal void InitializeServiceEndpoint (Type serviceType, ServiceEndpoint se)
98                 {
99                         this.MessageVersion = se.Binding.MessageVersion;
100                         if (this.MessageVersion == null)
101                                 this.MessageVersion = MessageVersion.Default;
102
103                         //Attach one EndpointDispacher to the ChannelDispatcher
104                         EndpointDispatcher ed = new EndpointDispatcher (se.Address, se.Contract.Name, se.Contract.Namespace);
105                         ed.InitializeServiceEndpoint (this, serviceType, se);
106                         this.Endpoints.Add (ed);
107                 }
108
109                 public string BindingName {
110                         get { return binding_name; }
111                 }
112
113                 public SynchronizedCollection<IChannelInitializer> ChannelInitializers {
114                         get { return initializers; }
115                 }
116
117                 protected internal override TimeSpan DefaultCloseTimeout {
118                         get { return timeouts.CloseTimeout; }
119                 }
120
121                 protected internal override TimeSpan DefaultOpenTimeout {
122                         get { return timeouts.OpenTimeout; }
123                 }
124
125                 public Collection<IErrorHandler> ErrorHandlers {
126                         get { return error_handlers; }
127                 }
128
129                 public SynchronizedCollection<EndpointDispatcher> Endpoints {
130                         get { return endpoints; }
131                 }
132
133                 [MonoTODO]
134                 public bool IsTransactedAccept {
135                         get { throw new NotImplementedException (); }
136                 }
137
138                 public bool IsTransactedReceive {
139                         get { return is_tx_receive; }
140                         set { is_tx_receive = value; }
141                 }
142
143                 public bool ManualAddressing {
144                         get { return manual_addressing; }
145                         set { manual_addressing = value; }
146                 }
147
148                 public int MaxTransactedBatchSize {
149                         get { return max_tx_batch_size; }
150                         set { max_tx_batch_size = value; }
151                 }
152
153                 public override ServiceHostBase Host {
154                         get { return host; }
155                 }
156
157                 public override IChannelListener Listener {
158                         get { return listener; }
159                 }
160
161                 public MessageVersion MessageVersion {
162                         get { return message_version; }
163                         set { message_version = value; }
164                 }
165
166                 public bool ReceiveSynchronously {
167                         get { return receive_sync; }
168                         set {
169                                 ThrowIfDisposedOrImmutable ();
170                                 receive_sync = value; 
171                         }
172                 }
173
174                 public bool IncludeExceptionDetailInFaults {
175                         get { return include_exception_detail_in_faults; }
176                         set { include_exception_detail_in_faults = value; }
177                 }
178
179                 public ServiceThrottle ServiceThrottle {
180                         get { return throttle; }
181                         set { throttle = value; }
182                 }
183
184                 public IsolationLevel TransactionIsolationLevel {
185                         get { return tx_isolation_level; }
186                         set { tx_isolation_level = value; }
187                 }
188
189                 public TimeSpan TransactionTimeout {
190                         get { return tx_timeout; }
191                         set { tx_timeout = value; }
192                 }
193
194                 protected internal override void Attach (ServiceHostBase host)
195                 {
196                         this.host = host;
197                 }
198
199                 public override void CloseInput ()
200                 {
201                         if (loop_manager != null)
202                                 loop_manager.CloseInput ();
203                 }
204
205                 protected internal override void Detach (ServiceHostBase host)
206                 {                       
207                         this.host = null;                       
208                 }
209
210                 protected override void OnAbort ()
211                 {
212                         if (loop_manager != null)
213                                 loop_manager.Stop (TimeSpan.FromTicks (1));
214                 }
215
216                 Action<TimeSpan> open_delegate;
217                 Action<TimeSpan> close_delegate;
218
219                 protected override IAsyncResult OnBeginClose (TimeSpan timeout,
220                         AsyncCallback callback, object state)
221                 {
222                         if (close_delegate == null)
223                                 close_delegate = new Action<TimeSpan> (OnClose);
224                         return close_delegate.BeginInvoke (timeout, callback, state);
225                 }
226
227                 protected override IAsyncResult OnBeginOpen (TimeSpan timeout,
228                         AsyncCallback callback, object state)
229                 {
230                         if (open_delegate == null)
231                                 open_delegate = new Action<TimeSpan> (OnClose);
232                         return open_delegate.BeginInvoke (timeout, callback, state);
233                 }
234
235                 protected override void OnClose (TimeSpan timeout)
236                 {
237                         if (loop_manager != null)
238                                 loop_manager.Stop (timeout);
239                 }
240
241                 protected override void OnClosed ()
242                 {
243                         if (host != null)
244                                 host.ChannelDispatchers.Remove (this);
245                         base.OnClosed ();
246                 }
247
248                 protected override void OnEndClose (IAsyncResult result)
249                 {
250                         close_delegate.EndInvoke (result);
251                 }
252
253                 protected override void OnEndOpen (IAsyncResult result)
254                 {
255                         open_delegate.EndInvoke (result);
256                 }
257
258                 protected override void OnOpen (TimeSpan timeout)
259                 {
260                         if (Host == null || MessageVersion == null)
261                                 throw new InvalidOperationException ("Service host is not attached to this ChannelDispatcher.");
262
263                         loop_manager = new ListenerLoopManager (this, timeout);
264                 }
265
266                 [MonoTODO ("what to do here?")]
267                 protected override void OnOpening ()
268                 {
269                 }
270
271                 protected override void OnOpened ()
272                 {
273                         loop_manager.Setup ();
274                 }
275
276                 internal void StartLoop ()
277                 {
278                         // FIXME: not sure if it should be filled here.
279                         if (ServiceThrottle == null)
280                                 ServiceThrottle = new ServiceThrottle ();
281
282                         loop_manager.Start ();
283                 }
284
285                 bool IsMessageMatchesEndpointDispatcher (Message req, EndpointDispatcher endpoint)
286                 {
287                         Uri to = req.Headers.To;
288                         if (to == null)
289                                 return false;
290                         if (to.AbsoluteUri == Constants.WsaAnonymousUri)
291                                 return false;
292                         return endpoint.AddressFilter.Match (req) && endpoint.ContractFilter.Match (req);
293                 }
294                  
295                 void HandleError (Exception ex)
296                 {
297                         foreach (IErrorHandler handler in ErrorHandlers)
298                                 if (handler.HandleError (ex))
299                                         break;
300                 }
301
302                 class ListenerLoopManager
303                 {
304                         ChannelDispatcher owner;
305                         AutoResetEvent handle = new AutoResetEvent (false);
306                         AutoResetEvent creator_handle = new AutoResetEvent (false);
307                         ManualResetEvent stop_handle = new ManualResetEvent (false);
308                         bool loop;
309                         Thread loop_thread;
310                         TimeSpan open_timeout;
311                         Func<IAsyncResult> channel_acceptor;
312                         List<IChannel> channels = new List<IChannel> ();
313
314                         public ListenerLoopManager (ChannelDispatcher owner, TimeSpan openTimeout)
315                         {
316                                 this.owner = owner;
317                                 open_timeout = openTimeout;
318                         }
319
320                         public void Setup ()
321                         {
322                                 if (owner.Listener.State != CommunicationState.Opened)
323                                         owner.Listener.Open (open_timeout);
324
325                                 // It is tested at Open(), but strangely it is not instantiated at this point.
326                                 foreach (var ed in owner.Endpoints)
327                                         if (ed.DispatchRuntime.InstanceContextProvider == null && (ed.DispatchRuntime.Type == null || ed.DispatchRuntime.Type.GetConstructor (Type.EmptyTypes) == null))
328                                                 throw new InvalidOperationException ("There is no default constructor for the service Type in the DispatchRuntime");
329                                 SetupChannelAcceptor ();
330                         }
331
332                         public void Start ()
333                         {
334                                 foreach (var ed in owner.Endpoints)
335                                         if (ed.DispatchRuntime.InstanceContextProvider == null)
336                                                 ed.DispatchRuntime.InstanceContextProvider = new DefaultInstanceContextProvider ();
337
338                                 if (loop_thread == null)
339                                         loop_thread = new Thread (new ThreadStart (Loop));
340                                 loop_thread.Start ();
341                         }
342
343                         Func<IAsyncResult> CreateAcceptor<TChannel> (IChannelListener l) where TChannel : class, IChannel
344                         {
345                                 IChannelListener<TChannel> r = l as IChannelListener<TChannel>;
346                                 if (r == null)
347                                         return null;
348                                 AsyncCallback callback = delegate (IAsyncResult result) {
349                                         try {
350                                                 ChannelAccepted (r.EndAcceptChannel (result));
351                                         } catch (Exception ex) {
352                                                 Console.WriteLine ("Exception during finishing channel acceptance.");
353                                                 Console.WriteLine (ex);
354                                         }
355                                 };
356                                 return delegate {
357                                         try {
358                                                 return r.BeginAcceptChannel (callback, null);
359                                         } catch (Exception ex) {
360                                                 Console.WriteLine ("Exception during accepting channel.");
361                                                 Console.WriteLine (ex);
362                                                 throw;
363                                         }
364                                 };
365                         }
366
367                         void SetupChannelAcceptor ()
368                         {
369                                 var l = owner.Listener;
370                                 channel_acceptor =
371                                         CreateAcceptor<IReplyChannel> (l) ??
372                                         CreateAcceptor<IReplySessionChannel> (l) ??
373                                         CreateAcceptor<IInputChannel> (l) ??
374                                         CreateAcceptor<IInputSessionChannel> (l) ??
375                                         CreateAcceptor<IDuplexChannel> (l) ??
376                                         CreateAcceptor<IDuplexSessionChannel> (l);
377                                 if (channel_acceptor == null)
378                                         throw new InvalidOperationException (String.Format ("Unrecognized channel listener type: {0}", l.GetType ()));
379                         }
380
381                         public void Stop (TimeSpan timeout)
382                         {
383                                 loop = false;
384                                 creator_handle.Set ();
385                                 handle.Set ();
386                                 if (stop_handle != null) {
387                                         stop_handle.WaitOne (timeout > TimeSpan.Zero ? timeout : TimeSpan.FromTicks (1));
388                                         stop_handle.Close ();
389                                         stop_handle = null;
390                                 }
391                                 if (owner.Listener.State != CommunicationState.Closed)
392                                         owner.Listener.Abort ();
393                                 if (loop_thread != null && loop_thread.IsAlive)
394                                         loop_thread.Abort ();
395                                 loop_thread = null;
396                         }
397
398                         public void CloseInput ()
399                         {
400                                 foreach (var ch in channels.ToArray ()) {
401                                         if (ch.State == CommunicationState.Closed)
402                                                 channels.Remove (ch); // zonbie, if exists
403                                         else
404                                                 ch.Close ();
405                                 }
406                         }
407
408                         void Loop ()
409                         {
410                                 try {
411                                         LoopCore ();
412                                 } catch (Exception ex) {
413                                         // FIXME: log it
414                                         Console.WriteLine ("ChannelDispatcher caught an exception inside dispatcher loop, which is likely thrown by the channel listener {0}", owner.Listener);
415                                         Console.WriteLine (ex);
416                                 } finally {
417                                         if (stop_handle != null)
418                                                 stop_handle.Set ();
419                                 }
420                         }
421
422                         void LoopCore ()
423                         {
424                                 loop = true;
425
426                                 // FIXME: use WaitForChannel() for (*only* for) transacted channel listeners.
427                                 // http://social.msdn.microsoft.com/Forums/en-US/wcf/thread/3faa4a5e-8602-4dbe-a181-73b3f581835e
428                                 
429                                 //FIXME: The logic here should be somewhat different as follows:
430                                 //1. Get the message
431                                 //2. Get the appropriate EndPointDispatcher that can handle the message
432                                 //   which is done using the filters (AddressFilter, ContractFilter).
433                                 //3. Let the appropriate endpoint handle the request.
434
435                                 while (loop) {
436                                         while (loop && channels.Count < owner.ServiceThrottle.MaxConcurrentSessions) {
437                                                 channel_acceptor ();
438                                                 creator_handle.WaitOne (); // released by ChannelAccepted()
439                                                 creator_handle.Reset ();
440                                         }
441                                         if (!loop)
442                                                 break;
443                                         handle.WaitOne (); // released by IChannel.Close()
444                                         handle.Reset ();
445                                 }
446                                 owner.Listener.Close ();
447                                 owner.CloseInput ();
448                         }
449
450                         void ChannelAccepted (IChannel ch)
451                         {
452                         try {
453                                 if (ch == null) // could happen when it was aborted
454                                         return;
455                                 if (!loop) {
456                                         var dis = ch as IDisposable;
457                                         if (dis != null)
458                                                 dis.Dispose ();
459                                         return;
460                                 }
461
462                                 channels.Add (ch);
463                                 ch.Opened += delegate {
464                                         ch.Faulted += delegate {
465                                                 if (channels.Contains (ch))
466                                                         channels.Remove (ch);
467                                                 handle.Set (); // release loop wait lock.
468                                                 };
469                                         ch.Closed += delegate {
470                                                 if (channels.Contains (ch))
471                                                         channels.Remove (ch);
472                                                 handle.Set (); // release loop wait lock.
473                                                 };
474                                         };
475                                 ch.Open ();
476                         } finally {
477                                 creator_handle.Set ();
478                         }
479
480                                 ProcessRequestOrInput (ch);
481                         }
482
483                         void ProcessRequestOrInput (IChannel ch)
484                         {
485                                 var reply = ch as IReplyChannel;
486                                 var input = ch as IInputChannel;
487
488                                 if (reply != null) {
489                                         if (owner.ReceiveSynchronously) {
490                                                 RequestContext rc;
491                                                 if (reply.TryReceiveRequest (owner.timeouts.ReceiveTimeout, out rc))
492                                                         ProcessRequest (reply, rc);
493                                         } else {
494                                                 reply.BeginTryReceiveRequest (owner.timeouts.ReceiveTimeout, TryReceiveRequestDone, reply);
495                                         }
496                                 } else if (input != null) {
497                                         if (owner.ReceiveSynchronously) {
498                                                 Message msg;
499                                                 if (input.TryReceive (owner.timeouts.ReceiveTimeout, out msg))
500                                                         ProcessInput (input, msg);
501                                         } else {
502                                                 input.BeginTryReceive (owner.timeouts.ReceiveTimeout, TryReceiveDone, input);
503                                         }
504                                 }
505                         }
506
507                         void TryReceiveRequestDone (IAsyncResult result)
508                         {
509                                 RequestContext rc;
510                                 var reply = (IReplyChannel) result.AsyncState;
511                                 if (reply.EndTryReceiveRequest (result, out rc))
512                                         ProcessRequest (reply, rc);
513                         }
514
515                         void TryReceiveDone (IAsyncResult result)
516                         {
517                                 Message msg;
518                                 var input = (IInputChannel) result.AsyncState;
519                                 if (input.EndTryReceive (result, out msg))
520                                         ProcessInput (input, msg);
521                         }
522
523                         void SendEndpointNotFound (RequestContext rc, EndpointNotFoundException ex) 
524                         {
525                                 try {
526
527                                         MessageVersion version = rc.RequestMessage.Version;
528                                         FaultCode fc = new FaultCode ("DestinationUnreachable", version.Addressing.Namespace);
529                                         Message res = Message.CreateMessage (version, fc, "error occured", rc.RequestMessage.Headers.Action);
530                                         rc.Reply (res);
531                                 }
532                                 catch (Exception e) { }
533                         }
534
535                         void ProcessRequest (IReplyChannel reply, RequestContext rc)
536                         {
537                                 try {
538                                         EndpointDispatcher candidate = FindEndpointDispatcher (rc.RequestMessage);
539                                         new InputOrReplyRequestProcessor (candidate.DispatchRuntime, reply).
540                                                 ProcessReply (rc);
541                                 } catch (EndpointNotFoundException ex) {
542                                         SendEndpointNotFound (rc, ex);
543                                 } catch (Exception ex) {
544                                         // FIXME: log it.
545                                         Console.WriteLine (ex);
546                                 } finally {
547                                         // unless it is closed by session/call manager, move it back to the loop to receive the next message.
548                                         if (reply.State != CommunicationState.Closed)
549                                                 ProcessRequestOrInput (reply);
550                                 }
551                         }
552
553                         void ProcessInput (IInputChannel input, Message message)
554                         {
555                                 try {
556                                         EndpointDispatcher candidate = null;
557                                         candidate = FindEndpointDispatcher (message);
558                                         new InputOrReplyRequestProcessor (candidate.DispatchRuntime, input).
559                                                 ProcessInput (message);
560                                 }
561                                 catch (Exception ex) {
562                                         // FIXME: log it.
563                                         Console.WriteLine (ex);
564                                 } finally {
565                                         // unless it is closed by session/call manager, move it back to the loop to receive the next message.
566                                         if (input.State != CommunicationState.Closed)
567                                                 ProcessRequestOrInput (input);
568                                 }
569                         }
570
571                         EndpointDispatcher FindEndpointDispatcher (Message message) {
572                                 EndpointDispatcher candidate = null;
573                                 for (int i = 0; i < owner.Endpoints.Count; i++) {
574                                         if (owner.IsMessageMatchesEndpointDispatcher (message, owner.Endpoints [i])) {
575                                                 var newdis = owner.Endpoints [i];
576                                                 if (candidate == null || candidate.FilterPriority < newdis.FilterPriority)
577                                                         candidate = newdis;
578                                                 else if (candidate.FilterPriority == newdis.FilterPriority)
579                                                         throw new MultipleFilterMatchesException ();
580                                         }
581                                 }
582                                 if (candidate == null)
583                                         owner.Host.OnUnknownMessageReceived (message);
584                                 return candidate;
585                         }
586                 }
587         }
588 }