Assert.AreEqual (DataflowMessageStatus.Postponed,
target.OfferMessage (header, 43, source, false));
- Assert.AreEqual (42, block.Receive (TimeSpan.FromMilliseconds (100)));
+ Assert.AreEqual (42, block.Receive (TimeSpan.FromMilliseconds (1000)));
Assert.IsFalse (block.Completion.Wait (100));
Assert.IsTrue (source.WasConsumed (header));
- Assert.AreEqual (43, block.Receive (TimeSpan.FromMilliseconds (100)));
+ Assert.AreEqual (43, block.Receive (TimeSpan.FromMilliseconds (1000)));
Assert.AreEqual (DataflowMessageStatus.Accepted,
target.OfferMessage (new DataflowMessageHeader (3), 44, source, false));
Assert.IsTrue (transform.Post (1));
Assert.IsTrue (transform.Post (2));
- Assert.GreaterOrEqual (scheduler.ExecuteAll (), 1);
+ AssertHelper.GreaterOrEqual (scheduler.ExecuteAll (), 1);
- Assert.AreEqual (2, Thread.VolatileRead (ref n));
+ Assert.AreEqual (2, Volatile.Read (ref n));
}
[Test]
Assert.IsTrue (transform.Post (1));
Assert.IsTrue (transform.Post (2));
- Assert.GreaterOrEqual (scheduler.ExecuteAll (), 1);
+ AssertHelper.GreaterOrEqual (scheduler.ExecuteAll (), 1);
- Assert.AreEqual (2, Thread.VolatileRead (ref n));
+ Assert.AreEqual (2, Volatile.Read (ref n));
}
int n;
Assert.IsFalse (transform.Post (101));
- Assert.GreaterOrEqual (scheduler.ExecuteAll (), 1);
+ AssertHelper.GreaterOrEqual (scheduler.ExecuteAll (), 1);
Assert.IsFalse (transform.Post (102));
- Assert.AreEqual (10000, Thread.VolatileRead (ref n));
+ Assert.AreEqual (10000, Volatile.Read (ref n));
}
IEnumerable<int> ComputeResults ()
readonly HashSet<DataflowMessageHeader> consumed =
new HashSet<DataflowMessageHeader> ();
+ readonly HashSet<DataflowMessageHeader> reserved =
+ new HashSet<DataflowMessageHeader> ();
public void Complete ()
{
return consumed.Contains (header);
}
+ public bool WasReserved (DataflowMessageHeader header)
+ {
+ return reserved.Contains (header);
+ }
+
+ public Action ConsumeWaiter { get; set; }
+
public T ConsumeMessage (DataflowMessageHeader messageHeader,
ITargetBlock<T> target, out bool messageConsumed)
{
T item;
if (messages.TryGetValue (messageHeader, out item)) {
+ if (ConsumeWaiter != null)
+ ConsumeWaiter ();
messages.Remove (messageHeader);
consumed.Add (messageHeader);
messageConsumed = true;
public bool ReserveMessage (DataflowMessageHeader messageHeader,
ITargetBlock<T> target)
{
+ reserved.Add (messageHeader);
return messages.ContainsKey (messageHeader);
}
throw new NotImplementedException ();
}
}
-}
\ No newline at end of file
+}