2009-10-16 Atsushi Enomoto <atsushi@ximian.com>
[mono.git] / mcs / class / System.ServiceModel / System.ServiceModel.Dispatcher / ChannelDispatcher.cs
1 //
2 // ChannelDispatcher.cs
3 //
4 // Author:
5 //      Atsushi Enomoto <atsushi@ximian.com>
6 //
7 // Copyright (C) 2005,2009 Novell, Inc.  http://www.novell.com
8 //
9 // Permission is hereby granted, free of charge, to any person obtaining
10 // a copy of this software and associated documentation files (the
11 // "Software"), to deal in the Software without restriction, including
12 // without limitation the rights to use, copy, modify, merge, publish,
13 // distribute, sublicense, and/or sell copies of the Software, and to
14 // permit persons to whom the Software is furnished to do so, subject to
15 // the following conditions:
16 // 
17 // The above copyright notice and this permission notice shall be
18 // included in all copies or substantial portions of the Software.
19 // 
20 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
21 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
22 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
23 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
24 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
25 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
26 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
27 //
28 using System;
29 using System.Collections.Generic;
30 using System.Collections.ObjectModel;
31 using System.Reflection;
32 using System.ServiceModel.Channels;
33 using System.Threading;
34 using System.Transactions;
35 using System.ServiceModel;
36 using System.ServiceModel.Description;
37
38 namespace System.ServiceModel.Dispatcher
39 {
40         public class ChannelDispatcher : ChannelDispatcherBase
41         {
42                 class EndpointDispatcherCollection : SynchronizedCollection<EndpointDispatcher>
43                 {
44                         public EndpointDispatcherCollection (ChannelDispatcher owner)
45                         {
46                                 this.owner = owner;
47                         }
48
49                         ChannelDispatcher owner;
50
51                         protected override void ClearItems ()
52                         {
53                                 foreach (var ed in this)
54                                         ed.ChannelDispatcher = null;
55                                 base.ClearItems ();
56                         }
57
58                         protected override void InsertItem (int index, EndpointDispatcher item)
59                         {
60                                 item.ChannelDispatcher = owner;
61                                 base.InsertItem (index, item);
62                         }
63
64                         protected override void RemoveItem (int index)
65                         {
66                                 if (index < Count)
67                                         this [index].ChannelDispatcher = null;
68                                 base.RemoveItem (index);
69                         }
70
71                         protected override void SetItem (int index, EndpointDispatcher item)
72                         {
73                                 item.ChannelDispatcher = owner;
74                                 base.SetItem (index, item);
75                         }
76                 }
77
78                 ServiceHostBase host;
79
80                 string binding_name;            
81                 Collection<IErrorHandler> error_handlers
82                         = new Collection<IErrorHandler> ();
83                 IChannelListener listener;
84                 internal IDefaultCommunicationTimeouts timeouts; // FIXME: remove internal
85                 MessageVersion message_version;
86                 bool receive_sync, include_exception_detail_in_faults,
87                         manual_addressing, is_tx_receive;
88                 int max_tx_batch_size;
89                 SynchronizedCollection<IChannelInitializer> initializers
90                         = new SynchronizedCollection<IChannelInitializer> ();
91                 IsolationLevel tx_isolation_level;
92                 TimeSpan tx_timeout;
93                 ServiceThrottle throttle;
94
95                 Guid identifier = Guid.NewGuid ();
96                 ManualResetEvent async_event = new ManualResetEvent (false);
97
98                 ListenerLoopManager loop_manager;
99                 SynchronizedCollection<EndpointDispatcher> endpoints;
100
101                 [MonoTODO ("get binding info from config")]
102                 public ChannelDispatcher (IChannelListener listener)
103                         : this (listener, null)
104                 {
105                 }
106
107                 public ChannelDispatcher (
108                         IChannelListener listener, string bindingName)
109                         : this (listener, bindingName, null)
110                 {
111                 }
112
113                 public ChannelDispatcher (
114                         IChannelListener listener, string bindingName,
115                         IDefaultCommunicationTimeouts timeouts)
116                 {
117                         if (listener == null)
118                                 throw new ArgumentNullException ("listener");
119                         Init (listener, bindingName, timeouts);
120                 }
121
122                 private void Init (IChannelListener listener, string bindingName,
123                         IDefaultCommunicationTimeouts timeouts)
124                 {
125                         this.listener = listener;
126                         this.binding_name = bindingName;
127                         // IChannelListener is often a ChannelListenerBase
128                         // which implements IDefaultCommunicationTimeouts.
129                         this.timeouts = timeouts ?? listener as IDefaultCommunicationTimeouts ?? DefaultCommunicationTimeouts.Instance;
130                         endpoints = new EndpointDispatcherCollection (this);
131                 }
132
133                 internal void InitializeServiceEndpoint (Type serviceType, ServiceEndpoint se)
134                 {
135                         this.MessageVersion = se.Binding.MessageVersion;
136                         if (this.MessageVersion == null)
137                                 this.MessageVersion = MessageVersion.Default;
138
139                         //Attach one EndpointDispacher to the ChannelDispatcher
140                         EndpointDispatcher ed = new EndpointDispatcher (se.Address, se.Contract.Name, se.Contract.Namespace);
141                         this.Endpoints.Add (ed);
142                         ed.InitializeServiceEndpoint (false, serviceType, se);
143                 }
144
145                 public string BindingName {
146                         get { return binding_name; }
147                 }
148
149                 public SynchronizedCollection<IChannelInitializer> ChannelInitializers {
150                         get { return initializers; }
151                 }
152
153                 protected internal override TimeSpan DefaultCloseTimeout {
154                         get { return timeouts.CloseTimeout; }
155                 }
156
157                 protected internal override TimeSpan DefaultOpenTimeout {
158                         get { return timeouts.OpenTimeout; }
159                 }
160
161                 public Collection<IErrorHandler> ErrorHandlers {
162                         get { return error_handlers; }
163                 }
164
165                 public SynchronizedCollection<EndpointDispatcher> Endpoints {
166                         get { return endpoints; }
167                 }
168
169                 [MonoTODO]
170                 public bool IsTransactedAccept {
171                         get { throw new NotImplementedException (); }
172                 }
173
174                 public bool IsTransactedReceive {
175                         get { return is_tx_receive; }
176                         set { is_tx_receive = value; }
177                 }
178
179                 public bool ManualAddressing {
180                         get { return manual_addressing; }
181                         set { manual_addressing = value; }
182                 }
183
184                 public int MaxTransactedBatchSize {
185                         get { return max_tx_batch_size; }
186                         set { max_tx_batch_size = value; }
187                 }
188
189                 public override ServiceHostBase Host {
190                         get { return host; }
191                 }
192
193                 public override IChannelListener Listener {
194                         get { return listener; }
195                 }
196
197                 public MessageVersion MessageVersion {
198                         get { return message_version; }
199                         set { message_version = value; }
200                 }
201
202                 public bool ReceiveSynchronously {
203                         get { return receive_sync; }
204                         set {
205                                 ThrowIfDisposedOrImmutable ();
206                                 receive_sync = value; 
207                         }
208                 }
209
210                 public bool IncludeExceptionDetailInFaults {
211                         get { return include_exception_detail_in_faults; }
212                         set { include_exception_detail_in_faults = value; }
213                 }
214
215                 public ServiceThrottle ServiceThrottle {
216                         get { return throttle; }
217                         set { throttle = value; }
218                 }
219
220                 public IsolationLevel TransactionIsolationLevel {
221                         get { return tx_isolation_level; }
222                         set { tx_isolation_level = value; }
223                 }
224
225                 public TimeSpan TransactionTimeout {
226                         get { return tx_timeout; }
227                         set { tx_timeout = value; }
228                 }
229
230                 protected internal override void Attach (ServiceHostBase host)
231                 {
232                         this.host = host;
233                 }
234
235                 public override void CloseInput ()
236                 {
237                         if (loop_manager != null)
238                                 loop_manager.CloseInput ();
239                 }
240
241                 protected internal override void Detach (ServiceHostBase host)
242                 {                       
243                         this.host = null;                       
244                 }
245
246                 protected override void OnAbort ()
247                 {
248                         if (loop_manager != null)
249                                 loop_manager.Stop (TimeSpan.FromTicks (1));
250                 }
251
252                 Action<TimeSpan> open_delegate;
253                 Action<TimeSpan> close_delegate;
254
255                 protected override IAsyncResult OnBeginClose (TimeSpan timeout,
256                         AsyncCallback callback, object state)
257                 {
258                         if (close_delegate == null)
259                                 close_delegate = new Action<TimeSpan> (OnClose);
260                         return close_delegate.BeginInvoke (timeout, callback, state);
261                 }
262
263                 protected override IAsyncResult OnBeginOpen (TimeSpan timeout,
264                         AsyncCallback callback, object state)
265                 {
266                         if (open_delegate == null)
267                                 open_delegate = new Action<TimeSpan> (OnClose);
268                         return open_delegate.BeginInvoke (timeout, callback, state);
269                 }
270
271                 protected override void OnClose (TimeSpan timeout)
272                 {
273                         if (loop_manager != null)
274                                 loop_manager.Stop (timeout);
275                 }
276
277                 protected override void OnClosed ()
278                 {
279                         if (host != null)
280                                 host.ChannelDispatchers.Remove (this);
281                         base.OnClosed ();
282                 }
283
284                 protected override void OnEndClose (IAsyncResult result)
285                 {
286                         close_delegate.EndInvoke (result);
287                 }
288
289                 protected override void OnEndOpen (IAsyncResult result)
290                 {
291                         open_delegate.EndInvoke (result);
292                 }
293
294                 protected override void OnOpen (TimeSpan timeout)
295                 {
296                         if (Host == null || MessageVersion == null)
297                                 throw new InvalidOperationException ("Service host is not attached to this ChannelDispatcher.");
298
299                         loop_manager.Setup (timeout);
300                 }
301
302                 protected override void OnOpening ()
303                 {
304                         base.OnOpening ();
305                         loop_manager = new ListenerLoopManager (this);
306                 }
307
308                 protected override void OnOpened ()
309                 {
310                         base.OnOpened ();
311                         StartLoop ();
312                 }
313
314                 void StartLoop ()
315                 {
316                         // FIXME: not sure if it should be filled here.
317                         if (ServiceThrottle == null)
318                                 ServiceThrottle = new ServiceThrottle ();
319
320                         loop_manager.Start ();
321                 }
322         }
323
324                 // isolated from ChannelDispatcher
325                 class ListenerLoopManager
326                 {
327                         ChannelDispatcher owner;
328                         AutoResetEvent throttle_wait_handle = new AutoResetEvent (false);
329                         AutoResetEvent creator_handle = new AutoResetEvent (false);
330                         ManualResetEvent stop_handle = new ManualResetEvent (false);
331                         bool loop;
332                         Thread loop_thread;
333                         DateTime close_started;
334                         TimeSpan close_timeout;
335                         Func<IAsyncResult> channel_acceptor;
336                         List<IChannel> channels = new List<IChannel> ();
337
338                         public ListenerLoopManager (ChannelDispatcher owner)
339                         {
340                                 this.owner = owner;
341                         }
342
343                         public void Setup (TimeSpan openTimeout)
344                         {
345                                 if (owner.Listener.State != CommunicationState.Opened)
346                                         owner.Listener.Open (openTimeout);
347
348                                 // It is tested at Open(), but strangely it is not instantiated at this point.
349                                 foreach (var ed in owner.Endpoints)
350                                         if (ed.DispatchRuntime.InstanceContextProvider == null && (ed.DispatchRuntime.Type == null || ed.DispatchRuntime.Type.GetConstructor (Type.EmptyTypes) == null))
351                                                 throw new InvalidOperationException ("There is no default constructor for the service Type in the DispatchRuntime");
352                                 SetupChannelAcceptor ();
353                         }
354
355                         public void Start ()
356                         {
357                                 foreach (var ed in owner.Endpoints)
358                                         if (ed.DispatchRuntime.InstanceContextProvider == null)
359                                                 ed.DispatchRuntime.InstanceContextProvider = new DefaultInstanceContextProvider ();
360
361                                 if (loop_thread == null)
362                                         loop_thread = new Thread (new ThreadStart (Loop));
363                                 loop_thread.Start ();
364                         }
365
366                         Func<IAsyncResult> CreateAcceptor<TChannel> (IChannelListener l) where TChannel : class, IChannel
367                         {
368                                 IChannelListener<TChannel> r = l as IChannelListener<TChannel>;
369                                 if (r == null)
370                                         return null;
371                                 AsyncCallback callback = delegate (IAsyncResult result) {
372                                         try {
373                                                 ChannelAccepted (r.EndAcceptChannel (result));
374                                         } catch (Exception ex) {
375                                                 Console.WriteLine ("Exception during finishing channel acceptance.");
376                                                 Console.WriteLine (ex);
377                                         }
378                                 };
379                                 return delegate {
380                                         try {
381                                                 return r.BeginAcceptChannel (callback, null);
382                                         } catch (Exception ex) {
383                                                 Console.WriteLine ("Exception during accepting channel.");
384                                                 Console.WriteLine (ex);
385                                                 throw;
386                                         }
387                                 };
388                         }
389
390                         void SetupChannelAcceptor ()
391                         {
392                                 var l = owner.Listener;
393                                 channel_acceptor =
394                                         CreateAcceptor<IReplyChannel> (l) ??
395                                         CreateAcceptor<IReplySessionChannel> (l) ??
396                                         CreateAcceptor<IInputChannel> (l) ??
397                                         CreateAcceptor<IInputSessionChannel> (l) ??
398                                         CreateAcceptor<IDuplexChannel> (l) ??
399                                         CreateAcceptor<IDuplexSessionChannel> (l);
400                                 if (channel_acceptor == null)
401                                         throw new InvalidOperationException (String.Format ("Unrecognized channel listener type: {0}", l.GetType ()));
402                         }
403
404                         public void Stop (TimeSpan timeout)
405                         {
406                                 if (loop_thread == null)
407                                         return;
408
409                                 close_started = DateTime.Now;
410                                 close_timeout = timeout;
411                                 loop = false;
412                                 creator_handle.Set ();
413                                 throttle_wait_handle.Set (); // break primary loop
414                                 if (stop_handle != null) {
415                                         stop_handle.WaitOne (timeout > TimeSpan.Zero ? timeout : TimeSpan.FromTicks (1));
416                                         stop_handle.Close ();
417                                         stop_handle = null;
418                                 }
419                                 if (owner.Listener.State != CommunicationState.Closed)
420                                         owner.Listener.Abort ();
421                                 if (loop_thread != null && loop_thread.IsAlive)
422                                         loop_thread.Abort ();
423                                 loop_thread = null;
424                         }
425
426                         public void CloseInput ()
427                         {
428                                 foreach (var ch in channels.ToArray ()) {
429                                         if (ch.State == CommunicationState.Closed)
430                                                 channels.Remove (ch); // zonbie, if exists
431                                         else {
432                                                 try {
433                                                         ch.Close (close_timeout - (DateTime.Now - close_started));
434                                                 } catch (Exception ex) {
435                                                         // FIXME: log it.
436                                                         Console.WriteLine (ex);
437                                                         ch.Abort ();
438                                                 }
439                                         }
440                                 }
441                         }
442
443                         void Loop ()
444                         {
445                                 try {
446                                         LoopCore ();
447                                 } catch (Exception ex) {
448                                         // FIXME: log it
449                                         Console.WriteLine ("ChannelDispatcher caught an exception inside dispatcher loop, which is likely thrown by the channel listener {0}", owner.Listener);
450                                         Console.WriteLine (ex);
451                                 } finally {
452                                         if (stop_handle != null)
453                                                 stop_handle.Set ();
454                                 }
455                         }
456
457                         void LoopCore ()
458                         {
459                                 loop = true;
460
461                                 // FIXME: use WaitForChannel() for (*only* for) transacted channel listeners.
462                                 // http://social.msdn.microsoft.com/Forums/en-US/wcf/thread/3faa4a5e-8602-4dbe-a181-73b3f581835e
463                                 
464                                 //FIXME: The logic here should be somewhat different as follows:
465                                 //1. Get the message
466                                 //2. Get the appropriate EndPointDispatcher that can handle the message
467                                 //   which is done using the filters (AddressFilter, ContractFilter).
468                                 //3. Let the appropriate endpoint handle the request.
469
470                                 while (loop) {
471                                         while (loop && channels.Count < owner.ServiceThrottle.MaxConcurrentSessions) {
472                                                 channel_acceptor ();
473                                                 creator_handle.WaitOne (); // released by ChannelAccepted()
474                                         }
475                                         if (!loop)
476                                                 break;
477                                         throttle_wait_handle.WaitOne (); // released by IChannel.Close()
478                                 }
479                                 try {
480                                         owner.Listener.Close ();
481                                 } finally {
482                                         // make sure to close both listener and channels.
483                                         owner.CloseInput ();
484                                 }
485                         }
486
487                         void ChannelAccepted (IChannel ch)
488                         {
489                         try {
490                                 if (ch == null) // could happen when it was aborted
491                                         return;
492                                 if (!loop) {
493                                         var dis = ch as IDisposable;
494                                         if (dis != null)
495                                                 dis.Dispose ();
496                                         return;
497                                 }
498
499                                 channels.Add (ch);
500                                 ch.Opened += delegate {
501                                         ch.Faulted += delegate {
502                                                 if (channels.Contains (ch))
503                                                         channels.Remove (ch);
504                                                 throttle_wait_handle.Set (); // release loop wait lock.
505                                                 };
506                                         ch.Closed += delegate {
507                                                 if (channels.Contains (ch))
508                                                         channels.Remove (ch);
509                                                 throttle_wait_handle.Set (); // release loop wait lock.
510                                                 };
511                                         };
512                                 ch.Open ();
513                         } finally {
514                                 creator_handle.Set ();
515                         }
516
517                                 ProcessRequestOrInput (ch);
518                         }
519
520                         void ProcessRequestOrInput (IChannel ch)
521                         {
522                                 var reply = ch as IReplyChannel;
523                                 var input = ch as IInputChannel;
524
525                                 if (reply != null) {
526                                         if (owner.ReceiveSynchronously) {
527                                                 RequestContext rc;
528                                                 if (reply.TryReceiveRequest (owner.timeouts.ReceiveTimeout, out rc))
529                                                         ProcessRequest (reply, rc);
530                                         } else {
531                                                 reply.BeginTryReceiveRequest (owner.timeouts.ReceiveTimeout, TryReceiveRequestDone, reply);
532                                         }
533                                 } else if (input != null) {
534                                         if (owner.ReceiveSynchronously) {
535                                                 Message msg;
536                                                 if (input.TryReceive (owner.timeouts.ReceiveTimeout, out msg))
537                                                         ProcessInput (input, msg);
538                                         } else {
539                                                 input.BeginTryReceive (owner.timeouts.ReceiveTimeout, TryReceiveDone, input);
540                                         }
541                                 }
542                         }
543
544                         void TryReceiveRequestDone (IAsyncResult result)
545                         {
546                                 RequestContext rc;
547                                 var reply = (IReplyChannel) result.AsyncState;
548                                 if (reply.EndTryReceiveRequest (result, out rc))
549                                         ProcessRequest (reply, rc);
550                         }
551
552                         void TryReceiveDone (IAsyncResult result)
553                         {
554                                 Message msg;
555                                 var input = (IInputChannel) result.AsyncState;
556                                 if (input.EndTryReceive (result, out msg))
557                                         ProcessInput (input, msg);
558                         }
559
560                         void SendEndpointNotFound (RequestContext rc, EndpointNotFoundException ex) 
561                         {
562                                 try {
563
564                                         MessageVersion version = rc.RequestMessage.Version;
565                                         FaultCode fc = new FaultCode ("DestinationUnreachable", version.Addressing.Namespace);
566                                         Message res = Message.CreateMessage (version, fc, "error occured", rc.RequestMessage.Headers.Action);
567                                         rc.Reply (res);
568                                 } catch (Exception e) {
569                                         // FIXME: log it
570                                         Console.WriteLine ("Error on sending DestinationUnreachable fault message: " + e);
571                                 }
572                         }
573
574                         void ProcessRequest (IReplyChannel reply, RequestContext rc)
575                         {
576                                 try {
577                                         EndpointDispatcher candidate = FindEndpointDispatcher (rc.RequestMessage);
578                                         new InputOrReplyRequestProcessor (candidate.DispatchRuntime, reply).
579                                                 ProcessReply (rc);
580                                 } catch (EndpointNotFoundException ex) {
581                                         SendEndpointNotFound (rc, ex);
582                                 } catch (Exception ex) {
583                                         // FIXME: log it.
584                                         Console.WriteLine (ex);
585                                 } finally {
586                                         if (rc != null)
587                                                 rc.Close ();
588                                         // unless it is closed by session/call manager, move it back to the loop to receive the next message.
589                                         if (reply.State != CommunicationState.Closed)
590                                                 ProcessRequestOrInput (reply);
591                                 }
592                         }
593
594                         void ProcessInput (IInputChannel input, Message message)
595                         {
596                                 try {
597                                         EndpointDispatcher candidate = null;
598                                         candidate = FindEndpointDispatcher (message);
599                                         new InputOrReplyRequestProcessor (candidate.DispatchRuntime, input).
600                                                 ProcessInput (message);
601                                 }
602                                 catch (Exception ex) {
603                                         // FIXME: log it.
604                                         Console.WriteLine (ex);
605                                 } finally {
606                                         // unless it is closed by session/call manager, move it back to the loop to receive the next message.
607                                         if (input.State != CommunicationState.Closed)
608                                                 ProcessRequestOrInput (input);
609                                 }
610                         }
611
612                         EndpointDispatcher FindEndpointDispatcher (Message message) {
613                                 EndpointDispatcher candidate = null;
614                                 for (int i = 0; i < owner.Endpoints.Count; i++) {
615                                         if (MessageMatchesEndpointDispatcher (message, owner.Endpoints [i])) {
616                                                 var newdis = owner.Endpoints [i];
617                                                 if (candidate == null || candidate.FilterPriority < newdis.FilterPriority)
618                                                         candidate = newdis;
619                                                 else if (candidate.FilterPriority == newdis.FilterPriority)
620                                                         throw new MultipleFilterMatchesException ();
621                                         }
622                                 }
623                                 if (candidate == null && owner.Host != null)
624                                         owner.Host.OnUnknownMessageReceived (message);
625                                 return candidate;
626                         }
627
628                         bool MessageMatchesEndpointDispatcher (Message req, EndpointDispatcher endpoint)
629                         {
630                                 Uri to = req.Headers.To;
631                                 if (to == null)
632                                         return false;
633                                 if (to.AbsoluteUri == Constants.WsaAnonymousUri)
634                                         return false;
635                                 return endpoint.AddressFilter.Match (req) && endpoint.ContractFilter.Match (req);
636                         }
637                 }
638 }