2009-06-25 Atsushi Enomoto <atsushi@ximian.com>
[mono.git] / mcs / class / System.ServiceModel / System.ServiceModel.Dispatcher / ChannelDispatcher.cs
1 //
2 // ChannelDispatcher.cs
3 //
4 // Author:
5 //      Atsushi Enomoto <atsushi@ximian.com>
6 //
7 // Copyright (C) 2005,2009 Novell, Inc.  http://www.novell.com
8 //
9 // Permission is hereby granted, free of charge, to any person obtaining
10 // a copy of this software and associated documentation files (the
11 // "Software"), to deal in the Software without restriction, including
12 // without limitation the rights to use, copy, modify, merge, publish,
13 // distribute, sublicense, and/or sell copies of the Software, and to
14 // permit persons to whom the Software is furnished to do so, subject to
15 // the following conditions:
16 // 
17 // The above copyright notice and this permission notice shall be
18 // included in all copies or substantial portions of the Software.
19 // 
20 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
21 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
22 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
23 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
24 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
25 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
26 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
27 //
28 using System;
29 using System.Collections.Generic;
30 using System.Collections.ObjectModel;
31 using System.Reflection;
32 using System.ServiceModel.Channels;
33 using System.Threading;
34 using System.Transactions;
35 using System.ServiceModel;
36 using System.ServiceModel.Description;
37
38 namespace System.ServiceModel.Dispatcher
39 {
40         public class ChannelDispatcher : ChannelDispatcherBase
41         {
42                 ServiceHostBase host;
43
44                 string binding_name;            
45                 Collection<IErrorHandler> error_handlers
46                         = new Collection<IErrorHandler> ();
47                 IChannelListener listener;
48                 internal IDefaultCommunicationTimeouts timeouts; // FIXME: remove internal
49                 MessageVersion message_version;
50                 bool receive_sync, include_exception_detail_in_faults,
51                         manual_addressing, is_tx_receive;
52                 int max_tx_batch_size;
53                 SynchronizedCollection<IChannelInitializer> initializers
54                         = new SynchronizedCollection<IChannelInitializer> ();
55                 IsolationLevel tx_isolation_level;
56                 TimeSpan tx_timeout;
57                 ServiceThrottle throttle;
58
59                 Guid identifier = Guid.NewGuid ();
60                 ManualResetEvent async_event = new ManualResetEvent (false);
61
62                 ListenerLoopManager loop_manager;
63                 SynchronizedCollection<EndpointDispatcher> endpoints;
64
65                 [MonoTODO ("get binding info from config")]
66                 public ChannelDispatcher (IChannelListener listener)
67                         : this (listener, null)
68                 {
69                 }
70
71                 public ChannelDispatcher (
72                         IChannelListener listener, string bindingName)
73                         : this (listener, bindingName, null)
74                 {
75                 }
76
77                 public ChannelDispatcher (
78                         IChannelListener listener, string bindingName,
79                         IDefaultCommunicationTimeouts timeouts)
80                 {
81                         if (listener == null)
82                                 throw new ArgumentNullException ("listener");
83                         Init (listener, bindingName, timeouts);
84                 }
85
86                 private void Init (IChannelListener listener, string bindingName,
87                         IDefaultCommunicationTimeouts timeouts)
88                 {
89                         this.listener = listener;
90                         this.binding_name = bindingName;
91                         // IChannelListener is often a ChannelListenerBase
92                         // which implements IDefaultCommunicationTimeouts.
93                         this.timeouts = timeouts ?? listener as IDefaultCommunicationTimeouts ?? DefaultCommunicationTimeouts.Instance;
94                         endpoints = new SynchronizedCollection<EndpointDispatcher> ();
95                 }
96
97                 public string BindingName {
98                         get { return binding_name; }
99                 }
100
101                 public SynchronizedCollection<IChannelInitializer> ChannelInitializers {
102                         get { return initializers; }
103                 }
104
105                 protected internal override TimeSpan DefaultCloseTimeout {
106                         get { return timeouts.CloseTimeout; }
107                 }
108
109                 protected internal override TimeSpan DefaultOpenTimeout {
110                         get { return timeouts.OpenTimeout; }
111                 }
112
113                 public Collection<IErrorHandler> ErrorHandlers {
114                         get { return error_handlers; }
115                 }
116
117                 public SynchronizedCollection<EndpointDispatcher> Endpoints {
118                         get { return endpoints; }
119                 }
120
121                 [MonoTODO]
122                 public bool IsTransactedAccept {
123                         get { throw new NotImplementedException (); }
124                 }
125
126                 public bool IsTransactedReceive {
127                         get { return is_tx_receive; }
128                         set { is_tx_receive = value; }
129                 }
130
131                 public bool ManualAddressing {
132                         get { return manual_addressing; }
133                         set { manual_addressing = value; }
134                 }
135
136                 public int MaxTransactedBatchSize {
137                         get { return max_tx_batch_size; }
138                         set { max_tx_batch_size = value; }
139                 }
140
141                 public override ServiceHostBase Host {
142                         get { return host; }
143                 }
144
145                 public override IChannelListener Listener {
146                         get { return listener; }
147                 }
148
149                 public MessageVersion MessageVersion {
150                         get { return message_version; }
151                         set { message_version = value; }
152                 }
153
154                 public bool ReceiveSynchronously {
155                         get { return receive_sync; }
156                         set {
157                                 ThrowIfDisposedOrImmutable ();
158                                 receive_sync = value; 
159                         }
160                 }
161
162                 public bool IncludeExceptionDetailInFaults {
163                         get { return include_exception_detail_in_faults; }
164                         set { include_exception_detail_in_faults = value; }
165                 }
166
167                 public ServiceThrottle ServiceThrottle {
168                         get { return throttle; }
169                         set { throttle = value; }
170                 }
171
172                 public IsolationLevel TransactionIsolationLevel {
173                         get { return tx_isolation_level; }
174                         set { tx_isolation_level = value; }
175                 }
176
177                 public TimeSpan TransactionTimeout {
178                         get { return tx_timeout; }
179                         set { tx_timeout = value; }
180                 }
181
182                 protected internal override void Attach (ServiceHostBase host)
183                 {
184                         this.host = host;
185                 }
186
187                 public override void CloseInput ()
188                 {
189                         if (loop_manager != null)
190                                 loop_manager.CloseInput ();
191                 }
192
193                 protected internal override void Detach (ServiceHostBase host)
194                 {                       
195                         this.host = null;                       
196                 }
197
198                 protected override void OnAbort ()
199                 {
200                         if (loop_manager != null)
201                                 loop_manager.Stop (TimeSpan.FromTicks (1));
202                 }
203
204                 Action<TimeSpan> open_delegate;
205                 Action<TimeSpan> close_delegate;
206
207                 protected override IAsyncResult OnBeginClose (TimeSpan timeout,
208                         AsyncCallback callback, object state)
209                 {
210                         if (close_delegate == null)
211                                 close_delegate = new Action<TimeSpan> (OnClose);
212                         return close_delegate.BeginInvoke (timeout, callback, state);
213                 }
214
215                 protected override IAsyncResult OnBeginOpen (TimeSpan timeout,
216                         AsyncCallback callback, object state)
217                 {
218                         if (open_delegate == null)
219                                 open_delegate = new Action<TimeSpan> (OnClose);
220                         return open_delegate.BeginInvoke (timeout, callback, state);
221                 }
222
223                 protected override void OnClose (TimeSpan timeout)
224                 {
225                         if (loop_manager != null)
226                                 loop_manager.Stop (timeout);
227                 }
228
229                 protected override void OnClosed ()
230                 {
231                         if (host != null)
232                                 host.ChannelDispatchers.Remove (this);
233                         base.OnClosed ();
234                 }
235
236                 protected override void OnEndClose (IAsyncResult result)
237                 {
238                         close_delegate.EndInvoke (result);
239                 }
240
241                 protected override void OnEndOpen (IAsyncResult result)
242                 {
243                         open_delegate.EndInvoke (result);
244                 }
245
246                 protected override void OnOpen (TimeSpan timeout)
247                 {
248                         if (Host == null || MessageVersion == null)
249                                 throw new InvalidOperationException ("Service host is not attached to this ChannelDispatcher.");
250
251                         loop_manager = new ListenerLoopManager (this, timeout);
252                 }
253
254                 [MonoTODO ("what to do here?")]
255                 protected override void OnOpening ()
256                 {
257                 }
258
259                 protected override void OnOpened ()
260                 {
261                         loop_manager.Setup ();
262                 }
263
264                 internal void StartLoop ()
265                 {
266                         // FIXME: not sure if it should be filled here.
267                         if (ServiceThrottle == null)
268                                 ServiceThrottle = new ServiceThrottle ();
269
270                         loop_manager.Start ();
271                 }
272
273                 bool IsMessageMatchesEndpointDispatcher (Message req, EndpointDispatcher endpoint)
274                 {
275                         Uri to = req.Headers.To;
276                         if (to == null)
277                                 return false;
278                         if (to.AbsoluteUri == Constants.WsaAnonymousUri)
279                                 return false;
280                         return endpoint.AddressFilter.Match (req) && endpoint.ContractFilter.Match (req);
281                 }
282                  
283                 void HandleError (Exception ex)
284                 {
285                         foreach (IErrorHandler handler in ErrorHandlers)
286                                 if (handler.HandleError (ex))
287                                         break;
288                 }
289
290                 class ListenerLoopManager
291                 {
292                         ChannelDispatcher owner;
293                         AutoResetEvent handle = new AutoResetEvent (false);
294                         AutoResetEvent creator_handle = new AutoResetEvent (false);
295                         ManualResetEvent stop_handle = new ManualResetEvent (false);
296                         bool loop;
297                         Thread loop_thread;
298                         TimeSpan open_timeout;
299                         Func<IAsyncResult> channel_acceptor;
300                         List<IChannel> channels = new List<IChannel> ();
301
302                         public ListenerLoopManager (ChannelDispatcher owner, TimeSpan openTimeout)
303                         {
304                                 this.owner = owner;
305                                 open_timeout = openTimeout;
306                         }
307
308                         public void Setup ()
309                         {
310                                 if (owner.Listener.State != CommunicationState.Opened)
311                                         owner.Listener.Open (open_timeout);
312
313                                 // It is tested at Open(), but strangely it is not instantiated at this point.
314                                 foreach (var ed in owner.Endpoints)
315                                         if (ed.DispatchRuntime.Type == null || ed.DispatchRuntime.Type.GetConstructor (Type.EmptyTypes) == null)
316                                                 throw new InvalidOperationException ("There is no default constructor for the service Type in the DispatchRuntime");
317                                 SetupChannelAcceptor ();
318                         }
319
320                         public void Start ()
321                         {
322                                 foreach (var ed in owner.Endpoints)
323                                         if (ed.DispatchRuntime.InstanceContextProvider == null)
324                                                 ed.DispatchRuntime.InstanceContextProvider = new DefaultInstanceContextProvider ();
325
326                                 if (loop_thread == null)
327                                         loop_thread = new Thread (new ThreadStart (Loop));
328                                 loop_thread.Start ();
329                         }
330
331                         Func<IAsyncResult> CreateAcceptor<TChannel> (IChannelListener l) where TChannel : class, IChannel
332                         {
333                                 IChannelListener<TChannel> r = l as IChannelListener<TChannel>;
334                                 if (r == null)
335                                         return null;
336                                 AsyncCallback callback = delegate (IAsyncResult result) {
337                                         try {
338                                                 ChannelAccepted (r.EndAcceptChannel (result));
339                                         } catch (Exception ex) {
340                                                 Console.WriteLine ("Exception during finishing channel acceptance.");
341                                                 Console.WriteLine (ex);
342                                         }
343                                 };
344                                 return delegate {
345                                         try {
346                                                 return r.BeginAcceptChannel (callback, null);
347                                         } catch (Exception ex) {
348                                                 Console.WriteLine ("Exception during accepting channel.");
349                                                 Console.WriteLine (ex);
350                                                 throw;
351                                         }
352                                 };
353                         }
354
355                         void SetupChannelAcceptor ()
356                         {
357                                 var l = owner.Listener;
358                                 channel_acceptor =
359                                         CreateAcceptor<IReplyChannel> (l) ??
360                                         CreateAcceptor<IReplySessionChannel> (l) ??
361                                         CreateAcceptor<IInputChannel> (l) ??
362                                         CreateAcceptor<IInputSessionChannel> (l) ??
363                                         CreateAcceptor<IDuplexChannel> (l) ??
364                                         CreateAcceptor<IDuplexSessionChannel> (l);
365                                 if (channel_acceptor == null)
366                                         throw new InvalidOperationException (String.Format ("Unrecognized channel listener type: {0}", l.GetType ()));
367                         }
368
369                         public void Stop (TimeSpan timeout)
370                         {
371                                 loop = false;
372                                 creator_handle.Set ();
373                                 handle.Set ();
374                                 if (stop_handle != null) {
375                                         stop_handle.WaitOne (timeout > TimeSpan.Zero ? timeout : TimeSpan.FromTicks (1));
376                                         stop_handle.Close ();
377                                         stop_handle = null;
378                                 }
379                                 if (owner.Listener.State != CommunicationState.Closed)
380                                         owner.Listener.Abort ();
381                                 if (loop_thread != null && loop_thread.IsAlive)
382                                         loop_thread.Abort ();
383                                 loop_thread = null;
384                         }
385
386                         public void CloseInput ()
387                         {
388                                 foreach (var ch in channels.ToArray ()) {
389                                         if (ch.State == CommunicationState.Closed)
390                                                 channels.Remove (ch); // zonbie, if exists
391                                         else
392                                                 ch.Close ();
393                                 }
394                         }
395
396                         void Loop ()
397                         {
398                                 try {
399                                         LoopCore ();
400                                 } catch (Exception ex) {
401                                         // FIXME: log it
402                                         Console.WriteLine ("ChannelDispatcher caught an exception inside dispatcher loop, which is likely thrown by the channel listener {0}", owner.Listener);
403                                         Console.WriteLine (ex);
404                                 } finally {
405                                         if (stop_handle != null)
406                                                 stop_handle.Set ();
407                                 }
408                         }
409
410                         void LoopCore ()
411                         {
412                                 loop = true;
413
414                                 // FIXME: use WaitForChannel() for (*only* for) transacted channel listeners.
415                                 // http://social.msdn.microsoft.com/Forums/en-US/wcf/thread/3faa4a5e-8602-4dbe-a181-73b3f581835e
416                                 
417                                 //FIXME: The logic here should be somewhat different as follows:
418                                 //1. Get the message
419                                 //2. Get the appropriate EndPointDispatcher that can handle the message
420                                 //   which is done using the filters (AddressFilter, ContractFilter).
421                                 //3. Let the appropriate endpoint handle the request.
422
423                                 while (loop) {
424                                         while (loop && channels.Count < owner.ServiceThrottle.MaxConcurrentSessions) {
425                                                 channel_acceptor ();
426                                                 creator_handle.WaitOne (); // released by ChannelAccepted()
427                                                 creator_handle.Reset ();
428                                         }
429                                         handle.WaitOne (); // released by IChannel.Close()
430                                         handle.Reset ();
431                                 }
432                                 owner.CloseInput ();
433                                 owner.Listener.Close ();
434                         }
435
436                         void ChannelAccepted (IChannel ch)
437                         {
438                         try {
439                                 if (ch == null) // could happen when it was aborted
440                                         return;
441                                 if (!loop) {
442                                         var dis = ch as IDisposable;
443                                         if (dis != null)
444                                                 dis.Dispose ();
445                                         return;
446                                 }
447
448                                 channels.Add (ch);
449                                 ch.Opened += delegate {
450                                         ch.Faulted += delegate {
451                                                 if (channels.Contains (ch))
452                                                         channels.Remove (ch);
453                                                 handle.Set (); // release loop wait lock.
454                                                 };
455                                         ch.Closed += delegate {
456                                                 if (channels.Contains (ch))
457                                                         channels.Remove (ch);
458                                                 handle.Set (); // release loop wait lock.
459                                                 };
460                                         };
461                         } finally {
462                                 creator_handle.Set ();
463                         }
464
465                                 ch.Open ();
466                                 ProcessRequestOrInput (ch);
467                         }
468
469                         void ProcessRequestOrInput (IChannel ch)
470                         {
471                                 var reply = ch as IReplyChannel;
472                                 var input = ch as IInputChannel;
473
474                                 if (reply != null) {
475                                         if (owner.ReceiveSynchronously) {
476                                                 RequestContext rc;
477                                                 if (reply.TryReceiveRequest (owner.timeouts.ReceiveTimeout, out rc))
478                                                         ProcessRequest (reply, rc);
479                                         } else {
480                                                 reply.BeginTryReceiveRequest (owner.timeouts.ReceiveTimeout, TryReceiveRequestDone, reply);
481                                         }
482                                 } else if (input != null) {
483                                         if (owner.ReceiveSynchronously) {
484                                                 Message msg;
485                                                 if (input.TryReceive (owner.timeouts.ReceiveTimeout, out msg))
486                                                         ProcessInput (input, msg);
487                                         } else {
488                                                 input.BeginTryReceive (owner.timeouts.ReceiveTimeout, TryReceiveDone, input);
489                                         }
490                                 }
491                         }
492
493                         void TryReceiveRequestDone (IAsyncResult result)
494                         {
495                                 RequestContext rc;
496                                 var reply = (IReplyChannel) result.AsyncState;
497                                 if (reply.EndTryReceiveRequest (result, out rc))
498                                         ProcessRequest (reply, rc);
499                         }
500
501                         void TryReceiveDone (IAsyncResult result)
502                         {
503                                 Message msg;
504                                 var input = (IInputChannel) result.AsyncState;
505                                 if (input.EndTryReceive (result, out msg))
506                                         ProcessInput (input, msg);
507                         }
508
509                         void SendEndpointNotFound (RequestContext rc, EndpointNotFoundException ex) 
510                         {
511                                 try {
512
513                                         MessageVersion version = rc.RequestMessage.Version;
514                                         FaultCode fc = new FaultCode ("DestinationUnreachable", version.Addressing.Namespace);
515                                         Message res = Message.CreateMessage (version, fc, "error occured", rc.RequestMessage.Headers.Action);
516                                         rc.Reply (res);
517                                 }
518                                 catch (Exception e) { }
519                         }
520
521                         void ProcessRequest (IReplyChannel reply, RequestContext rc)
522                         {
523                                 try {
524                                         EndpointDispatcher candidate = FindEndpointDispatcher (rc.RequestMessage);
525                                         new InputOrReplyRequestProcessor (candidate.DispatchRuntime, reply).
526                                                 ProcessReply (rc);
527                                 } catch (EndpointNotFoundException ex) {
528                                         SendEndpointNotFound (rc, ex);
529                                 } catch (Exception ex) {
530                                         // FIXME: log it.
531                                         Console.WriteLine (ex);
532                                 } finally {
533                                         // unless it is closed by session/call manager, move it back to the loop to receive the next message.
534                                         if (reply.State != CommunicationState.Closed)
535                                                 ProcessRequestOrInput (reply);
536                                 }
537                         }
538
539                         void ProcessInput (IInputChannel input, Message message)
540                         {
541                                 try {
542                                         EndpointDispatcher candidate = null;
543                                         candidate = FindEndpointDispatcher (message);
544                                         new InputOrReplyRequestProcessor (candidate.DispatchRuntime, input).
545                                                 ProcessInput (message);
546                                 }
547                                 catch (Exception ex) {
548                                         // FIXME: log it.
549                                         Console.WriteLine (ex);
550                                 } finally {
551                                         // unless it is closed by session/call manager, move it back to the loop to receive the next message.
552                                         if (input.State != CommunicationState.Closed)
553                                                 ProcessRequestOrInput (input);
554                                 }
555                         }
556
557                         EndpointDispatcher FindEndpointDispatcher (Message message) {
558                                 EndpointDispatcher candidate = null;
559                                 for (int i = 0; i < owner.Endpoints.Count; i++) {
560                                         if (owner.IsMessageMatchesEndpointDispatcher (message, owner.Endpoints [i])) {
561                                                 candidate = owner.Endpoints [i];
562                                                 break;
563                                         }
564                                 }
565                                 if (candidate == null)
566                                         throw new EndpointNotFoundException (String.Format ("The request message has the target '{0}' which is not reachable in this service contract", message.Headers.To));
567                                 return candidate;
568                         }
569                 }
570         }
571 }