Implemented SendAsync()
authorPetr Onderka <gsvick@gmail.com>
Mon, 16 Jul 2012 20:43:03 +0000 (22:43 +0200)
committerPetr Onderka <gsvick@gmail.com>
Sun, 19 Aug 2012 21:59:17 +0000 (23:59 +0200)
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow-net_4_5.csproj
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow.dll.sources
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/DataflowBlock.cs
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/SendBlock.cs [new file with mode: 0644]
mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/DataflowBlockTest.cs
mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/ReceivingTest.cs

index f639ff614c0863edeeda0fa7ea529f94b6bf3748..b2631996d7e621454bb846809f0473b03557a01d 100644 (file)
@@ -46,6 +46,7 @@
    <Compile Include="..\..\build\common\Locale.cs" />
    <Compile Include="..\..\build\common\MonoTODOAttribute.cs" />
    <Compile Include="Assembly\AssemblyInfo.cs" />
+    <Compile Include="System.Threading.Tasks.Dataflow\SendBlock.cs" />\r
     <Compile Include="System.Threading.Tasks.Dataflow\BatchedJoinBlock.cs" />\r
     <Compile Include="System.Threading.Tasks.Dataflow\BatchedJoinBlock`3.cs" />\r
     <Compile Include="System.Threading.Tasks.Dataflow\DataflowLinkOptions.cs" />\r
index 61755878b21b4796cd221e7251598687d131b603..77b34e4006ff1395b82e9cc9551dff3559765b39 100644 (file)
@@ -41,3 +41,4 @@ System.Threading.Tasks.Dataflow/ReceiveBlock.cs
 System.Threading.Tasks.Dataflow/TransformBlock.cs
 System.Threading.Tasks.Dataflow/TransformManyBlock.cs
 System.Threading.Tasks.Dataflow/WriteOnceBlock.cs
