return ReceiveAsync (source, timeout, CancellationToken.None);
}
- public static Task<TOutput> ReceiveAsync<TOutput> (this ISourceBlock<TOutput> source, TimeSpan timeout, CancellationToken cancellationToken)
+ public static Task<TOutput> ReceiveAsync<TOutput> (
+ this ISourceBlock<TOutput> source, TimeSpan timeout,
+ CancellationToken cancellationToken)
{
if (source == null)
throw new ArgumentNullException ("source");
cancellationToken.ThrowIfCancellationRequested ();
- long tm = (long)timeout.TotalMilliseconds;
- ReceiveBlock<TOutput> block = new ReceiveBlock<TOutput> ();
+ int timeoutMilliseconds = (int)timeout.TotalMilliseconds;
+ var block = new ReceiveBlock<TOutput> ();
var bridge = source.LinkTo (block);
- return block.AsyncGet (bridge, cancellationToken, tm);
+ return block.AsyncGet (bridge, cancellationToken, timeoutMilliseconds);
}
public static bool TryReceive<TOutput> (this IReceivableSourceBlock<TOutput> source, out TOutput item)
/// to retrieve elements in a blocking way
/// </summary>
internal class ReceiveBlock<TOutput> : ITargetBlock<TOutput> {
- readonly ManualResetEventSlim waitHandle =
- new ManualResetEventSlim (false);
readonly TaskCompletionSource<TOutput> completion =
new TaskCompletionSource<TOutput> ();
IDisposable linkBridge;
- volatile bool completed;
+ CancellationTokenRegistration cancellationRegistration;
+ Timer timeoutTimer;
public DataflowMessageStatus OfferMessage (
DataflowMessageHeader messageHeader, TOutput messageValue,
if (!messageHeader.IsValid)
return DataflowMessageStatus.Declined;
- if (consumeToAccept) {
- bool consummed;
- if (!source.ReserveMessage (messageHeader, this))
- return DataflowMessageStatus.NotAvailable;
- messageValue = source.ConsumeMessage (messageHeader, this, out consummed);
- if (!consummed)
- return DataflowMessageStatus.NotAvailable;
- }
+ if (completion.Task.Status != TaskStatus.WaitingForActivation)
+ return DataflowMessageStatus.DecliningPermanently;
- ReceivedValue = messageValue;
- completion.TrySetResult (messageValue);
- Thread.MemoryBarrier ();
- waitHandle.Set ();
+ lock (completion) {
+ if (completion.Task.Status != TaskStatus.WaitingForActivation)
+ return DataflowMessageStatus.DecliningPermanently;
- // We do the unlinking here so that we don't get called twice
- if (linkBridge != null) {
- linkBridge.Dispose ();
- linkBridge = null;
+ if (consumeToAccept) {
+ bool consummed;
+ if (!source.ReserveMessage (messageHeader, this))
+ return DataflowMessageStatus.NotAvailable;
+ messageValue = source.ConsumeMessage (messageHeader, this, out consummed);
+ if (!consummed)
+ return DataflowMessageStatus.NotAvailable;
+ }
+
+ completion.TrySetResult (messageValue);
}
+ CompletionSet ();
return DataflowMessageStatus.Accepted;
}
public TOutput WaitAndGet (IDisposable bridge, CancellationToken token, int timeout)
{
- this.linkBridge = bridge;
- Wait (token, timeout);
- return ReceivedValue;
+ try {
+ return AsyncGet (bridge, token, timeout).Result;
+ } catch (AggregateException e) {
+ // resets the stack trace, but that shouldn't matter here
+ throw e.InnerException;
+ }
}
- public Task<TOutput> AsyncGet (IDisposable bridge, CancellationToken token, long timeout)
+ public Task<TOutput> AsyncGet (IDisposable bridge, CancellationToken token, int timeout)
{
- this.linkBridge = bridge;
- token.Register (() => completion.TrySetCanceled ());
- // TODO : take care of timeout through the TaskEx.Wait thing
+ linkBridge = bridge;
+ cancellationRegistration = token.Register (() =>
+ {
+ lock (completion) {
+ completion.TrySetCanceled ();
+ }
+ CompletionSet ();
+ });
+ timeoutTimer = new Timer (
+ _ =>
+ {
+ lock (completion) {
+ completion.TrySetException (new TimeoutException ());
+ }
+ CompletionSet ();
+ }, null, timeout,
+ Timeout.Infinite);
+
return completion.Task;
}
- public void Wait (CancellationToken token, int timeout)
+ void CompletionSet ()
{
- // Wait() throws correct cancellation exception by itself
- if (!waitHandle.Wait (timeout, token))
- throw new TimeoutException ();
-
- if (completed)
- throw new InvalidOperationException (
- "No item could be received from the source.");
- }
+ if (linkBridge != null) {
+ linkBridge.Dispose ();
+ linkBridge = null;
+ }
- public TOutput ReceivedValue {
- get;
- private set;
+ cancellationRegistration.Dispose ();
+ timeoutTimer.Dispose ();
}
public Task Completion {
- get {
- return null;
- }
+ get { throw new NotSupportedException (); }
}
public void Complete ()
{
- completed = true;
- waitHandle.Set ();
+ lock (completion) {
+ completion.TrySetException (new InvalidOperationException (
+ "No item could be received from the source."));
+ }
+ CompletionSet ();
}
public void Fault (Exception exception)
}
[Test]
- public void FaultConsume()
+ public void FaultConsume ()
{
var scheduler = new TestScheduler ();
var source =
}
[Test]
- public void ReserveFaultConsume()
+ public void ReserveFaultConsume ()
{
var scheduler = new TestScheduler ();
var source =
int value;
Assert.IsTrue (target.RetryPostponed (out value));
}
+
+ [Test]
+ public void PostAfterTimeout ()
+ {
+ var scheduler = new TestScheduler ();
+ var block =
+ new BufferBlock<int> (new DataflowBlockOptions { TaskScheduler = scheduler });
+
+ AssertEx.Throws<TimeoutException> (
+ () => block.Receive (TimeSpan.FromMilliseconds (100)));
+
+ block.Post (1);
+
+ scheduler.ExecuteAll ();
+
+ int item;
+ Assert.IsTrue (block.TryReceive (out item));
+ Assert.AreEqual (1, item);
+ }
+
+ [Test]
+ public void PostAfterCancellation ()
+ {
+ var scheduler = new TestScheduler ();
+ var block =
+ new BufferBlock<int> (new DataflowBlockOptions { TaskScheduler = scheduler });
+
+ var tokenSource = new CancellationTokenSource ();
+
+ Task.Factory.StartNew (
+ () =>
+ {
+ Thread.Sleep (100);
+ tokenSource.Cancel ();
+ });
+
+ AssertEx.Throws<OperationCanceledException> (
+ () => block.Receive (tokenSource.Token));
+
+ block.Post (1);
+
+ scheduler.ExecuteAll ();
+
+ int item;
+ Assert.IsTrue (block.TryReceive (out item));
+ Assert.AreEqual (1, item);
+ }
}
class TestTargetBlock<T> : ITargetBlock<T> {