Merge pull request #1225 from strawd/bug22307
[mono.git] / mcs / class / System.ServiceModel / System.ServiceModel.Dispatcher / ChannelDispatcher.cs
index 4ad1ab0804fa7149f5621866c9748f39397c8e23..65b5c8cf38ae6913b2482b54609ecc8ee45064e4 100644 (file)
 using System;
 using System.Collections.Generic;
 using System.Collections.ObjectModel;
+using System.Linq;
 using System.Reflection;
 using System.ServiceModel.Channels;
 using System.Threading;
 using System.Transactions;
 using System.ServiceModel;
 using System.ServiceModel.Description;
+using System.Net.Sockets;
+using System.Xml;
+using System.IO;
 
 namespace System.ServiceModel.Dispatcher
 {
        public class ChannelDispatcher : ChannelDispatcherBase
        {
+               class EndpointDispatcherCollection : SynchronizedCollection<EndpointDispatcher>
+               {
+                       public EndpointDispatcherCollection (ChannelDispatcher owner)
+                       {
+                               this.owner = owner;
+                       }
+
+                       ChannelDispatcher owner;
+
+                       protected override void ClearItems ()
+                       {
+                               foreach (var ed in this)
+                                       ed.ChannelDispatcher = null;
+                               base.ClearItems ();
+                       }
+
+                       protected override void InsertItem (int index, EndpointDispatcher item)
+                       {
+                               item.ChannelDispatcher = owner;
+                               base.InsertItem (index, item);
+                       }
+
+                       protected override void RemoveItem (int index)
+                       {
+                               if (index < Count)
+                                       this [index].ChannelDispatcher = null;
+                               base.RemoveItem (index);
+                       }
+
+                       protected override void SetItem (int index, EndpointDispatcher item)
+                       {
+                               item.ChannelDispatcher = owner;
+                               base.SetItem (index, item);
+                       }
+               }
+
                ServiceHostBase host;
 
                string binding_name;            
@@ -91,7 +131,16 @@ namespace System.ServiceModel.Dispatcher
                        // IChannelListener is often a ChannelListenerBase
                        // which implements IDefaultCommunicationTimeouts.
                        this.timeouts = timeouts ?? listener as IDefaultCommunicationTimeouts ?? DefaultCommunicationTimeouts.Instance;
-                       endpoints = new SynchronizedCollection<EndpointDispatcher> ();
+                       endpoints = new EndpointDispatcherCollection (this);
+               }
+
+               internal EndpointDispatcher InitializeServiceEndpoint (Type serviceType, ServiceEndpoint se)
+               {
+                       //Attach one EndpointDispacher to the ChannelDispatcher
+                       EndpointDispatcher ed = new EndpointDispatcher (se.Address, se.Contract.Name, se.Contract.Namespace);
+                       this.Endpoints.Add (ed);
+                       ed.InitializeServiceEndpoint (false, serviceType, se);
+                       return ed;
                }
 
                public string BindingName {
@@ -182,6 +231,9 @@ namespace System.ServiceModel.Dispatcher
                protected internal override void Attach (ServiceHostBase host)
                {
                        this.host = host;
+                       var bl = listener as IChannelDispatcherBoundListener;
+                       if (bl != null)
+                               bl.ChannelDispatcher = this;
                }
 
                public override void CloseInput ()
@@ -216,7 +268,7 @@ namespace System.ServiceModel.Dispatcher
                        AsyncCallback callback, object state)
                {
                        if (open_delegate == null)
-                               open_delegate = new Action<TimeSpan> (OnClose);
+                               open_delegate = new Action<TimeSpan> (OnOpen);
                        return open_delegate.BeginInvoke (timeout, callback, state);
                }
 
@@ -248,67 +300,60 @@ namespace System.ServiceModel.Dispatcher
                        if (Host == null || MessageVersion == null)
                                throw new InvalidOperationException ("Service host is not attached to this ChannelDispatcher.");
 
-                       loop_manager = new ListenerLoopManager (this, timeout);
+                       loop_manager.Setup (timeout);
                }
 
-               [MonoTODO ("what to do here?")]
                protected override void OnOpening ()
                {
+                       base.OnOpening ();
+                       loop_manager = new ListenerLoopManager (this);
                }
 
                protected override void OnOpened ()
                {
-                       loop_manager.Setup ();
+                       base.OnOpened ();
+                       StartLoop ();
                }
 