+System.Threading.Tasks.Dataflow/SendBlock.cs
index b8e248b76a692c84a4d223f47a76fc0f1ebf29ae..a245357555c82b132b329129a18b6d2c30b50988 100644 (file)
@@ -23,8 +23,6 @@
 
 namespace System.Threading.Tasks.Dataflow {
        public static class DataflowBlock {
-               static DataflowMessageHeader GlobalHeader = new DataflowMessageHeader ();
-
                public static IObservable<TOutput> AsObservable<TOutput> (this ISourceBlock<TOutput> source)
                {
                        if (source == null)
@@ -158,7 +156,8 @@ namespace System.Threading.Tasks.Dataflow {
                        if (target == null)
                                throw new ArgumentNullException ("target");
 
-                       return target.OfferMessage (GlobalHeader.Increment (), item, null, false) == DataflowMessageStatus.Accepted;
+                       return target.OfferMessage (new DataflowMessageHeader(1), item, null, false)
+                              == DataflowMessageStatus.Accepted;
                }
 
                public static TOutput Receive<TOutput> (this ISourceBlock<TOutput> source)
@@ -247,11 +246,32 @@ namespace System.Threading.Tasks.Dataflow {
                        return source.TryReceive (null, out item);
                }
 
-               [MonoTODO]
-               public static Task<bool> SendAsync<TInput> (this ITargetBlock<TInput> target, TInput item)
+               public static Task<bool> SendAsync<TInput> (
+                       this ITargetBlock<TInput> target, TInput item)
                {
-                       throw new NotImplementedException ();
+                       return SendAsync (target, item, CancellationToken.None);
                }
-       }
-}
 
+               public static Task<bool> SendAsync<TInput> (
+                       this ITargetBlock<TInput> target, TInput item,
+                       CancellationToken cancellationToken)
+               {
+                       if (target == null)
+                               throw new ArgumentNullException ("target");
+
+                       cancellationToken.ThrowIfCancellationRequested ();
+
+                       var status = target.OfferMessage (
+                               new DataflowMessageHeader (1), item, null, false);
+
+                       if (status == DataflowMessageStatus.Accepted)
+                               return Task.FromResult (true);
+                       if (status != DataflowMessageStatus.Declined
+                           && status != DataflowMessageStatus.Postponed)
+                               return Task.FromResult (false);
+
+                       var block = new SendBlock<TInput> (target, item, cancellationToken);
+                       return block.Send ();
+               }
+       }
+}
\ No newline at end of file
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/SendBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/SendBlock.cs
new file mode 100644 (file)
index 0000000..78f6c9c
--- /dev/null
@@ -0,0 +1,170 @@
+// SendBlock.cs
+//
+// Copyright (c) 2012 Petr Onderka
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+namespace System.Threading.Tasks.Dataflow {
+       /// <summary>
+       /// This block is used in <see cref="DataflowBlock.SendAsync"/>
+       /// to asynchronously wait until a single item is sent to a given target.
+       /// </summary>
+       class SendBlock<T> : ISourceBlock<T> {
+               readonly ITargetBlock<T> sendTarget;
+               readonly T item;
+               CancellationToken cancellationToken;
+               readonly TaskCompletionSource<bool> taskCompletionSource =
+                       new TaskCompletionSource<bool> ();
+               readonly DataflowMessageHeader sendHeader = new DataflowMessageHeader (1);
+               CancellationTokenRegistration cancellationTokenRegistration;
+
+               bool isReserved;
+
+               volatile bool cancelDisabled;
+
+               public SendBlock (ITargetBlock<T> sendTarget, T item,
+                                 CancellationToken cancellationToken)
+               {
+                       this.sendTarget = sendTarget;
+                       this.item = item;
+                       this.cancellationToken = cancellationToken;
+               }
+
+               public Task<bool> Send ()
+               {
+                       cancellationTokenRegistration = cancellationToken.Register (
+                               () =>
+                               {
+                                       if (!cancelDisabled)
+                                               taskCompletionSource.SetCanceled ();
+                               });
+
+                       PerformSend ();
+
+                       return taskCompletionSource.Task;
+               }
+
+               void PerformSend ()
+               {
+                       DisableCancel ();
+
+                       if (taskCompletionSource.Task.IsCanceled)
+                               return;
+
+                       var status = sendTarget.OfferMessage (sendHeader, item, this, false);
+
+                       if (status == DataflowMessageStatus.Accepted)
+                               SetResult (true);
+                       else if (status != DataflowMessageStatus.Postponed)
+                               SetResult (false);
+                       else
+                               EnableCancel ();
+               }
+
+               public Task Completion {
+                       get { throw new NotSupportedException (); }
+               }
+
+               public void Complete ()
+               {
+                       throw new NotSupportedException ();
+               }
+
+               public void Fault (Exception exception)
+               {
+                       throw new NotSupportedException ();
+               }
+
+               public T ConsumeMessage (DataflowMessageHeader messageHeader,
+                                        ITargetBlock<T> target, out bool messageConsumed)
+               {
+                       if (!messageHeader.IsValid)
+                               throw new ArgumentException ("The messageHeader is not valid.",
+                                       "messageHeader");
+                       if (target == null)
+                               throw new ArgumentNullException("target");
+
+                       DisableCancel ();
+
+                       messageConsumed = false;
+
+                       if (taskCompletionSource.Task.IsCanceled)
+                               return default(T);
+
+                       if (messageHeader != sendHeader || target != sendTarget) {
+                               EnableCancel ();
+                               return default(T);
+                       }
+
+                       SetResult (true);
+
+                       messageConsumed = true;
+                       return item;
+               }
+
+               public IDisposable LinkTo (ITargetBlock<T> target, DataflowLinkOptions linkOptions)
+               {
+                       throw new NotSupportedException ();
+               }
+
+               public void ReleaseReservation (DataflowMessageHeader messageHeader, ITargetBlock<T> target)
+               {
+                       if (messageHeader != sendHeader || target != sendTarget || !isReserved)
+                               throw new InvalidOperationException (
+                                       "The target did not have the message reserved.");
+
+                       isReserved = false;
+                       EnableCancel ();
+                       PerformSend ();
+               }
+
+               public bool ReserveMessage (DataflowMessageHeader messageHeader, ITargetBlock<T> target)
+               {
+                       DisableCancel ();
+
+                       if (messageHeader == sendHeader && target == sendTarget) {
+                               isReserved = true;
+                               return true;
+                       }
+
+                       EnableCancel ();
+
+                       return false;
+               }
+
+               void DisableCancel ()
+               {
+                       cancelDisabled = true;
+               }
+
+               void EnableCancel ()
+               {
+                       cancelDisabled = false;
+
+                       if (cancellationToken.IsCancellationRequested)
+                               taskCompletionSource.SetCanceled ();
+               }
+
+               void SetResult (bool result)
+               {
+                       cancellationTokenRegistration.Dispose ();
+                       taskCompletionSource.SetResult (result);
+               }
+       }
+}
\ No newline at end of file
index 67461ac27222dbbd44ad536830f398d51ddb65bf..b473ded8d5cf67b2c9304c5d9632aed650dc8277 100644 (file)
@@ -3,8 +3,10 @@
 //  
 // Author:
 //       Jérémie "garuma" Laval <jeremie.laval@gmail.com>
+//       Petr Onderka <gsvick@gmail.com>
 // 
 // Copyright (c) 2011 Jérémie "garuma" Laval
