Merge pull request #463 from strawd/concurrent-requests
[mono.git] / mcs / class / System.ServiceModel / System.ServiceModel.Channels / ReplyChannelBase.cs
index 0b286d6172cbb0757019cb92e1e5ba906c98ba68..4e47ecf0035d8fda5470806c3f5ad8686b0c21f7 100644 (file)
@@ -29,29 +29,81 @@ using System;
 using System.Collections.Generic;
 using System.IO;
 using System.Net;
+using System.Net.Sockets;
 using System.Net.Security;
 using System.ServiceModel;
 using System.ServiceModel.Description;
 using System.ServiceModel.Security;
+using System.Threading;
+using System.Xml;
 
 namespace System.ServiceModel.Channels
 {
-       internal abstract class ReplyChannelBase : ChannelBase, IReplyChannel
+       internal abstract class InternalReplyChannelBase : ReplyChannelBase
        {
-               ChannelListenerBase channel_listener;
+               public InternalReplyChannelBase (ChannelListenerBase listener)
+                       : base (listener)
+               {
+                       local_address = new EndpointAddress (listener.Uri);
+               }
+
+               EndpointAddress local_address;
+
+               public override EndpointAddress LocalAddress {
+                       get { return local_address; }
+               }
+       }
 
+       internal abstract class ReplyChannelBase : ChannelBase, IReplyChannel
+       {
                public ReplyChannelBase (ChannelListenerBase listener)
                        : base (listener)
                {
-                       this.channel_listener = listener;
+                       this.listener = listener;
+               }
+
+               ChannelListenerBase listener;
+
+               public ChannelListenerBase Listener {
+                       get { return listener; }
                }
 
                public abstract EndpointAddress LocalAddress { get; }
 
+               public override T GetProperty<T> ()
+               {
+                       if (typeof (T) == typeof (MessageVersion) && listener is IHasMessageEncoder)
+                               return (T) (object) ((IHasMessageEncoder) listener).MessageEncoder.MessageVersion;
+                       if (typeof (T) == typeof (IChannelListener))
+                               return (T) (object) listener;
+                       return base.GetProperty<T> ();
+               }
+
+               // FIXME: this is wrong. Implement all of them in each channel.
+               protected override void OnAbort ()
+               {
+                       OnClose (TimeSpan.Zero);
+               }
+
+               protected override void OnClose (TimeSpan timeout)
+               {
+                       if (currentAsyncThreads.Count > 0)
+                               if (!CancelAsync (timeout))
+                                       foreach (Thread asyncThread in currentAsyncThreads)
+                                               asyncThread.Abort ();
+               }
+
+               public virtual bool CancelAsync (TimeSpan timeout)
+               {
+                       // FIXME: It should wait for the actual completion.
+                       return currentAsyncResults.Count > 0;
+                       //return CurrentAsyncResult == null || CurrentAsyncResult.AsyncWaitHandle.WaitOne (timeout);
+               }
+
                public virtual bool TryReceiveRequest ()
                {
                        RequestContext dummy;
-                       return TryReceiveRequest (channel_listener.DefaultReceiveTimeout, out dummy);
+                       return TryReceiveRequest (DefaultReceiveTimeout, out dummy);
                }
 
                public abstract bool TryReceiveRequest (TimeSpan timeout, out RequestContext context);
@@ -59,12 +111,53 @@ namespace System.ServiceModel.Channels
                delegate bool TryReceiveDelegate (TimeSpan timeout, out RequestContext context);
                TryReceiveDelegate try_recv_delegate;
 
+               object async_result_lock = new object ();
+               HashSet<Thread> currentAsyncThreads = new HashSet<Thread>();
+               HashSet<IAsyncResult> currentAsyncResults = new HashSet<IAsyncResult>();
+
                public virtual IAsyncResult BeginTryReceiveRequest (TimeSpan timeout, AsyncCallback callback, object state)
                {
+                       IAsyncResult result = null;
+
                        if (try_recv_delegate == null)
-                               try_recv_delegate = new TryReceiveDelegate (TryReceiveRequest);
+                               try_recv_delegate = new TryReceiveDelegate (delegate (TimeSpan tout, out RequestContext ctx) {
+                                       lock (async_result_lock) {
+                                               if (currentAsyncResults.Contains (result))
+                                                       currentAsyncThreads.Add (Thread.CurrentThread);
+                                       }
+                                       try {
+                                               return TryReceiveRequest (tout, out ctx);
+                                       } catch (XmlException ex) {
+                                               Console.WriteLine ("Xml Exception (Dropped Connection?):" + ex.Message);
+                                               //on dropped connection, 
+                                               //whatever you do don't crash
+                                               //the whole app.  Ignore for now
+                                       } catch (SocketException ex) {
+                                               Console.WriteLine ("Socket Exception (Dropped Connection?):" + ex.Message);
+                                               //on dropped connection, 
+                                               //whatever you do don't crash
+                                               //the whole app.  Ignore for now
+                                       } catch (IOException ex) {
+                                               Console.WriteLine ("I/O Exception (Dropped Connection?):" + ex.Message);
+                                               //on dropped connection, 
+                                               //whatever you do don't crash
+                                               //the whole app.  Ignore for now
+                                       } finally {
+                                               lock (async_result_lock) {
+                                                       currentAsyncResults.Remove (result);
+                                                       currentAsyncThreads.Remove (Thread.CurrentThread);
+                                               }
+                                       }
+                                       ctx = null;
+                                       return false;
+                                       });
                        RequestContext dummy;
-                       return try_recv_delegate.BeginInvoke (timeout, out dummy, callback, state);
+                       lock (async_result_lock) {
+                               result = try_recv_delegate.BeginInvoke (timeout, out dummy, callback, state);
+                               currentAsyncResults.Add (result);
+                       }
+                       // Note that at this point result can be missing from currentAsyncResults here if delegate has run to completion
+                       return result;
                }
 
                public virtual bool EndTryReceiveRequest (IAsyncResult result)
@@ -82,18 +175,17 @@ namespace System.ServiceModel.Channels
 
                public virtual bool WaitForRequest ()
                {
-                       return WaitForRequest (channel_listener.DefaultReceiveTimeout);
+                       return WaitForRequest (DefaultReceiveTimeout);
                }
 
                public abstract bool WaitForRequest (TimeSpan timeout);
 
-               delegate bool WaitDelegate (TimeSpan timeout);
-               WaitDelegate wait_delegate;
+               Func<TimeSpan,bool> wait_delegate;
 
                public virtual IAsyncResult BeginWaitForRequest (TimeSpan timeout, AsyncCallback callback, object state)
                {
                        if (wait_delegate == null)
-                               wait_delegate = new WaitDelegate (WaitForRequest);
+                               wait_delegate = new Func<TimeSpan,bool> (WaitForRequest);
                        return wait_delegate.BeginInvoke (timeout, callback, state);
                }
 
@@ -106,14 +198,14 @@ namespace System.ServiceModel.Channels
 
                public virtual RequestContext ReceiveRequest ()
                {
-                       return ReceiveRequest (channel_listener.DefaultReceiveTimeout);
+                       return ReceiveRequest (DefaultReceiveTimeout);
                }
 
                public abstract RequestContext ReceiveRequest (TimeSpan timeout);
 
                public virtual IAsyncResult BeginReceiveRequest (AsyncCallback callback, object state)
                {
-                       return BeginReceiveRequest (channel_listener.DefaultReceiveTimeout, callback, state);
+                       return BeginReceiveRequest (DefaultReceiveTimeout, callback, state);
                }
 
                Func<TimeSpan,RequestContext> recv_delegate;
@@ -131,28 +223,36 @@ namespace System.ServiceModel.Channels
                        return recv_delegate.EndInvoke (result);
                }
 
+               Action<TimeSpan> open_delegate, close_delegate;
+
                protected override IAsyncResult OnBeginOpen (TimeSpan timeout,
                        AsyncCallback callback, object state)
                {
-                       throw new NotImplementedException ();
+                       if (open_delegate == null)
+                               open_delegate = new Action<TimeSpan> (OnOpen);
+                       return open_delegate.BeginInvoke (timeout, callback, state);
                }
 
                protected override void OnEndOpen (IAsyncResult result)
                {
-                       throw new NotImplementedException ();
+                       if (open_delegate == null)
+                               throw new InvalidOperationException ("async open operation has not started");
+                       open_delegate.EndInvoke (result);
                }
 
-               [MonoTODO]
                protected override IAsyncResult OnBeginClose (TimeSpan timeout,
                        AsyncCallback callback, object state)
                {
-                       throw new NotImplementedException ();
+                       if (close_delegate == null)
+                               close_delegate = new Action<TimeSpan> (OnClose);
+                       return close_delegate.BeginInvoke (timeout, callback, state);
                }
 
-               [MonoTODO]
                protected override void OnEndClose (IAsyncResult result)
                {
-                       throw new NotImplementedException ();
+                       if (close_delegate == null)
+                               throw new InvalidOperationException ("async close operation has not started");
+                       close_delegate.EndInvoke (result);
                }
        }
 }