Collection<IErrorHandler> error_handlers
= new Collection<IErrorHandler> ();
IChannelListener listener;
- IDefaultCommunicationTimeouts timeouts;
+ internal IDefaultCommunicationTimeouts timeouts; // FIXME: remove internal
MessageVersion message_version;
bool receive_sync, include_exception_detail_in_faults,
manual_addressing, is_tx_receive;
EndpointListenerAsyncResult async_result;
ListenerLoopManager loop_manager;
- SynchronizedCollection<EndpointDispatcher> _endpoints;
+ SynchronizedCollection<EndpointDispatcher> endpoints;
[MonoTODO ("get binding info from config")]
- public ChannelDispatcher (IChannelListener listener)
+ public ChannelDispatcher (IChannelListener listener)
+ : this (listener, null)
{
- if (listener == null)
- throw new ArgumentNullException ("listener");
- Init (listener, null, null);
}
public ChannelDispatcher (
IChannelListener listener, string bindingName)
- : this (listener, bindingName,
- DefaultCommunicationTimeouts.Instance)
+ : this (listener, bindingName, null)
{
}
IChannelListener listener, string bindingName,
IDefaultCommunicationTimeouts timeouts)
{
- if (listener == null)
+ if (listener == null)
throw new ArgumentNullException ("listener");
- if (bindingName == null)
- throw new ArgumentNullException ("bindingName");
- if (timeouts == null)
- throw new ArgumentNullException ("timeouts");
Init (listener, bindingName, timeouts);
}
private void Init (IChannelListener listener, string bindingName,
- IDefaultCommunicationTimeouts timeouts) {
+ IDefaultCommunicationTimeouts timeouts)
+ {
this.listener = listener;
this.binding_name = bindingName;
- this.timeouts = timeouts;
- _endpoints = new SynchronizedCollection<EndpointDispatcher> ();
+ // IChannelListener is often a ChannelListenerBase
+ // which implements IDefaultCommunicationTimeouts.
+ this.timeouts = timeouts ?? listener as IDefaultCommunicationTimeouts ?? DefaultCommunicationTimeouts.Instance;
+ endpoints = new SynchronizedCollection<EndpointDispatcher> ();
}
public string BindingName {
}
public SynchronizedCollection<EndpointDispatcher> Endpoints {
- get {
- return _endpoints;
- }
+ get { return endpoints; }
}
[MonoTODO]
protected internal override void Attach (ServiceHostBase host)
{
this.host = host;
- }
+ }
public override void CloseInput ()
{
protected override void OnOpen (TimeSpan timeout)
{
- ProcessOpen (timeout);
+ if (Host == null || MessageVersion == null)
+ throw new InvalidOperationException ("Service host is not attached to this ChannelDispatcher.");
+
+ // FIXME: hack, just to make it runnable
+ loop_manager = new ListenerLoopManager (this, timeout);
}
[MonoTODO ("what to do here?")]
{
}
- [MonoTODO ("what to do here?")]
protected override void OnOpened ()
{
+ ProcessOpened ();
}
void ProcessClose (TimeSpan timeout)
{
if (loop_manager != null)
- loop_manager.Stop ();
+ loop_manager.Stop (timeout);
CloseInput ();
}
- void ProcessOpen (TimeSpan timeout)
+ void ProcessOpened ()
{
- if (Host == null || MessageVersion == null)
- throw new InvalidOperationException ("Service host is not attached to this ChannelDispatcher.");
try {
- // FIXME: hack, just to make it runnable
- listener.Open (timeout);
- loop_manager = new ListenerLoopManager (this);
- loop_manager.Start ();
+ loop_manager.Setup ();
} finally {
if (async_result != null)
async_result.Complete (false);
}
}
+ internal void StartLoop ()
+ {
+ loop_manager.Start ();
+ }
+
bool IsMessageMatchesEndpointDispatcher (Message req, EndpointDispatcher endpoint)
{
Uri to = req.Headers.To;
class ListenerLoopManager
{
+ delegate IChannel ChannelAcceptor ();
+
ChannelDispatcher owner;
AutoResetEvent handle;
IReplyChannel reply;
IInputChannel input;
bool loop;
Thread loop_thread;
+ TimeSpan open_timeout;
+ ChannelAcceptor channel_acceptor;
- public ListenerLoopManager (ChannelDispatcher owner)
+ public ListenerLoopManager (ChannelDispatcher owner, TimeSpan openTimeout)
{
this.owner = owner;
- MethodInfo mi = owner.Listener.GetType ().GetMethod ("AcceptChannel", Type.EmptyTypes);
- object channel = mi.Invoke (owner.Listener, new object [0]);
- reply = channel as IReplyChannel;
- input = channel as IInputChannel;
+ open_timeout = openTimeout;
+ }
+
+ public void Setup ()
+ {
+ if (owner.Listener.State != CommunicationState.Opened)
+ owner.Listener.Open (open_timeout);
+
+ // It is tested at Open(), but strangely it is not instantiated at this point.
+ foreach (var ed in owner.Endpoints)
+ if (ed.DispatchRuntime.Type == null || ed.DispatchRuntime.Type.GetConstructor (Type.EmptyTypes) == null)
+ throw new InvalidOperationException ("There is no default constructor for the service Type in the DispatchRuntime");
+ SetupChannel ();
}
public void Start ()
loop_thread.Start ();
}
- public void Stop ()
+ void SetupChannel ()
+ {
+ IChannelListener<IReplyChannel> r = owner.Listener as IChannelListener<IReplyChannel>;
+ if (r != null) {
+ channel_acceptor = delegate { return r.AcceptChannel (); };
+ return;
+ }
+ IChannelListener<IReplySessionChannel> rs = owner.Listener as IChannelListener<IReplySessionChannel>;
+ if (rs != null) {
+ channel_acceptor = delegate { return rs.AcceptChannel (); };
+ return;
+ }
+ IChannelListener<IInputChannel> i = owner.Listener as IChannelListener<IInputChannel>;
+ if (i != null) {
+ channel_acceptor = delegate { return i.AcceptChannel (); };
+ return;
+ }
+ IChannelListener<IInputSessionChannel> iss = owner.Listener as IChannelListener<IInputSessionChannel>;
+ if (iss != null) {
+ channel_acceptor = delegate { return iss.AcceptChannel (); };
+ return;
+ }
+ IChannelListener<IDuplexChannel> d = owner.Listener as IChannelListener<IDuplexChannel>;
+ if (d != null) {
+ channel_acceptor = delegate { return d.AcceptChannel (); };
+ return;
+ }
+ IChannelListener<IDuplexSessionChannel> ds = owner.Listener as IChannelListener<IDuplexSessionChannel>;
+ if (ds != null) {
+ channel_acceptor = delegate { return ds.AcceptChannel (); };
+ return;
+ }
+
+ throw new InvalidOperationException (String.Format ("Unrecognized channel listener type: {0}", owner.Listener.GetType ()));
+ }
+
+ public void Stop (TimeSpan timeout)
{
StopLoop ();
owner.Listener.Close ();
// which is done using the filters (AddressFilter, ContractFilter).
//3. Let the appropriate endpoint handle the request.
+ IChannel ch = channel_acceptor ();
+ ch.Open (owner.timeouts.OpenTimeout);
+ reply = ch as IReplyChannel;
+ input = ch as IInputChannel;
+
if (reply != null) {
while (loop) {
if (reply.WaitForRequest (owner.timeouts.ReceiveTimeout))
throw new InvalidOperationException ("The reply channel didn't return RequestContext");
EndpointDispatcher candidate = FindEndpointDispatcher (rc.RequestMessage);
- new InputOrReplyRequestProcessor (candidate.DispatchRuntime, reply, owner.timeouts).
+ new InputOrReplyRequestProcessor (candidate.DispatchRuntime, reply).
ProcessReply (rc);
}
catch (EndpointNotFoundException ex) {
try {
candidate = FindEndpointDispatcher (message);
- new InputOrReplyRequestProcessor (candidate.DispatchRuntime, reply, owner.timeouts).
+ new InputOrReplyRequestProcessor (candidate.DispatchRuntime, input).
ProcessInput(message);
}
catch (EndpointNotFoundException ex) {