using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
+using System.Linq;
using System.Reflection;
using System.ServiceModel.Channels;
using System.Threading;
using System.Transactions;
using System.ServiceModel;
using System.ServiceModel.Description;
+using System.Net.Sockets;
+using System.Xml;
+using System.IO;
namespace System.ServiceModel.Dispatcher
{
internal EndpointDispatcher InitializeServiceEndpoint (Type serviceType, ServiceEndpoint se)
{
- this.MessageVersion = se.Binding.MessageVersion;
- if (this.MessageVersion == null)
- this.MessageVersion = MessageVersion.Default;
-
//Attach one EndpointDispacher to the ChannelDispatcher
EndpointDispatcher ed = new EndpointDispatcher (se.Address, se.Contract.Name, se.Contract.Namespace);
this.Endpoints.Add (ed);
protected internal override void Attach (ServiceHostBase host)
{
this.host = host;
+ var bl = listener as IChannelDispatcherBoundListener;
+ if (bl != null)
+ bl.ChannelDispatcher = this;
}
public override void CloseInput ()
{
// FIXME: not sure if it should be filled here.
if (ServiceThrottle == null)
- ServiceThrottle = new ServiceThrottle ();
+ ServiceThrottle = new ServiceThrottle (this);
loop_manager.Start ();
}
Func<IAsyncResult> channel_acceptor;
List<IChannel> channels = new List<IChannel> ();
AddressFilterMode address_filter_mode;
+ List<ISession> sessions = new List<ISession> ();
public ListenerLoopManager (ChannelDispatcher owner)
{
public void Setup (TimeSpan openTimeout)
{
- if (owner.Listener.State != CommunicationState.Opened)
- owner.Listener.Open (openTimeout);
+ if (owner.Listener.State != CommunicationState.Created)
+ throw new InvalidOperationException ("Tried to open the channel listener which is bound to ChannelDispatcher, but it is not at Created state");
+ owner.Listener.Open (openTimeout);
// It is tested at Open(), but strangely it is not instantiated at this point.
foreach (var ed in owner.Endpoints)
try {
ChannelAccepted (r.EndAcceptChannel (result));
} catch (Exception ex) {
- Console.WriteLine ("Exception during finishing channel acceptance.");
- Console.WriteLine (ex);
+ Logger.Error ("Exception during finishing channel acceptance.", ex);
creator_handle.Set ();
}
};
try {
return r.BeginAcceptChannel (callback, null);
} catch (Exception ex) {
- Console.WriteLine ("Exception during accepting channel.");
- Console.WriteLine (ex);
+ Logger.Error ("Exception during accepting channel.", ex);
throw;
}
};
stop_handle = null;
}
if (owner.Listener.State != CommunicationState.Closed) {
- // FIXME: log it
- Console.WriteLine ("Channel listener '{0}' is not closed. Aborting.", owner.Listener.GetType ());
+ Logger.Warning (String.Format ("Channel listener '{0}' is not closed. Aborting.", owner.Listener.GetType ()));
owner.Listener.Abort ();
}
if (loop_thread != null && loop_thread.IsAlive)
loop_thread = null;
}
+ void AddChannel (IChannel ch)
+ {
+ channels.Add (ch);
+ var ich = ch as ISessionChannel<IInputSession>;
+ if (ich != null && ich.Session != null) {
+ lock (sessions) {
+ var session = sessions.FirstOrDefault (s => s.Id == ich.Session.Id);
+ if (session == null)
+ sessions.Add (session);
+ }
+ }
+ }
+
+ void RemoveChannel (IChannel ch)
+ {
+ channels.Remove (ch); // zonbie, if exists
+ var ich = ch as ISessionChannel<IInputSession>;
+
+ if (ich != null && ich.Session != null)
+ sessions.Remove (ich.Session);
+ }
+
public void CloseInput ()
{
foreach (var ch in channels.ToArray ()) {
- if (ch.State == CommunicationState.Closed)
- channels.Remove (ch); // zonbie, if exists
+ if (ch.State == CommunicationState.Closed) {
+ lock (channels) {
+ RemoveChannel (ch);
+ }
+ }
else {
try {
ch.Close (close_timeout - (DateTime.Now - close_started));
} catch (Exception ex) {
// FIXME: log it.
- Console.WriteLine (ex);
+ Logger.Error (String.Format ("Exception on closing channel ({0})", ch.GetType ()), ex);
ch.Abort ();
}
}
try {
LoopCore ();
} catch (Exception ex) {
- // FIXME: log it
- Console.WriteLine ("ListenerLoopManager caught an exception inside dispatcher loop, which is likely thrown by the channel listener {0}", owner.Listener);
- Console.WriteLine (ex);
+ Logger.Error (String.Format ("ListenerLoopManager caught an exception inside dispatcher loop, which is likely thrown by the channel listener {0}", owner.Listener), ex);
} finally {
if (stop_handle != null)
stop_handle.Set ();
// http://social.msdn.microsoft.com/Forums/en-US/wcf/thread/3faa4a5e-8602-4dbe-a181-73b3f581835e
while (loop) {
- // FIXME: enable throttling and allow more than one connection to process at a time.
- while (loop && channels.Count < 1) {
-// while (loop && channels.Count < owner.ServiceThrottle.MaxConcurrentSessions) {
+ // FIXME: take MaxConcurrentCalls into consideration appropriately.
+ while (loop && channels.Count < Math.Min (owner.ServiceThrottle.MaxConcurrentSessions, owner.ServiceThrottle.MaxConcurrentCalls)) {
+ // FIXME: this should not be required, but saves multi-ChannelDispatcher case (Throttling enabled) for HTTP standalone listener...
+ Thread.Sleep (100);
channel_acceptor ();
creator_handle.WaitOne (); // released by ChannelAccepted()
}
}
try {
owner.Listener.Close ();
+ } catch (Exception ex) {
+ Logger.Error (String.Format ("Exception while closing IChannelListener ({0})", owner.Listener.GetType ()), ex);
} finally {
// make sure to close both listener and channels.
owner.CloseInput ();
}
lock (channels)
- channels.Add (ch);
+ AddChannel (ch);
ch.Opened += delegate {
ch.Faulted += delegate {
lock (channels)
if (channels.Contains (ch))
- channels.Remove (ch);
+ RemoveChannel (ch);
throttle_wait_handle.Set (); // release loop wait lock.
};
ch.Closed += delegate {
lock (channels)
if (channels.Contains (ch))
- channels.Remove (ch);
+ RemoveChannel (ch);
throttle_wait_handle.Set (); // release loop wait lock.
};
};
{
Message msg;
var input = (IInputChannel) result.AsyncState;
- if (input.EndTryReceive (result, out msg))
- ProcessInput (input, msg);
- else
+ try {
+ if (input.EndTryReceive (result, out msg))
+ ProcessInput (input, msg);
+ else
+ input.Close ();
+ } catch (ObjectDisposedException) {
input.Close ();
+ }
}
void ProcessRequest (IReplyChannel reply, RequestContext rc)
{
- var req = rc.RequestMessage;
try {
+ var req = rc.RequestMessage;
var ed = FindEndpointDispatcher (req);
new InputOrReplyRequestProcessor (ed.DispatchRuntime, reply).ProcessReply (rc);
} catch (Exception ex) {
- // FIXME: log it.
- Console.WriteLine (ex);
-
- var conv = reply.GetProperty<FaultConverter> () ?? FaultConverter.GetDefaultFaultConverter (rc.RequestMessage.Version);
Message res;
- if (!conv.TryCreateFaultMessage (ex, out res))
- res = Message.CreateMessage (req.Version, new FaultCode ("Receiver"), ex.Message, req.Version.Addressing.FaultNamespace);
- rc.Reply (res);
+ if (ProcessErrorWithHandlers (reply, ex, out res))
+ return;
+
+ if ((!(ex is SocketException)) &&
+ (!(ex is XmlException)) &&
+ (!(ex is IOException)))
+ rc.Reply (res);
+
+ reply.Close (owner.DefaultCloseTimeout); // close the channel
} finally {
if (rc != null)
rc.Close ();
}
}
+ bool ProcessErrorWithHandlers (IChannel ch, Exception ex, out Message res)
+ {
+ res = null;
+
+ foreach (var eh in owner.ErrorHandlers)
+ if (eh.HandleError (ex))
+ return true; // error is handled appropriately.
+
+ Logger.Error ("An error occured, to be handled", ex);
+
+ foreach (var eh in owner.ErrorHandlers)
+ eh.ProvideFault (ex, owner.MessageVersion, ref res);
+ if (res == null) {
+ var conv = ch.GetProperty<FaultConverter> () ?? FaultConverter.GetDefaultFaultConverter (owner.MessageVersion);
+ if (!conv.TryCreateFaultMessage (ex, out res))
+ res = Message.CreateMessage (owner.MessageVersion, new FaultCode ("Receiver"), ex.Message, owner.MessageVersion.Addressing.FaultNamespace);
+ }
+
+ return false;
+ }
+
void ProcessInput (IInputChannel input, Message message)
{
try {
EndpointDispatcher candidate = null;
candidate = FindEndpointDispatcher (message);
- new InputOrReplyRequestProcessor (candidate.DispatchRuntime, input).
- ProcessInput (message);
+ new InputOrReplyRequestProcessor (candidate.DispatchRuntime, input).ProcessInput (message);
}
catch (Exception ex) {
- // FIXME: log it.
- Console.WriteLine (ex);
+ Message dummy;
+ ProcessErrorWithHandlers (input, ex, out dummy);
} finally {
// unless it is closed by session/call manager, move it back to the loop to receive the next message.
if (loop && input.State != CommunicationState.Closed)