From: Petr Onderka Date: Mon, 16 Jul 2012 20:43:03 +0000 (+0200) Subject: Implemented SendAsync() X-Git-Url: http://wien.tomnetworks.com/gitweb/?a=commitdiff_plain;h=460864bd02707676db0eb884dbf9a2f2e7b97442;p=mono.git Implemented SendAsync() --- diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow-net_4_5.csproj b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow-net_4_5.csproj index f639ff614c0..b2631996d7e 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow-net_4_5.csproj +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow-net_4_5.csproj @@ -46,6 +46,7 @@ + diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow.dll.sources b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow.dll.sources index 61755878b21..77b34e4006f 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow.dll.sources +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow.dll.sources @@ -41,3 +41,4 @@ System.Threading.Tasks.Dataflow/ReceiveBlock.cs System.Threading.Tasks.Dataflow/TransformBlock.cs System.Threading.Tasks.Dataflow/TransformManyBlock.cs System.Threading.Tasks.Dataflow/WriteOnceBlock.cs +System.Threading.Tasks.Dataflow/SendBlock.cs diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/DataflowBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/DataflowBlock.cs index b8e248b76a6..a245357555c 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/DataflowBlock.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/DataflowBlock.cs @@ -23,8 +23,6 @@ namespace System.Threading.Tasks.Dataflow { public static class DataflowBlock { - static DataflowMessageHeader GlobalHeader = new DataflowMessageHeader (); - public static IObservable AsObservable (this ISourceBlock source) { if (source == null) @@ -158,7 +156,8 @@ namespace System.Threading.Tasks.Dataflow { if (target == null) throw new ArgumentNullException ("target"); - return target.OfferMessage (GlobalHeader.Increment (), item, null, false) == DataflowMessageStatus.Accepted; + return target.OfferMessage (new DataflowMessageHeader(1), item, null, false) + == DataflowMessageStatus.Accepted; } public static TOutput Receive (this ISourceBlock source) @@ -247,11 +246,32 @@ namespace System.Threading.Tasks.Dataflow { return source.TryReceive (null, out item); } - [MonoTODO] - public static Task SendAsync (this ITargetBlock target, TInput item) + public static Task SendAsync ( + this ITargetBlock target, TInput item) { - throw new NotImplementedException (); + return SendAsync (target, item, CancellationToken.None); } - } -} + public static Task SendAsync ( + this ITargetBlock target, TInput item, + CancellationToken cancellationToken) + { + if (target == null) + throw new ArgumentNullException ("target"); + + cancellationToken.ThrowIfCancellationRequested (); + + var status = target.OfferMessage ( + new DataflowMessageHeader (1), item, null, false); + + if (status == DataflowMessageStatus.Accepted) + return Task.FromResult (true); + if (status != DataflowMessageStatus.Declined + && status != DataflowMessageStatus.Postponed) + return Task.FromResult (false); + + var block = new SendBlock (target, item, cancellationToken); + return block.Send (); + } + } +} \ No newline at end of file diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/SendBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/SendBlock.cs new file mode 100644 index 00000000000..78f6c9cf88f --- /dev/null +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/SendBlock.cs @@ -0,0 +1,170 @@ +// SendBlock.cs +// +// Copyright (c) 2012 Petr Onderka +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +namespace System.Threading.Tasks.Dataflow { + /// + /// This block is used in + /// to asynchronously wait until a single item is sent to a given target. + /// + class SendBlock : ISourceBlock { + readonly ITargetBlock sendTarget; + readonly T item; + CancellationToken cancellationToken; + readonly TaskCompletionSource taskCompletionSource = + new TaskCompletionSource (); + readonly DataflowMessageHeader sendHeader = new DataflowMessageHeader (1); + CancellationTokenRegistration cancellationTokenRegistration; + + bool isReserved; + + volatile bool cancelDisabled; + + public SendBlock (ITargetBlock sendTarget, T item, + CancellationToken cancellationToken) + { + this.sendTarget = sendTarget; + this.item = item; + this.cancellationToken = cancellationToken; + } + + public Task Send () + { + cancellationTokenRegistration = cancellationToken.Register ( + () => + { + if (!cancelDisabled) + taskCompletionSource.SetCanceled (); + }); + + PerformSend (); + + return taskCompletionSource.Task; + } + + void PerformSend () + { + DisableCancel (); + + if (taskCompletionSource.Task.IsCanceled) + return; + + var status = sendTarget.OfferMessage (sendHeader, item, this, false); + + if (status == DataflowMessageStatus.Accepted) + SetResult (true); + else if (status != DataflowMessageStatus.Postponed) + SetResult (false); + else + EnableCancel (); + } + + public Task Completion { + get { throw new NotSupportedException (); } + } + + public void Complete () + { + throw new NotSupportedException (); + } + + public void Fault (Exception exception) + { + throw new NotSupportedException (); + } + + public T ConsumeMessage (DataflowMessageHeader messageHeader, + ITargetBlock target, out bool messageConsumed) + { + if (!messageHeader.IsValid) + throw new ArgumentException ("The messageHeader is not valid.", + "messageHeader"); + if (target == null) + throw new ArgumentNullException("target"); + + DisableCancel (); + + messageConsumed = false; + + if (taskCompletionSource.Task.IsCanceled) + return default(T); + + if (messageHeader != sendHeader || target != sendTarget) { + EnableCancel (); + return default(T); + } + + SetResult (true); + + messageConsumed = true; + return item; + } + + public IDisposable LinkTo (ITargetBlock target, DataflowLinkOptions linkOptions) + { + throw new NotSupportedException (); + } + + public void ReleaseReservation (DataflowMessageHeader messageHeader, ITargetBlock target) + { + if (messageHeader != sendHeader || target != sendTarget || !isReserved) + throw new InvalidOperationException ( + "The target did not have the message reserved."); + + isReserved = false; + EnableCancel (); + PerformSend (); + } + + public bool ReserveMessage (DataflowMessageHeader messageHeader, ITargetBlock target) + { + DisableCancel (); + + if (messageHeader == sendHeader && target == sendTarget) { + isReserved = true; + return true; + } + + EnableCancel (); + + return false; + } + + void DisableCancel () + { + cancelDisabled = true; + } + + void EnableCancel () + { + cancelDisabled = false; + + if (cancellationToken.IsCancellationRequested) + taskCompletionSource.SetCanceled (); + } + + void SetResult (bool result) + { + cancellationTokenRegistration.Dispose (); + taskCompletionSource.SetResult (result); + } + } +} \ No newline at end of file diff --git a/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/DataflowBlockTest.cs b/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/DataflowBlockTest.cs index 67461ac2722..b473ded8d5c 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/DataflowBlockTest.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/DataflowBlockTest.cs @@ -3,8 +3,10 @@ // // Author: // Jérémie "garuma" Laval +// Petr Onderka // // Copyright (c) 2011 Jérémie "garuma" Laval +// Copyright (c) 2012 Petr Onderka // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -25,7 +27,6 @@ // THE SOFTWARE. using System; -using System.Linq; using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; @@ -179,5 +180,62 @@ namespace MonoTests.System.Threading.Tasks.Dataflow Assert.IsTrue (task.IsCompleted); Assert.AreEqual (TaskStatus.Canceled, task.Status); } + + [Test] + public void SendAsyncAcceptedTest () + { + var target = new BufferBlock (); + var task = target.SendAsync (1); + + Assert.IsTrue (task.Wait (0)); + Assert.IsTrue (task.Result); + } + + [Test] + public void SendAsyncDeclinedTest () + { + var target = new BufferBlock (); + target.Complete (); + var task = target.SendAsync (1); + + Assert.IsTrue (task.Wait (0)); + Assert.IsFalse (task.Result); + } + + [Test] + public void SendAsyncPostponedAcceptedTest () + { + var target = + new BufferBlock (new DataflowBlockOptions { BoundedCapacity = 1 }); + + Assert.IsTrue (target.Post (1)); + + var task = target.SendAsync (1); + + Assert.IsFalse (task.Wait (100)); + + Assert.AreEqual (1, target.Receive ()); + + Assert.IsTrue (task.Wait (100)); + Assert.IsTrue (task.Result); + } + + [Test] + public void SendAsyncPostponedDeclinedTest () + { + var target = + new BufferBlock (new DataflowBlockOptions { BoundedCapacity = 1 }); + + Assert.IsTrue (target.Post (1)); + + var task = target.SendAsync (1); + + Assert.IsFalse (task.Wait (100)); + + target.Complete (); + + Assert.IsTrue (task.Wait (100)); + Assert.IsFalse (task.Result); + } } -} +} \ No newline at end of file diff --git a/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/ReceivingTest.cs b/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/ReceivingTest.cs index 330bf66527f..57a46125408 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/ReceivingTest.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/ReceivingTest.cs @@ -22,6 +22,7 @@ using System; using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; using NUnit.Framework; @@ -193,6 +194,52 @@ namespace MonoTests.System.Threading.Tasks.Dataflow { Assert.AreEqual (43, target.DirectlyAccepted); } + + [Test] + public void FaultConsume() + { + var scheduler = new TestScheduler (); + var source = + new BufferBlock (new DataflowBlockOptions { TaskScheduler = scheduler }); + var target = new TestTargetBlock { Postpone = true }; + Assert.IsNotNull (source.LinkTo (target)); + + Assert.IsTrue (source.Post (42)); + scheduler.ExecuteAll (); + Assert.IsTrue (target.HasPostponed); + + ((IDataflowBlock)source).Fault (new Exception ()); + + scheduler.ExecuteAll (); + Thread.Sleep (100); + + int value; + Assert.IsFalse (target.RetryPostponed (out value)); + } + + [Test] + public void ReserveFaultConsume() + { + var scheduler = new TestScheduler (); + var source = + new BufferBlock (new DataflowBlockOptions { TaskScheduler = scheduler }); + var target = new TestTargetBlock { Postpone = true }; + Assert.IsNotNull (source.LinkTo (target)); + + Assert.IsTrue (source.Post (42)); + scheduler.ExecuteAll (); + Assert.IsTrue (target.HasPostponed); + + Assert.IsTrue (target.ReservePostponed ()); + + ((IDataflowBlock)source).Fault (new Exception ()); + + scheduler.ExecuteAll (); + Thread.Sleep (100); + + int value; + Assert.IsTrue (target.RetryPostponed (out value)); + } } class TestTargetBlock : ITargetBlock {