+// Copyright (c) 2012 Petr Onderka
 // 
 // Permission is hereby granted, free of charge, to any person obtaining a copy
 // of this software and associated documentation files (the "Software"), to deal
@@ -25,7 +27,6 @@
 // THE SOFTWARE.
 
 using System;
-using System.Linq;
 using System.Threading;
 using System.Threading.Tasks;
 using System.Threading.Tasks.Dataflow;
@@ -179,5 +180,62 @@ namespace MonoTests.System.Threading.Tasks.Dataflow
                        Assert.IsTrue (task.IsCompleted);
                        Assert.AreEqual (TaskStatus.Canceled, task.Status);
                }
+
+               [Test]
+               public void SendAsyncAcceptedTest ()
+               {
+                       var target = new BufferBlock<int> ();
+                       var task = target.SendAsync (1);
+
+                       Assert.IsTrue (task.Wait (0));
+                       Assert.IsTrue (task.Result);
+               }
+
+               [Test]
+               public void SendAsyncDeclinedTest ()
+               {
+                       var target = new BufferBlock<int> ();
+                       target.Complete ();
+                       var task = target.SendAsync (1);
+
+                       Assert.IsTrue (task.Wait (0));
+                       Assert.IsFalse (task.Result);
+               }
+
+               [Test]
+               public void SendAsyncPostponedAcceptedTest ()
+               {
+                       var target =
+                               new BufferBlock<int> (new DataflowBlockOptions { BoundedCapacity = 1 });
+
+                       Assert.IsTrue (target.Post (1));
+
+                       var task = target.SendAsync (1);
+
+                       Assert.IsFalse (task.Wait (100));
+
+                       Assert.AreEqual (1, target.Receive ());
+
+                       Assert.IsTrue (task.Wait (100));
+                       Assert.IsTrue (task.Result);
+               }
+
+               [Test]
+               public void SendAsyncPostponedDeclinedTest ()
+               {
+                       var target =
+                               new BufferBlock<int> (new DataflowBlockOptions { BoundedCapacity = 1 });
+
+                       Assert.IsTrue (target.Post (1));
+
+                       var task = target.SendAsync (1);
+
+                       Assert.IsFalse (task.Wait (100));
+
+                       target.Complete ();
+
+                       Assert.IsTrue (task.Wait (100));
+                       Assert.IsFalse (task.Result);
+               }
        }
-}
+}
\ No newline at end of file
index 330bf66527f0684a52e75c7ff9215ab319a4bf7d..57a46125408a6bbbe34cdb97c7d2c1a60b88b9fe 100644 (file)
@@ -22,6 +22,7 @@
 
 using System;
 using System.Collections.Generic;
+using System.Threading;
 using System.Threading.Tasks;
 using System.Threading.Tasks.Dataflow;
 using NUnit.Framework;
@@ -193,6 +194,52 @@ namespace MonoTests.System.Threading.Tasks.Dataflow {
 
                        Assert.AreEqual (43, target.DirectlyAccepted);
                }
+
+               [Test]
+               public void FaultConsume()
+               {
+                       var scheduler = new TestScheduler ();
+                       var source =
+                               new BufferBlock<int> (new DataflowBlockOptions { TaskScheduler = scheduler });
+                       var target = new TestTargetBlock<int> { Postpone = true };
+                       Assert.IsNotNull (source.LinkTo (target));
+
+                       Assert.IsTrue (source.Post (42));
+                       scheduler.ExecuteAll ();
+                       Assert.IsTrue (target.HasPostponed);
+
+                       ((IDataflowBlock)source).Fault (new Exception ());
+
+                       scheduler.ExecuteAll ();
+                       Thread.Sleep (100);
+
+                       int value;
+                       Assert.IsFalse (target.RetryPostponed (out value));
+               }
+
+               [Test]
+               public void ReserveFaultConsume()
+               {
+                       var scheduler = new TestScheduler ();
+                       var source =
+                               new BufferBlock<int> (new DataflowBlockOptions { TaskScheduler = scheduler });
+                       var target = new TestTargetBlock<int> { Postpone = true };
+                       Assert.IsNotNull (source.LinkTo (target));
+
+                       Assert.IsTrue (source.Post (42));
+                       scheduler.ExecuteAll ();
+                       Assert.IsTrue (target.HasPostponed);
+
+                       Assert.IsTrue (target.ReservePostponed ());
+
+                       ((IDataflowBlock)source).Fault (new Exception ());
+
+                       scheduler.ExecuteAll ();
+                       Thread.Sleep (100);
+
+                       int value;
+                       Assert.IsTrue (target.RetryPostponed (out value));
+               }
        }
 
        class TestTargetBlock<T> : ITargetBlock<T> {