<Compile Include="..\..\build\common\Locale.cs" />
<Compile Include="..\..\build\common\MonoTODOAttribute.cs" />
<Compile Include="Assembly\AssemblyInfo.cs" />
+ <Compile Include="System.Threading.Tasks.Dataflow\SendBlock.cs" />\r
<Compile Include="System.Threading.Tasks.Dataflow\BatchedJoinBlock.cs" />\r
<Compile Include="System.Threading.Tasks.Dataflow\BatchedJoinBlock`3.cs" />\r
<Compile Include="System.Threading.Tasks.Dataflow\DataflowLinkOptions.cs" />\r
System.Threading.Tasks.Dataflow/TransformBlock.cs
System.Threading.Tasks.Dataflow/TransformManyBlock.cs
System.Threading.Tasks.Dataflow/WriteOnceBlock.cs
+System.Threading.Tasks.Dataflow/SendBlock.cs
namespace System.Threading.Tasks.Dataflow {
public static class DataflowBlock {
- static DataflowMessageHeader GlobalHeader = new DataflowMessageHeader ();
-
public static IObservable<TOutput> AsObservable<TOutput> (this ISourceBlock<TOutput> source)
{
if (source == null)
if (target == null)
throw new ArgumentNullException ("target");
- return target.OfferMessage (GlobalHeader.Increment (), item, null, false) == DataflowMessageStatus.Accepted;
+ return target.OfferMessage (new DataflowMessageHeader(1), item, null, false)
+ == DataflowMessageStatus.Accepted;
}
public static TOutput Receive<TOutput> (this ISourceBlock<TOutput> source)
return source.TryReceive (null, out item);
}
- [MonoTODO]
- public static Task<bool> SendAsync<TInput> (this ITargetBlock<TInput> target, TInput item)
+ public static Task<bool> SendAsync<TInput> (
+ this ITargetBlock<TInput> target, TInput item)
{
- throw new NotImplementedException ();
+ return SendAsync (target, item, CancellationToken.None);
}
- }
-}
+ public static Task<bool> SendAsync<TInput> (
+ this ITargetBlock<TInput> target, TInput item,
+ CancellationToken cancellationToken)
+ {
+ if (target == null)
+ throw new ArgumentNullException ("target");
+
+ cancellationToken.ThrowIfCancellationRequested ();
+
+ var status = target.OfferMessage (
+ new DataflowMessageHeader (1), item, null, false);
+
+ if (status == DataflowMessageStatus.Accepted)
+ return Task.FromResult (true);
+ if (status != DataflowMessageStatus.Declined
+ && status != DataflowMessageStatus.Postponed)
+ return Task.FromResult (false);
+
+ var block = new SendBlock<TInput> (target, item, cancellationToken);
+ return block.Send ();
+ }
+ }
+}
\ No newline at end of file
--- /dev/null
+// SendBlock.cs
+//
+// Copyright (c) 2012 Petr Onderka
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+namespace System.Threading.Tasks.Dataflow {
+ /// <summary>
+ /// This block is used in <see cref="DataflowBlock.SendAsync"/>
+ /// to asynchronously wait until a single item is sent to a given target.
+ /// </summary>
+ class SendBlock<T> : ISourceBlock<T> {
+ readonly ITargetBlock<T> sendTarget;
+ readonly T item;
+ CancellationToken cancellationToken;
+ readonly TaskCompletionSource<bool> taskCompletionSource =
+ new TaskCompletionSource<bool> ();
+ readonly DataflowMessageHeader sendHeader = new DataflowMessageHeader (1);
+ CancellationTokenRegistration cancellationTokenRegistration;
+
+ bool isReserved;
+
+ volatile bool cancelDisabled;
+
+ public SendBlock (ITargetBlock<T> sendTarget, T item,
+ CancellationToken cancellationToken)
+ {
+ this.sendTarget = sendTarget;
+ this.item = item;
+ this.cancellationToken = cancellationToken;
+ }
+
+ public Task<bool> Send ()
+ {
+ cancellationTokenRegistration = cancellationToken.Register (
+ () =>
+ {
+ if (!cancelDisabled)
+ taskCompletionSource.SetCanceled ();
+ });
+
+ PerformSend ();
+
+ return taskCompletionSource.Task;
+ }
+
+ void PerformSend ()
+ {
+ DisableCancel ();
+
+ if (taskCompletionSource.Task.IsCanceled)
+ return;
+
+ var status = sendTarget.OfferMessage (sendHeader, item, this, false);
+
+ if (status == DataflowMessageStatus.Accepted)
+ SetResult (true);
+ else if (status != DataflowMessageStatus.Postponed)
+ SetResult (false);
+ else
+ EnableCancel ();
+ }
+
+ public Task Completion {
+ get { throw new NotSupportedException (); }
+ }
+
+ public void Complete ()
+ {
+ throw new NotSupportedException ();
+ }
+
+ public void Fault (Exception exception)
+ {
+ throw new NotSupportedException ();
+ }
+
+ public T ConsumeMessage (DataflowMessageHeader messageHeader,
+ ITargetBlock<T> target, out bool messageConsumed)
+ {
+ if (!messageHeader.IsValid)
+ throw new ArgumentException ("The messageHeader is not valid.",
+ "messageHeader");
+ if (target == null)
+ throw new ArgumentNullException("target");
+
+ DisableCancel ();
+
+ messageConsumed = false;
+
+ if (taskCompletionSource.Task.IsCanceled)
+ return default(T);
+
+ if (messageHeader != sendHeader || target != sendTarget) {
+ EnableCancel ();
+ return default(T);
+ }
+
+ SetResult (true);
+
+ messageConsumed = true;
+ return item;
+ }
+
+ public IDisposable LinkTo (ITargetBlock<T> target, DataflowLinkOptions linkOptions)
+ {
+ throw new NotSupportedException ();
+ }
+
+ public void ReleaseReservation (DataflowMessageHeader messageHeader, ITargetBlock<T> target)
+ {
+ if (messageHeader != sendHeader || target != sendTarget || !isReserved)
+ throw new InvalidOperationException (
+ "The target did not have the message reserved.");
+
+ isReserved = false;
+ EnableCancel ();
+ PerformSend ();
+ }
+
+ public bool ReserveMessage (DataflowMessageHeader messageHeader, ITargetBlock<T> target)
+ {
+ DisableCancel ();
+
+ if (messageHeader == sendHeader && target == sendTarget) {
+ isReserved = true;
+ return true;
+ }
+
+ EnableCancel ();
+
+ return false;
+ }
+
+ void DisableCancel ()
+ {
+ cancelDisabled = true;
+ }
+
+ void EnableCancel ()
+ {
+ cancelDisabled = false;
+
+ if (cancellationToken.IsCancellationRequested)
+ taskCompletionSource.SetCanceled ();
+ }
+
+ void SetResult (bool result)
+ {
+ cancellationTokenRegistration.Dispose ();
+ taskCompletionSource.SetResult (result);
+ }
+ }
+}
\ No newline at end of file
//
// Author:
// Jérémie "garuma" Laval <jeremie.laval@gmail.com>
+// Petr Onderka <gsvick@gmail.com>
//
// Copyright (c) 2011 Jérémie "garuma" Laval
+// Copyright (c) 2012 Petr Onderka
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// THE SOFTWARE.
using System;
-using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
Assert.IsTrue (task.IsCompleted);
Assert.AreEqual (TaskStatus.Canceled, task.Status);
}
+
+ [Test]
+ public void SendAsyncAcceptedTest ()
+ {
+ var target = new BufferBlock<int> ();
+ var task = target.SendAsync (1);
+
+ Assert.IsTrue (task.Wait (0));
+ Assert.IsTrue (task.Result);
+ }
+
+ [Test]
+ public void SendAsyncDeclinedTest ()
+ {
+ var target = new BufferBlock<int> ();
+ target.Complete ();
+ var task = target.SendAsync (1);
+
+ Assert.IsTrue (task.Wait (0));
+ Assert.IsFalse (task.Result);
+ }
+
+ [Test]
+ public void SendAsyncPostponedAcceptedTest ()
+ {
+ var target =
+ new BufferBlock<int> (new DataflowBlockOptions { BoundedCapacity = 1 });
+
+ Assert.IsTrue (target.Post (1));
+
+ var task = target.SendAsync (1);
+
+ Assert.IsFalse (task.Wait (100));
+
+ Assert.AreEqual (1, target.Receive ());
+
+ Assert.IsTrue (task.Wait (100));
+ Assert.IsTrue (task.Result);
+ }
+
+ [Test]
+ public void SendAsyncPostponedDeclinedTest ()
+ {
+ var target =
+ new BufferBlock<int> (new DataflowBlockOptions { BoundedCapacity = 1 });
+
+ Assert.IsTrue (target.Post (1));
+
+ var task = target.SendAsync (1);
+
+ Assert.IsFalse (task.Wait (100));
+
+ target.Complete ();
+
+ Assert.IsTrue (task.Wait (100));
+ Assert.IsFalse (task.Result);
+ }
}
-}
+}
\ No newline at end of file
using System;
using System.Collections.Generic;
+using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using NUnit.Framework;
Assert.AreEqual (43, target.DirectlyAccepted);
}
+
+ [Test]
+ public void FaultConsume()
+ {
+ var scheduler = new TestScheduler ();
+ var source =
+ new BufferBlock<int> (new DataflowBlockOptions { TaskScheduler = scheduler });
+ var target = new TestTargetBlock<int> { Postpone = true };
+ Assert.IsNotNull (source.LinkTo (target));
+
+ Assert.IsTrue (source.Post (42));
+ scheduler.ExecuteAll ();
+ Assert.IsTrue (target.HasPostponed);
+
+ ((IDataflowBlock)source).Fault (new Exception ());
+
+ scheduler.ExecuteAll ();
+ Thread.Sleep (100);
+
+ int value;
+ Assert.IsFalse (target.RetryPostponed (out value));
+ }
+
+ [Test]
+ public void ReserveFaultConsume()
+ {
+ var scheduler = new TestScheduler ();
+ var source =
+ new BufferBlock<int> (new DataflowBlockOptions { TaskScheduler = scheduler });
+ var target = new TestTargetBlock<int> { Postpone = true };
+ Assert.IsNotNull (source.LinkTo (target));
+
+ Assert.IsTrue (source.Post (42));
+ scheduler.ExecuteAll ();
+ Assert.IsTrue (target.HasPostponed);
+
+ Assert.IsTrue (target.ReservePostponed ());
+
+ ((IDataflowBlock)source).Fault (new Exception ());
+
+ scheduler.ExecuteAll ();
+ Thread.Sleep (100);
+
+ int value;
+ Assert.IsTrue (target.RetryPostponed (out value));
+ }
}
class TestTargetBlock<T> : ITargetBlock<T> {