-               internal void StartLoop ()
+               void StartLoop ()
                {
                        // FIXME: not sure if it should be filled here.
                        if (ServiceThrottle == null)
-                               ServiceThrottle = new ServiceThrottle ();
+                               ServiceThrottle = new ServiceThrottle (this);
 
                        loop_manager.Start ();
                }
+       }
 
-               bool IsMessageMatchesEndpointDispatcher (Message req, EndpointDispatcher endpoint)
-               {
-                       Uri to = req.Headers.To;
-                       if (to == null)
-                               return false;
-                       if (to.AbsoluteUri == Constants.WsaAnonymousUri)
-                               return false;
-                       return endpoint.AddressFilter.Match (req) && endpoint.ContractFilter.Match (req);
-               }
-                
-               void HandleError (Exception ex)
-               {
-                       foreach (IErrorHandler handler in ErrorHandlers)
-                               if (handler.HandleError (ex))
-                                       break;
-               }
-
+               // isolated from ChannelDispatcher
                class ListenerLoopManager
                {
                        ChannelDispatcher owner;
-                       AutoResetEvent handle = new AutoResetEvent (false);
+                       AutoResetEvent throttle_wait_handle = new AutoResetEvent (false);
                        AutoResetEvent creator_handle = new AutoResetEvent (false);
                        ManualResetEvent stop_handle = new ManualResetEvent (false);
                        bool loop;
                        Thread loop_thread;
-                       TimeSpan open_timeout;
+                       DateTime close_started;
+                       TimeSpan close_timeout;
                        Func<IAsyncResult> channel_acceptor;
                        List<IChannel> channels = new List<IChannel> ();
+                       AddressFilterMode address_filter_mode;
+                       List<ISession> sessions = new List<ISession> ();
 
-                       public ListenerLoopManager (ChannelDispatcher owner, TimeSpan openTimeout)
+                       public ListenerLoopManager (ChannelDispatcher owner)
                        {
                                this.owner = owner;
-                               open_timeout = openTimeout;
+                               var sba = owner.Host != null ? owner.Host.Description.Behaviors.Find<ServiceBehaviorAttribute> () : null;
+                               if (sba != null)
+                                       address_filter_mode = sba.AddressFilterMode;
                        }
 
-                       public void Setup ()
+                       public void Setup (TimeSpan openTimeout)
                        {
-                               if (owner.Listener.State != CommunicationState.Opened)
-                                       owner.Listener.Open (open_timeout);
+                               if (owner.Listener.State != CommunicationState.Created)
+                                       throw new InvalidOperationException ("Tried to open the channel listener which is bound to ChannelDispatcher, but it is not at Created state");
+                               owner.Listener.Open (openTimeout);
 
                                // It is tested at Open(), but strangely it is not instantiated at this point.
                                foreach (var ed in owner.Endpoints)
@@ -319,10 +364,6 @@ namespace System.ServiceModel.Dispatcher
 
                        public void Start ()
                        {
-                               foreach (var ed in owner.Endpoints)
-                                       if (ed.DispatchRuntime.InstanceContextProvider == null)
-                                               ed.DispatchRuntime.InstanceContextProvider = new DefaultInstanceContextProvider ();
-
                                if (loop_thread == null)
                                        loop_thread = new Thread (new ThreadStart (Loop));
                                loop_thread.Start ();
@@ -337,16 +378,15 @@ namespace System.ServiceModel.Dispatcher
                                        try {
                                                ChannelAccepted (r.EndAcceptChannel (result));
                                        } catch (Exception ex) {
-                                               Console.WriteLine ("Exception during finishing channel acceptance.");
-                                               Console.WriteLine (ex);
+                                               Logger.Error ("Exception during finishing channel acceptance.", ex);
+                                               creator_handle.Set ();
                                        }
                                };
                                return delegate {
                                        try {
                                                return r.BeginAcceptChannel (callback, null);
                                        } catch (Exception ex) {
-                                               Console.WriteLine ("Exception during accepting channel.");
-                                               Console.WriteLine (ex);
+                                               Logger.Error ("Exception during accepting channel.", ex);
                                                throw;
                                        }
                                };
@@ -368,28 +408,67 @@ namespace System.ServiceModel.Dispatcher
 
                        public void Stop (TimeSpan timeout)
                        {
+                               if (loop_thread == null)
+                                       return;
+
+                               close_started = DateTime.Now;
+                               close_timeout = timeout;
                                loop = false;
                                creator_handle.Set ();
-                               handle.Set ();
+                               throttle_wait_handle.Set (); // break primary loop
                                if (stop_handle != null) {
                                        stop_handle.WaitOne (timeout > TimeSpan.Zero ? timeout : TimeSpan.FromTicks (1));
                                        stop_handle.Close ();
                                        stop_handle = null;
                                }
-                               if (owner.Listener.State != CommunicationState.Closed)
+                               if (owner.Listener.State != CommunicationState.Closed) {
+                                       Logger.Warning (String.Format ("Channel listener '{0}' is not closed. Aborting.", owner.Listener.GetType ()));
                                        owner.Listener.Abort ();
+                               }
                                if (loop_thread != null && loop_thread.IsAlive)
                                        loop_thread.Abort ();
                                loop_thread = null;
                        }
 
+                       void AddChannel (IChannel ch)
+                       {
+                               channels.Add (ch);
+                               var ich = ch as ISessionChannel<IInputSession>;
+                               if (ich != null && ich.Session != null) {
+                                       lock (sessions) {
+                                               var session = sessions.FirstOrDefault (s => s.Id == ich.Session.Id);
+                                               if (session == null)
+                                                       sessions.Add (session);
+                                       }
+                               }
+                       }
+
+                       void RemoveChannel (IChannel ch)
+                       {
+                               channels.Remove (ch); // zonbie, if exists
+                               var ich = ch as ISessionChannel<IInputSession>;
+                               
+                               if (ich != null && ich.Session != null)
+                                       sessions.Remove (ich.Session);
+                       }
+
                        public void CloseInput ()
                        {
                                foreach (var ch in channels.ToArray ()) {
-                                       if (ch.State == CommunicationState.Closed)
-                                               channels.Remove (ch); // zonbie, if exists
-                                       else
-                                               ch.Close ();
+                                       if (ch.State == CommunicationState.Closed) {
+                                               lock (channels) {
+                                                       RemoveChannel (ch);
+                                               }
+                                       }
+                                       else {
+                                               try {
+                                                       ch.Close (close_timeout - (DateTime.Now - close_started));
+                                               } catch (Exception ex) {
+                                                       // FIXME: log it.
+                                                       Logger.Error (String.Format ("Exception on closing channel ({0})", ch.GetType ()), ex);
+                                                       ch.Abort ();
+                                               }
+                                       }
                                }
                        }
 
@@ -398,9 +477,7 @@ namespace System.ServiceModel.Dispatcher
                                try {
                                        LoopCore ();
                                } catch (Exception ex) {
-                                       // FIXME: log it
-                                       Console.WriteLine ("ChannelDispatcher caught an exception inside dispatcher loop, which is likely thrown by the channel listener {0}", owner.Listener);
-                                       Console.WriteLine (ex);
+                                       Logger.Error (String.Format ("ListenerLoopManager caught an exception inside dispatcher loop, which is likely thrown by the channel listener {0}", owner.Listener), ex);
                                } finally {
                                        if (stop_handle != null)
                                                stop_handle.Set ();
@@ -413,26 +490,27 @@ namespace System.ServiceModel.Dispatcher
 
                                // FIXME: use WaitForChannel() for (*only* for) transacted channel listeners.
                                // http://social.msdn.microsoft.com/Forums/en-US/wcf/thread/3faa4a5e-8602-4dbe-a181-73b3f581835e
-                               
-                               //FIXME: The logic here should be somewhat different as follows:
-                               //1. Get the message
-                               //2. Get the appropriate EndPointDispatcher that can handle the message
-                               //   which is done using the filters (AddressFilter, ContractFilter).
-                               //3. Let the appropriate endpoint handle the request.
 
                                while (loop) {
-                                       while (loop && channels.Count < owner.ServiceThrottle.MaxConcurrentSessions) {
+                                       // FIXME: take MaxConcurrentCalls into consideration appropriately.
+                                       while (loop && channels.Count < Math.Min (owner.ServiceThrottle.MaxConcurrentSessions, owner.ServiceThrottle.MaxConcurrentCalls)) {
+                                               // FIXME: this should not be required, but saves multi-ChannelDispatcher case (Throttling enabled) for HTTP standalone listener...
+                                               Thread.Sleep (100);
                                                channel_acceptor ();
                                                creator_handle.WaitOne (); // released by ChannelAccepted()
-                                               creator_handle.Reset ();
                                        }
                                        if (!loop)
                                                break;
-                                       handle.WaitOne (); // released by IChannel.Close()
-                                       handle.Reset ();
+                                       throttle_wait_handle.WaitOne (); // released by IChannel.Close()
+                               }
+                               try {
+                                       owner.Listener.Close ();
+                               } catch (Exception ex) {
+                                       Logger.Error (String.Format ("Exception while closing IChannelListener ({0})", owner.Listener.GetType ()), ex);
+                               } finally {
+                                       // make sure to close both listener and channels.
+                                       owner.CloseInput ();
                                }
-                               owner.Listener.Close ();
-                               owner.CloseInput ();
                        }
 
                        void ChannelAccepted (IChannel ch)
@@ -447,17 +525,20 @@ namespace System.ServiceModel.Dispatcher
                                        return;
                                }
 
-                               channels.Add (ch);
+                               lock (channels)
+                                       AddChannel (ch);
                                ch.Opened += delegate {
                                        ch.Faulted += delegate {
-                                               if (channels.Contains (ch))
-                                                       channels.Remove (ch);
-                                               handle.Set (); // release loop wait lock.
+                                               lock (channels)
+                                                       if (channels.Contains (ch))
+                                                               RemoveChannel (ch);
+                                               throttle_wait_handle.Set (); // release loop wait lock.
                                                };
                                        ch.Closed += delegate {
-                                               if (channels.Contains (ch))
-                                                       channels.Remove (ch);
-                                               handle.Set (); // release loop wait lock.
+                                               lock (channels)
+                                                       if (channels.Contains (ch))
+                                                               RemoveChannel (ch);
+                                               throttle_wait_handle.Set (); // release loop wait lock.
                                                };
                                        };
                                ch.Open ();
@@ -498,79 +579,114 @@ namespace System.ServiceModel.Dispatcher
                                var reply = (IReplyChannel) result.AsyncState;
                                if (reply.EndTryReceiveRequest (result, out rc))
                                        ProcessRequest (reply, rc);
+                               else
+                                       reply.Close ();
                        }
 
                        void TryReceiveDone (IAsyncResult result)
                        {
                                Message msg;
                                var input = (IInputChannel) result.AsyncState;
-                               if (input.EndTryReceive (result, out msg))
-                                       ProcessInput (input, msg);
-                       }
-
-                       void SendEndpointNotFound (RequestContext rc, EndpointNotFoundException ex) 
-                       {
                                try {
-
-                                       MessageVersion version = rc.RequestMessage.Version;
-                                       FaultCode fc = new FaultCode ("DestinationUnreachable", version.Addressing.Namespace);
-                                       Message res = Message.CreateMessage (version, fc, "error occured", rc.RequestMessage.Headers.Action);
-                                       rc.Reply (res);
+                                       if (input.EndTryReceive (result, out msg))
+                                               ProcessInput (input, msg);
+                                       else
+                                               input.Close ();
+                               } catch (ObjectDisposedException) {
+                                       input.Close ();
                                }
-                               catch (Exception e) { }
                        }
 
                        void ProcessRequest (IReplyChannel reply, RequestContext rc)
                        {
                                try {
-                                       EndpointDispatcher candidate = FindEndpointDispatcher (rc.RequestMessage);
-                                       new InputOrReplyRequestProcessor (candidate.DispatchRuntime, reply).
-                                               ProcessReply (rc);
-                               } catch (EndpointNotFoundException ex) {
-                                       SendEndpointNotFound (rc, ex);
+                                       var req = rc.RequestMessage;
+                                       var ed = FindEndpointDispatcher (req);
+                                       new InputOrReplyRequestProcessor (ed.DispatchRuntime, reply).ProcessReply (rc);
                                } catch (Exception ex) {
-                                       // FIXME: log it.
-                                       Console.WriteLine (ex);
+                                       Message res;
+                                       if (ProcessErrorWithHandlers (reply, ex, out res))
+                                               return;
+
+                                       if ((!(ex is SocketException)) && 
+                                           (!(ex is XmlException)) &&
+                                           (!(ex is IOException)))
+                                               rc.Reply (res);
+                                       
+                                       reply.Close (owner.DefaultCloseTimeout); // close the channel
                                } finally {
+                                       if (rc != null)
+                                               rc.Close ();
                                        // unless it is closed by session/call manager, move it back to the loop to receive the next message.
-                                       if (reply.State != CommunicationState.Closed)
+                                       if (loop && reply.State != CommunicationState.Closed)
                                                ProcessRequestOrInput (reply);
                                }
                        }
 
+                       bool ProcessErrorWithHandlers (IChannel ch, Exception ex, out Message res)
+                       {
+                               res = null;
+
+                               foreach (var eh in owner.ErrorHandlers)
+                                       if (eh.HandleError (ex))
+                                               return true; // error is handled appropriately.
+
+                               Logger.Error ("An error occured, to be handled", ex);
+
+                               foreach (var eh in owner.ErrorHandlers)
+                                       eh.ProvideFault (ex, owner.MessageVersion, ref res);
+                               if (res == null) {
+                                       var conv = ch.GetProperty<FaultConverter> () ?? FaultConverter.GetDefaultFaultConverter (owner.MessageVersion);
+                                       if (!conv.TryCreateFaultMessage (ex, out res))
+                                               res = Message.CreateMessage (owner.MessageVersion, new FaultCode ("Receiver"), ex.Message, owner.MessageVersion.Addressing.FaultNamespace);
+                               }
+
+                               return false;
+                       }
+
                        void ProcessInput (IInputChannel input, Message message)
                        {
                                try {
                                        EndpointDispatcher candidate = null;
                                        candidate = FindEndpointDispatcher (message);
-                                       new InputOrReplyRequestProcessor (candidate.DispatchRuntime, input).
-                                               ProcessInput (message);
+                                       new InputOrReplyRequestProcessor (candidate.DispatchRuntime, input).ProcessInput (message);
                                }
                                catch (Exception ex) {
-                                       // FIXME: log it.
-                                       Console.WriteLine (ex);
+                                       Message dummy;
+                                       ProcessErrorWithHandlers (input, ex, out dummy);
                                } finally {
                                        // unless it is closed by session/call manager, move it back to the loop to receive the next message.
-                                       if (input.State != CommunicationState.Closed)
+                                       if (loop && input.State != CommunicationState.Closed)
                                                ProcessRequestOrInput (input);
                                }
                        }
 
                        EndpointDispatcher FindEndpointDispatcher (Message message) {
                                EndpointDispatcher candidate = null;
-                               for (int i = 0; i < owner.Endpoints.Count; i++) {
-                                       if (owner.IsMessageMatchesEndpointDispatcher (message, owner.Endpoints [i])) {
-                                               var newdis = owner.Endpoints [i];
+                               bool hasEndpointMatch = false;
+                               foreach (var endpoint in owner.Endpoints) {
+                                       if (endpoint.AddressFilter.Match (message)) {
+                                               hasEndpointMatch = true;
+                                               if (!endpoint.ContractFilter.Match (message))
+                                                       continue;
+                                               var newdis = endpoint;
                                                if (candidate == null || candidate.FilterPriority < newdis.FilterPriority)
                                                        candidate = newdis;
                                                else if (candidate.FilterPriority == newdis.FilterPriority)
                                                        throw new MultipleFilterMatchesException ();
                                        }
                                }
-                               if (candidate == null)
-                                       throw new EndpointNotFoundException (String.Format ("The request message has the target '{0}' with action '{1}' which is not reachable in this service contract", message.Headers.To, message.Headers.Action));
+                               if (candidate == null && !hasEndpointMatch) {
+                                       if (owner.Host != null)
+                                               owner.Host.OnUnknownMessageReceived (message);
+                                       // we have to return a fault to the client anyways...
+                                       throw new EndpointNotFoundException ();
+                               }
+                               else if (candidate == null)
+                                       // FIXME: It is not a good place to check, but anyways detach this error from EndpointNotFoundException.
+                                       throw new ActionNotSupportedException (String.Format ("Action '{0}' did not match any operations in the target contract", message.Headers.Action));
+
                                return candidate;
                        }
                }
-       }
 }