Corrected Receive() and ReceiveAsync()
authorPetr Onderka <gsvick@gmail.com>
Wed, 18 Jul 2012 20:30:58 +0000 (22:30 +0200)
committerPetr Onderka <gsvick@gmail.com>
Sun, 19 Aug 2012 21:59:47 +0000 (23:59 +0200)
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/DataflowBlock.cs
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ReceiveBlock.cs
mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/ReceivingTest.cs

index a245357555c82b132b329129a18b6d2c30b50988..44dc58fe1ea4a44ab71b27793898e1d6b5523e97 100644 (file)
@@ -220,7 +220,9 @@ namespace System.Threading.Tasks.Dataflow {
                        return ReceiveAsync (source, timeout, CancellationToken.None);
                }
 
-               public static Task<TOutput> ReceiveAsync<TOutput> (this ISourceBlock<TOutput> source, TimeSpan timeout, CancellationToken cancellationToken)
+               public static Task<TOutput> ReceiveAsync<TOutput> (
+                       this ISourceBlock<TOutput> source, TimeSpan timeout,
+                       CancellationToken cancellationToken)
                {
                        if (source == null)
                                throw new ArgumentNullException ("source");
@@ -231,10 +233,10 @@ namespace System.Threading.Tasks.Dataflow {
 
                        cancellationToken.ThrowIfCancellationRequested ();
 
-                       long tm = (long)timeout.TotalMilliseconds;
-                       ReceiveBlock<TOutput> block = new ReceiveBlock<TOutput> ();
+                       int timeoutMilliseconds = (int)timeout.TotalMilliseconds;
+                       var block = new ReceiveBlock<TOutput> ();
                        var bridge = source.LinkTo (block);
-                       return block.AsyncGet (bridge, cancellationToken, tm);
+                       return block.AsyncGet (bridge, cancellationToken, timeoutMilliseconds);
                }
 
                public static bool TryReceive<TOutput> (this IReceivableSourceBlock<TOutput> source, out TOutput item)
index e0b9dd3447b41d6fc3aae76827c279bc4a8b60d6..8a285022911affc660c56c03a7b39fba2070d3b3 100644 (file)
@@ -27,12 +27,11 @@ namespace System.Threading.Tasks.Dataflow {
        /// to retrieve elements in a blocking way
        /// </summary>
        internal class ReceiveBlock<TOutput> : ITargetBlock<TOutput> {
-               readonly ManualResetEventSlim waitHandle =
-                       new ManualResetEventSlim (false);
                readonly TaskCompletionSource<TOutput> completion =
                        new TaskCompletionSource<TOutput> ();
                IDisposable linkBridge;
-               volatile bool completed;
+               CancellationTokenRegistration cancellationRegistration;
+               Timer timeoutTimer;
 
                public DataflowMessageStatus OfferMessage (
                        DataflowMessageHeader messageHeader, TOutput messageValue,
@@ -41,70 +40,84 @@ namespace System.Threading.Tasks.Dataflow {
                        if (!messageHeader.IsValid)
                                return DataflowMessageStatus.Declined;
 
-                       if (consumeToAccept) {
-                               bool consummed;
-                               if (!source.ReserveMessage (messageHeader, this))
-                                       return DataflowMessageStatus.NotAvailable;
-                               messageValue = source.ConsumeMessage (messageHeader, this, out consummed);
-                               if (!consummed)
-                                       return DataflowMessageStatus.NotAvailable;
-                       }
+                       if (completion.Task.Status != TaskStatus.WaitingForActivation)
+                               return DataflowMessageStatus.DecliningPermanently;
 
-                       ReceivedValue = messageValue;
-                       completion.TrySetResult (messageValue);
-                       Thread.MemoryBarrier ();
-                       waitHandle.Set ();
+                       lock (completion) {
+                               if (completion.Task.Status != TaskStatus.WaitingForActivation)
+                                       return DataflowMessageStatus.DecliningPermanently;
 
-                       // We do the unlinking here so that we don't get called twice
-                       if (linkBridge != null) {
-                               linkBridge.Dispose ();
-                               linkBridge = null;
+                               if (consumeToAccept) {
+                                       bool consummed;
+                                       if (!source.ReserveMessage (messageHeader, this))
+                                               return DataflowMessageStatus.NotAvailable;
+                                       messageValue = source.ConsumeMessage (messageHeader, this, out consummed);
+                                       if (!consummed)
+                                               return DataflowMessageStatus.NotAvailable;
+                               }
+
+                               completion.TrySetResult (messageValue);
                        }
+                       CompletionSet ();
 
                        return DataflowMessageStatus.Accepted;
                }
 
                public TOutput WaitAndGet (IDisposable bridge, CancellationToken token, int timeout)
                {
-                       this.linkBridge = bridge;
-                       Wait (token, timeout);
-                       return ReceivedValue;
+                       try {
+                               return AsyncGet (bridge, token, timeout).Result;
+                       } catch (AggregateException e) {
+                               // resets the stack trace, but that shouldn't matter here
+                               throw e.InnerException;
+                       }
                }
 
-               public Task<TOutput> AsyncGet (IDisposable bridge, CancellationToken token, long timeout)
+               public Task<TOutput> AsyncGet (IDisposable bridge, CancellationToken token, int timeout)
                {
-                       this.linkBridge = bridge;
-                       token.Register (() => completion.TrySetCanceled ());
-                       // TODO : take care of timeout through the TaskEx.Wait thing
+                       linkBridge = bridge;
+                       cancellationRegistration = token.Register (() =>
+                       {
+                               lock (completion) {
+                                       completion.TrySetCanceled ();
+                               }
+                               CompletionSet ();
+                       });
+                       timeoutTimer = new Timer (
+                               _ =>
+                               {
+                                       lock (completion) {
+                                               completion.TrySetException (new TimeoutException ());
+                                       }
+                                       CompletionSet ();
+                               }, null, timeout,
+                               Timeout.Infinite);
+
                        return completion.Task;
                }
 
-               public void Wait (CancellationToken token, int timeout)
+               void CompletionSet ()
                {
-                       // Wait() throws correct cancellation exception by itself
-                       if (!waitHandle.Wait (timeout, token))
-                               throw new TimeoutException ();
-
-                       if (completed)
-                               throw new InvalidOperationException (
-                                       "No item could be received from the source.");
-               }
+                       if (linkBridge != null) {
+                               linkBridge.Dispose ();
+                               linkBridge = null;
+                       }
 
-               public TOutput ReceivedValue {
-                       get;
-                       private set;
+                       cancellationRegistration.Dispose ();
+                       timeoutTimer.Dispose ();
                }
 
                public Task Completion {
-                       get {
-                               return null;
-                       }
+                       get { throw new NotSupportedException (); }
                }
 
                public void Complete ()
                {
-                       completed = true;
-                       waitHandle.Set ();
+                       lock (completion) {
+                               completion.TrySetException (new InvalidOperationException (
+                                       "No item could be received from the source."));
+                       }
+                       CompletionSet ();
                }
 
                public void Fault (Exception exception)
index 57a46125408a6bbbe34cdb97c7d2c1a60b88b9fe..6bd5125571b2adade181efc6cc90574195d4ce5f 100644 (file)
@@ -196,7 +196,7 @@ namespace MonoTests.System.Threading.Tasks.Dataflow {
                }
 
                [Test]
-               public void FaultConsume()
+               public void FaultConsume ()
                {
                        var scheduler = new TestScheduler ();
                        var source =
@@ -218,7 +218,7 @@ namespace MonoTests.System.Threading.Tasks.Dataflow {
                }
 
                [Test]
-               public void ReserveFaultConsume()
+               public void ReserveFaultConsume ()
                {
                        var scheduler = new TestScheduler ();
                        var source =
@@ -240,6 +240,53 @@ namespace MonoTests.System.Threading.Tasks.Dataflow {
                        int value;
                        Assert.IsTrue (target.RetryPostponed (out value));
                }
+
+               [Test]
+               public void PostAfterTimeout ()
+               {
+                       var scheduler = new TestScheduler ();
+                       var block =
+                               new BufferBlock<int> (new DataflowBlockOptions { TaskScheduler = scheduler });
+
+                       AssertEx.Throws<TimeoutException> (
+                               () => block.Receive (TimeSpan.FromMilliseconds (100)));
+
+                       block.Post (1);
+
+                       scheduler.ExecuteAll ();
+
+                       int item;
+                       Assert.IsTrue (block.TryReceive (out item));
+                       Assert.AreEqual (1, item);
+               }
+
+               [Test]
+               public void PostAfterCancellation ()
+               {
+                       var scheduler = new TestScheduler ();
+                       var block =
+                               new BufferBlock<int> (new DataflowBlockOptions { TaskScheduler = scheduler });
+
+                       var tokenSource = new CancellationTokenSource ();
+
+                       Task.Factory.StartNew (
+                               () =>
+                               {
+                                       Thread.Sleep (100);
+                                       tokenSource.Cancel ();
+                               });
+
+                       AssertEx.Throws<OperationCanceledException> (
+                               () => block.Receive (tokenSource.Token));
+
+                       block.Post (1);
+
+                       scheduler.ExecuteAll ();
+
+                       int item;
+                       Assert.IsTrue (block.TryReceive (out item));
+                       Assert.AreEqual (1, item);
+               }
        }
 
        class TestTargetBlock<T> : ITargetBlock<T> {