using System.Collections.Generic;
using System.IO;
using System.Net;
+using System.Net.Sockets;
using System.Net.Security;
using System.ServiceModel;
using System.ServiceModel.Description;
using System.ServiceModel.Security;
+using System.Threading;
+using System.Xml;
namespace System.ServiceModel.Channels
{
- internal abstract class ReplyChannelBase : ChannelBase, IReplyChannel
+ internal abstract class InternalReplyChannelBase : ReplyChannelBase
{
- ChannelListenerBase channel_listener;
+ public InternalReplyChannelBase (ChannelListenerBase listener)
+ : base (listener)
+ {
+ local_address = new EndpointAddress (listener.Uri);
+ }
+
+ EndpointAddress local_address;
+
+ public override EndpointAddress LocalAddress {
+ get { return local_address; }
+ }
+ }
+ internal abstract class ReplyChannelBase : ChannelBase, IReplyChannel
+ {
public ReplyChannelBase (ChannelListenerBase listener)
: base (listener)
{
- this.channel_listener = listener;
+ this.listener = listener;
+ }
+
+ ChannelListenerBase listener;
+
+ public ChannelListenerBase Listener {
+ get { return listener; }
}
public abstract EndpointAddress LocalAddress { get; }
+ public override T GetProperty<T> ()
+ {
+ if (typeof (T) == typeof (MessageVersion) && listener is IHasMessageEncoder)
+ return (T) (object) ((IHasMessageEncoder) listener).MessageEncoder.MessageVersion;
+ if (typeof (T) == typeof (IChannelListener))
+ return (T) (object) listener;
+ return base.GetProperty<T> ();
+ }
+
+ // FIXME: this is wrong. Implement all of them in each channel.
+ protected override void OnAbort ()
+ {
+ OnClose (TimeSpan.Zero);
+ }
+
+ protected override void OnClose (TimeSpan timeout)
+ {
+ if (currentAsyncThreads.Count > 0)
+ if (!CancelAsync (timeout))
+ foreach (Thread asyncThread in currentAsyncThreads)
+ asyncThread.Abort ();
+ }
+
+ public virtual bool CancelAsync (TimeSpan timeout)
+ {
+ // FIXME: It should wait for the actual completion.
+ return currentAsyncResults.Count > 0;
+ //return CurrentAsyncResult == null || CurrentAsyncResult.AsyncWaitHandle.WaitOne (timeout);
+ }
+
public virtual bool TryReceiveRequest ()
{
RequestContext dummy;
- return TryReceiveRequest (channel_listener.DefaultReceiveTimeout, out dummy);
+ return TryReceiveRequest (DefaultReceiveTimeout, out dummy);
}
public abstract bool TryReceiveRequest (TimeSpan timeout, out RequestContext context);
delegate bool TryReceiveDelegate (TimeSpan timeout, out RequestContext context);
TryReceiveDelegate try_recv_delegate;
+ object async_result_lock = new object ();
+ HashSet<Thread> currentAsyncThreads = new HashSet<Thread>();
+ HashSet<IAsyncResult> currentAsyncResults = new HashSet<IAsyncResult>();
+
public virtual IAsyncResult BeginTryReceiveRequest (TimeSpan timeout, AsyncCallback callback, object state)
{
+ IAsyncResult result = null;
+
if (try_recv_delegate == null)
- try_recv_delegate = new TryReceiveDelegate (TryReceiveRequest);
+ try_recv_delegate = new TryReceiveDelegate (delegate (TimeSpan tout, out RequestContext ctx) {
+ lock (async_result_lock) {
+ if (currentAsyncResults.Contains (result))
+ currentAsyncThreads.Add (Thread.CurrentThread);
+ }
+ try {
+ return TryReceiveRequest (tout, out ctx);
+ } catch (XmlException ex) {
+ Console.WriteLine ("Xml Exception (Dropped Connection?):" + ex.Message);
+ //on dropped connection,
+ //whatever you do don't crash
+ //the whole app. Ignore for now
+ } catch (SocketException ex) {
+ Console.WriteLine ("Socket Exception (Dropped Connection?):" + ex.Message);
+ //on dropped connection,
+ //whatever you do don't crash
+ //the whole app. Ignore for now
+ } catch (IOException ex) {
+ Console.WriteLine ("I/O Exception (Dropped Connection?):" + ex.Message);
+ //on dropped connection,
+ //whatever you do don't crash
+ //the whole app. Ignore for now
+ } finally {
+ lock (async_result_lock) {
+ currentAsyncResults.Remove (result);
+ currentAsyncThreads.Remove (Thread.CurrentThread);
+ }
+ }
+ ctx = null;
+ return false;
+ });
RequestContext dummy;
- return try_recv_delegate.BeginInvoke (timeout, out dummy, callback, state);
+ lock (async_result_lock) {
+ result = try_recv_delegate.BeginInvoke (timeout, out dummy, callback, state);
+ currentAsyncResults.Add (result);
+ }
+ // Note that at this point result can be missing from currentAsyncResults here if delegate has run to completion
+ return result;
}
public virtual bool EndTryReceiveRequest (IAsyncResult result)
public virtual bool WaitForRequest ()
{
- return WaitForRequest (channel_listener.DefaultReceiveTimeout);
+ return WaitForRequest (DefaultReceiveTimeout);
}
public abstract bool WaitForRequest (TimeSpan timeout);
- delegate bool WaitDelegate (TimeSpan timeout);
- WaitDelegate wait_delegate;
+ Func<TimeSpan,bool> wait_delegate;
public virtual IAsyncResult BeginWaitForRequest (TimeSpan timeout, AsyncCallback callback, object state)
{
if (wait_delegate == null)
- wait_delegate = new WaitDelegate (WaitForRequest);
+ wait_delegate = new Func<TimeSpan,bool> (WaitForRequest);
return wait_delegate.BeginInvoke (timeout, callback, state);
}
public virtual RequestContext ReceiveRequest ()
{
- return ReceiveRequest (channel_listener.DefaultReceiveTimeout);
+ return ReceiveRequest (DefaultReceiveTimeout);
}
public abstract RequestContext ReceiveRequest (TimeSpan timeout);
public virtual IAsyncResult BeginReceiveRequest (AsyncCallback callback, object state)
{
- return BeginReceiveRequest (channel_listener.DefaultReceiveTimeout, callback, state);
+ return BeginReceiveRequest (DefaultReceiveTimeout, callback, state);
}
Func<TimeSpan,RequestContext> recv_delegate;
return recv_delegate.EndInvoke (result);
}
+ Action<TimeSpan> open_delegate, close_delegate;
+
protected override IAsyncResult OnBeginOpen (TimeSpan timeout,
AsyncCallback callback, object state)
{
- throw new NotImplementedException ();
+ if (open_delegate == null)
+ open_delegate = new Action<TimeSpan> (OnOpen);
+ return open_delegate.BeginInvoke (timeout, callback, state);
}
protected override void OnEndOpen (IAsyncResult result)
{
- throw new NotImplementedException ();
+ if (open_delegate == null)
+ throw new InvalidOperationException ("async open operation has not started");
+ open_delegate.EndInvoke (result);
}
- [MonoTODO]
protected override IAsyncResult OnBeginClose (TimeSpan timeout,
AsyncCallback callback, object state)
{
- throw new NotImplementedException ();
+ if (close_delegate == null)
+ close_delegate = new Action<TimeSpan> (OnClose);
+ return close_delegate.BeginInvoke (timeout, callback, state);
}
- [MonoTODO]
protected override void OnEndClose (IAsyncResult result)
{
- throw new NotImplementedException ();
+ if (close_delegate == null)
+ throw new InvalidOperationException ("async close operation has not started");
+ close_delegate.EndInvoke (result);
}
}
}