<Compile Include="System.Threading.Tasks.Dataflow\JoinTarget.cs" />\r
<Compile Include="System.Threading.Tasks.Dataflow\MessageBox.cs" />
<Compile Include="System.Threading.Tasks.Dataflow\OutgoingQueue.cs" />\r
- <Compile Include="System.Threading.Tasks.Dataflow\MessageVault.cs" />
<Compile Include="System.Threading.Tasks.Dataflow\NameHelper.cs" />\r
<Compile Include="System.Threading.Tasks.Dataflow\PassingMessageBox.cs" />
- <Compile Include="System.Threading.Tasks.Dataflow\TargetBuffer.cs" />
<Compile Include="..\corlib\System.Threading\AtomicBoolean.cs" />
<Compile Include="System.Threading.Tasks.Dataflow\ActionBlock.cs" />
<Compile Include="System.Threading.Tasks.Dataflow\BatchBlock.cs" />
System.Threading.Tasks.Dataflow/OutgoingQueueBase.cs
System.Threading.Tasks.Dataflow/OutgoingQueue.cs
System.Threading.Tasks.Dataflow/BroadcastOutgoingQueue.cs
-System.Threading.Tasks.Dataflow/MessageVault.cs
System.Threading.Tasks.Dataflow/PassingMessageBox.cs
-System.Threading.Tasks.Dataflow/TargetBuffer.cs
System.Threading.Tasks.Dataflow/NameHelper.cs
System.Threading.Tasks.Dataflow/TargetCollection.cs
System.Threading.Tasks.Dataflow/JoinTarget.cs
public bool TryReceiveAll (out IList<T> items)
{
- T[] originalItems;
- bool received = outgoing.TryReceiveAll (out originalItems);
- if (received && cloner != null)
- items = Array.ConvertAll (originalItems, new Converter<T, T> (cloner));
- else
- items = originalItems;
+ T item;
+ if (!TryReceive (null, out item)) {
+ items = null;
+ return false;
+ }
- return received;
+ items = new[] { item };
+ return true;
}
void BroadcastProcess ()
}
}
+ public void DequeueItem()
+ {
+ T item;
+ if (Outgoing.TryTake (out item)) {
+ DecreaseCounts (item);
+ targets.SetCurrentItem (item);
+
+ CurrentItem = item;
+ }
+ }
+
protected override void Process ()
{
do {
ForceProcessing = false;
- T item;
- if (Outgoing.TryTake (out item)) {
- DecreaseCounts (item);
- targets.SetCurrentItem (item);
-
- CurrentItem = item;
- }
+ DequeueItem ();
targets.OfferItemToTargets ();
} while (!Store.IsEmpty || targets.NeedsProcessing);
return false;
}
-
- public bool TryReceiveAll (out T[] items)
- {
- T item;
- if (TryReceive (null, out item)) {
- items = new[] { item };
- return true;
- }
-
- items = null;
- return false;
- }
}
}
\ No newline at end of file
+++ /dev/null
-// MessageVault.cs
-//
-// Copyright (c) 2011 Jérémie "garuma" Laval
-//
-// Permission is hereby granted, free of charge, to any person obtaining a copy
-// of this software and associated documentation files (the "Software"), to deal
-// in the Software without restriction, including without limitation the rights
-// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-// copies of the Software, and to permit persons to whom the Software is
-// furnished to do so, subject to the following conditions:
-//
-// The above copyright notice and this permission notice shall be included in
-// all copies or substantial portions of the Software.
-//
-// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-// THE SOFTWARE.
-//
-//
-
-
-using System;
-using System.Threading;
-using System.Collections.Generic;
-using System.Collections.Concurrent;
-
-namespace System.Threading.Tasks.Dataflow
-{
- /* MessageVault is used to store message value & header that have been proposed via OfferMessage
- * so that target block can then Consume them
- */
- internal class MessageVault<T>
- {
- class ReservationSlot
- {
- public T Data;
- public IDataflowBlock Reserved;
-
- public ReservationSlot (T data)
- {
- Data = data;
- Reserved = null;
- }
- }
-
- ConcurrentDictionary<DataflowMessageHeader, ReservationSlot> store = new ConcurrentDictionary<DataflowMessageHeader, ReservationSlot> ();
-
- public bool StoreMessage (DataflowMessageHeader header, T data)
- {
- if (!header.IsValid)
- throw new ArgumentException ("header", "Header is is not valid");
-
- return store.TryAdd (header, new ReservationSlot (data));
- }
-
- public T ConsumeMessage (DataflowMessageHeader header, IDataflowBlock target, out bool messageConsummed)
- {
- messageConsummed = false;
- if (!header.IsValid)
- throw new ArgumentException ("header", "Header is is not valid");
- if (target == null)
- throw new ArgumentNullException ("target");
-
- ReservationSlot slot;
- if (!store.TryRemove (header, out slot) || slot.Reserved != target)
- return default (T);
-
- messageConsummed = true;
- return slot.Data;
- }
-
- public bool ReserveMessage (DataflowMessageHeader header, IDataflowBlock target)
- {
- if (!header.IsValid)
- throw new ArgumentException ("header", "Header is is not valid");
- if (target == null)
- throw new ArgumentNullException ("target");
-
- ReservationSlot slot;
- if (!store.TryGetValue (header, out slot))
- return false;
-
- return Interlocked.CompareExchange (ref slot.Reserved, target, null) == null;
- }
-
- public void ReleaseReservation (DataflowMessageHeader header, IDataflowBlock target)
- {
- if (!header.IsValid)
- throw new ArgumentException ("header", "Header is is not valid");
- if (target == null)
- throw new ArgumentNullException ("target");
-
- ReservationSlot slot;
- if (!store.TryGetValue (header, out slot))
- return;
-
- if (Interlocked.CompareExchange (ref slot.Reserved, null, target) != target)
- throw new InvalidOperationException ("The target did not have the message reserved");
- }
- }
-}
-
+++ /dev/null
-// TargetBuffer.cs
-//
-// Copyright (c) 2011 Jérémie "garuma" Laval
-//
-// Permission is hereby granted, free of charge, to any person obtaining a copy
-// of this software and associated documentation files (the "Software"), to deal
-// in the Software without restriction, including without limitation the rights
-// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-// copies of the Software, and to permit persons to whom the Software is
-// furnished to do so, subject to the following conditions:
-//
-// The above copyright notice and this permission notice shall be included in
-// all copies or substantial portions of the Software.
-//
-// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-// THE SOFTWARE.
-//
-//
-
-
-using System;
-using System.Linq;
-using System.Threading.Tasks;
-using System.Collections;
-using System.Collections.Generic;
-using System.Collections.Concurrent;
-
-namespace System.Threading.Tasks.Dataflow
-{
- internal class TargetBuffer<T> : IEnumerable<ITargetBlock<T>>
- {
- ConcurrentQueue<TargetWaiter> targetWaiters = new ConcurrentQueue<TargetWaiter> ();
-
- class TargetWaiter : IDisposable
- {
- public volatile bool Disabled;
- public readonly ITargetBlock<T> Target;
- public readonly bool UnlinkAfterOne;
-
- ConcurrentQueue<TargetWaiter> queue;
- AtomicBooleanValue removed;
-
- public TargetWaiter (ITargetBlock<T> target, bool unlinkAfterOne, ConcurrentQueue<TargetWaiter> queue)
- {
- Target = target;
- UnlinkAfterOne = unlinkAfterOne;
- this.queue = queue;
- }
-
- public void Dispose ()
- {
- TargetWaiter t;
- Disabled = true;
-
- Thread.MemoryBarrier ();
-
- if (queue.TryPeek (out t) && t == this && removed.TryRelaxedSet ()) {
- queue.TryDequeue (out t);
- } else {
- SpinWait wait = new SpinWait ();
- while (queue.TryPeek (out t) && t == this)
- wait.SpinOnce ();
- }
- }
- }
-
- public IDisposable AddTarget (ITargetBlock<T> target, bool unlinkAfterOne)
- {
- if (target == null)
- throw new ArgumentNullException("target");
-
- TargetWaiter w = new TargetWaiter (target, unlinkAfterOne, targetWaiters);
- targetWaiters.Enqueue (w);
-
- return w;
- }
-
- public ITargetBlock<T> Current {
- get {
- TargetWaiter w;
-
- while (true) {
- if (!targetWaiters.TryPeek (out w))
- return null;
-
- if (w.Disabled == true) {
- w.Dispose ();
- continue;
- } else if (w.UnlinkAfterOne) {
- w.Dispose ();
- }
-
- return w.Target;
- }
- }
- }
-
- public IEnumerator<ITargetBlock<T>> GetEnumerator ()
- {
- return targetWaiters.Select (w => w.Target).GetEnumerator ();
- }
-
- IEnumerator IEnumerable.GetEnumerator ()
- {
- return targetWaiters.Select (w => w.Target).GetEnumerator ();
- }
- }
-}
-
// WriteOnceBlock.cs
//
// Copyright (c) 2011 Jérémie "garuma" Laval
+// Copyright (c) 2012 Petr Onderka
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
using System.Collections.Generic;
using System.Collections.Concurrent;
-namespace System.Threading.Tasks.Dataflow
-{
- public sealed class WriteOnceBlock<T> : IPropagatorBlock<T, T>, ITargetBlock<T>, IDataflowBlock, ISourceBlock<T>, IReceivableSourceBlock<T>
- {
- static readonly DataflowBlockOptions defaultOptions = new DataflowBlockOptions ();
-
+namespace System.Threading.Tasks.Dataflow {
+ public sealed class WriteOnceBlock<T> : IPropagatorBlock<T, T>, IReceivableSourceBlock<T> {
readonly CompletionHelper compHelper;
readonly BlockingCollection<T> messageQueue = new BlockingCollection<T> ();
readonly MessageBox<T> messageBox;
- readonly MessageVault<T> vault;
readonly DataflowBlockOptions dataflowBlockOptions;
readonly Func<T, T> cloner;
- TargetBuffer<T> targets = new TargetBuffer<T> ();
- DataflowMessageHeader headers = DataflowMessageHeader.NewValid ();
-
+ readonly BroadcastOutgoingQueue<T> outgoing;
AtomicBooleanValue written;
- bool ready;
- T finalValue;
- public WriteOnceBlock (Func<T, T> cloner) : this (cloner, defaultOptions)
+ public WriteOnceBlock (Func<T, T> cloner)
+ : this (cloner, DataflowBlockOptions.Default)
{
-
}
- public WriteOnceBlock (Func<T, T> cloner, DataflowBlockOptions dataflowBlockOptions)
+ public WriteOnceBlock (Func<T, T> cloner,
+ DataflowBlockOptions dataflowBlockOptions)
{
if (dataflowBlockOptions == null)
throw new ArgumentNullException ("dataflowBlockOptions");
this.dataflowBlockOptions = dataflowBlockOptions;
this.compHelper = CompletionHelper.GetNew (dataflowBlockOptions);
this.messageBox = new PassingMessageBox<T> (this, messageQueue, compHelper,
- () => true, _ => BroadcastProcess (), dataflowBlockOptions);
- this.vault = new MessageVault<T> ();
+ () => true, _ => BroadcastProcess (), dataflowBlockOptions,
+ canAccept: () => written.TrySet ());
+ this.outgoing = new BroadcastOutgoingQueue<T> (this, compHelper,
+ () => messageQueue.IsCompleted, messageBox.DecreaseCount,
+ dataflowBlockOptions, cloner != null);
}
public DataflowMessageStatus OfferMessage (
DataflowMessageHeader messageHeader, T messageValue,
ISourceBlock<T> source, bool consumeToAccept)
{
- if (!messageHeader.IsValid)
- throw new ArgumentException ("The messageHeader is not valid.",
- "messageHeader");
- if (consumeToAccept && source == null)
- throw new ArgumentException (
- "consumeToAccept may only be true if provided with a non-null source.",
- "consumeToAccept");
-
- if (written.TryRelaxedSet ()) {
- Thread.MemoryBarrier ();
- finalValue = messageValue;
- Thread.MemoryBarrier ();
- ready = true;
- return messageBox.OfferMessage (messageHeader, finalValue, source, consumeToAccept);
- } else {
- return DataflowMessageStatus.DecliningPermanently;
- }
+ var result = messageBox.OfferMessage (messageHeader, messageValue, source,
+ consumeToAccept);
+ if (result == DataflowMessageStatus.Accepted)
+ messageQueue.CompleteAdding ();
+ return result;
}
- public IDisposable LinkTo (ITargetBlock<T> target, DataflowLinkOptions linkOptions)
+ public IDisposable LinkTo (ITargetBlock<T> target,
+ DataflowLinkOptions linkOptions)
{
- if (linkOptions == null)
- throw new ArgumentNullException("linkOptions");
-
- return targets.AddTarget (target, true);
+ return outgoing.AddTarget (target, linkOptions);
}
- public T ConsumeMessage (DataflowMessageHeader messageHeader, ITargetBlock<T> target, out bool messageConsumed)
+ public T ConsumeMessage (DataflowMessageHeader messageHeader,
+ ITargetBlock<T> target, out bool messageConsumed)
{
- return cloner(vault.ConsumeMessage (messageHeader, target, out messageConsumed));
+ T message = outgoing.ConsumeMessage (
+ messageHeader, target, out messageConsumed);
+ if (messageConsumed && cloner != null)
+ message = cloner (message);
+ return message;
}
- public void ReleaseReservation (DataflowMessageHeader messageHeader, ITargetBlock<T> target)
+ public void ReleaseReservation (DataflowMessageHeader messageHeader,
+ ITargetBlock<T> target)
{
- vault.ReleaseReservation (messageHeader, target);
+ outgoing.ReleaseReservation (messageHeader, target);
}
- public bool ReserveMessage (DataflowMessageHeader messageHeader, ITargetBlock<T> target)
+ public bool ReserveMessage (DataflowMessageHeader messageHeader,
+ ITargetBlock<T> target)
{
- return vault.ReserveMessage (messageHeader, target);
+ return outgoing.ReserveMessage (messageHeader, target);
}
public bool TryReceive (Predicate<T> filter, out T item)
{
- item = default (T);
- if (!written.Value)
- return false;
-
- if (!ready) {
- SpinWait spin = new SpinWait ();
- while (!ready)
- spin.SpinOnce ();
- }
-
- if (filter == null || filter (finalValue)) {
- item = cloner != null ? cloner (finalValue) : finalValue;
- return true;
- }
-
- return false;
+ var received = outgoing.TryReceive (filter, out item);
+ if (received && cloner != null)
+ item = cloner (item);
+ return received;
}
public bool TryReceiveAll (out IList<T> items)
{
- items = null;
- if (!written.Value)
- return false;
-
T item;
- if (!TryReceive (null, out item))
+ if (!TryReceive (null, out item)) {
+ items = null;
return false;
+ }
- items = new T[] { item };
+ items = new[] { item };
return true;
}
void BroadcastProcess ()
{
- T input;
-
- if (!messageQueue.TryTake (out input))
- return;
-
- foreach (var target in targets) {
- DataflowMessageHeader header = headers.Increment ();
- if (cloner != null)
- vault.StoreMessage (header, input);
- target.OfferMessage (header, input, this, cloner != null);
- }
+ T item;
+ if (messageQueue.TryTake (out item))
+ outgoing.AddData (item);
+ outgoing.DequeueItem ();
}
public void Complete ()
{
messageBox.Complete ();
+ outgoing.Complete ();
}
public void Fault (Exception exception)
}
public Task Completion {
- get {
- return compHelper.Completion;
- }
+ get { return compHelper.Completion; }
}
public override string ToString ()
-//
// WriteOnceBlockTest.cs
//
// Author:
// Jérémie "garuma" Laval <jeremie.laval@gmail.com>
+// Petr Onderka <gsvick@gmail.com>
//
// Copyright (c) 2011 Jérémie "garuma" Laval
+// Copyright (c) 2012 Petr Onderka
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// THE SOFTWARE.
using System;
-using System.Linq;
using System.Threading;
-using System.Threading.Tasks;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;
-
using NUnit.Framework;
-namespace MonoTests.System.Threading.Tasks.Dataflow
-{
+namespace MonoTests.System.Threading.Tasks.Dataflow {
[TestFixture]
- public class WriteOnceBlockTest
- {
+ public class WriteOnceBlockTest {
[Test]
public void BasicUsageTest ()
{
bool act1 = false, act2 = false;
var evt = new CountdownEvent (2);
- var broadcast = new WriteOnceBlock<int> (null);
- var action1 = new ActionBlock<int> (i => { act1 = i == 42; evt.Signal (); });
- var action2 = new ActionBlock<int> (i => { act2 = i == 42; evt.Signal (); });
+ var block = new WriteOnceBlock<int> (null);
+ var action1 = new ActionBlock<int> (i =>
+ {
+ act1 = i == 42;
+ evt.Signal ();
+ });
+ var action2 = new ActionBlock<int> (i =>
+ {
+ act2 = i == 42;
+ evt.Signal ();
+ });
- broadcast.LinkTo (action1);
- broadcast.LinkTo (action2);
+ block.LinkTo (action1);
+ block.LinkTo (action2);
- Assert.IsTrue (broadcast.Post (42));
+ Assert.IsTrue (block.Post (42));
+ Assert.IsFalse (block.Post (43));
- evt.Wait ();
+ Assert.IsTrue (evt.Wait (100));
Assert.IsTrue (act1);
Assert.IsTrue (act2);
}
+ [Test]
+ public void LinkAfterPostTest ()
+ {
+ bool act = false;
+ var evt = new ManualResetEventSlim ();
+
+ var block = new WriteOnceBlock<int> (null);
+ var action = new ActionBlock<int> (i =>
+ {
+ act = i == 42;
+ evt.Set ();
+ });
+
+ Assert.IsTrue (block.Post (42));
+
+ block.LinkTo (action);
+
+ Assert.IsTrue (evt.Wait (100));
+
+ Assert.IsTrue (act);
+ }
+
+ [Test]
+ public void PostponedTest ()
+ {
+ var block = new WriteOnceBlock<int> (null);
+ var target = new BufferBlock<int> (
+ new DataflowBlockOptions { BoundedCapacity = 1 });
+ block.LinkTo (target);
+
+ Assert.IsTrue (target.Post (1));
+
+ Assert.IsTrue (block.Post (2));
+
+ Assert.AreEqual (1, target.Receive (TimeSpan.FromMilliseconds (100)));
+ Assert.AreEqual (2, target.Receive (TimeSpan.FromMilliseconds (100)));
+ }
+
+ [Test]
+ public void QueuedMessageTest ()
+ {
+ var scheduler = new TestScheduler ();
+ var block = new WriteOnceBlock<int> (null,
+ new DataflowBlockOptions { TaskScheduler = scheduler });
+ var target = new BufferBlock<int> ();
+ block.LinkTo (target);
+
+ Assert.IsTrue (block.Post (1));
+
+ AssertEx.Throws<TimeoutException> (
+ () => target.Receive (TimeSpan.FromMilliseconds (100)));
+
+ scheduler.ExecuteAll ();
+
+ int item;
+ Assert.IsTrue (target.TryReceive (out item));
+ Assert.AreEqual (1, item);
+ }
+
[Test]
public void CloningTest ()
{
var evt = new CountdownEvent (2);
object source = new object ();
- var broadcast = new WriteOnceBlock<object> (o => new object ());
- var action1 = new ActionBlock<object> (i => { act1 = i; evt.Signal (); });
- var action2 = new ActionBlock<object> (i => { act2 = i; evt.Signal (); });
+ var block = new WriteOnceBlock<object> (o => new object ());
+ var action1 = new ActionBlock<object> (i =>
+ {
+ act1 = i;
+ evt.Signal ();
+ });
+ var action2 = new ActionBlock<object> (i =>
+ {
+ act2 = i;
+ evt.Signal ();
+ });
- broadcast.LinkTo (action1);
- broadcast.LinkTo (action2);
+ block.LinkTo (action1);
+ block.LinkTo (action2);
- Assert.IsTrue (broadcast.Post (source));
+ Assert.IsTrue (block.Post (source));
- evt.Wait ();
+ Assert.IsTrue (evt.Wait (100));
Assert.IsNotNull (act1);
Assert.IsNotNull (act2);
var evt = new CountdownEvent (2);
var broadcast = new WriteOnceBlock<int> (null);
- var action1 = new ActionBlock<int> (i => { act1 = i == 42; evt.Signal (); });
- var action2 = new ActionBlock<int> (i => { act2 = i == 42; evt.Signal (); });
+ var action1 = new ActionBlock<int> (i =>
+ {
+ act1 = i == 42;
+ evt.Signal ();
+ });
+ var action2 = new ActionBlock<int> (i =>
+ {
+ act2 = i == 42;
+ evt.Signal ();
+ });
broadcast.LinkTo (action1);
broadcast.LinkTo (action2);
Assert.IsTrue (broadcast.Post (42));
- evt.Wait ();
+ Assert.IsTrue (evt.Wait (100));
Assert.IsTrue (act1);
Assert.IsTrue (act2);
Assert.IsFalse (broadcast.Post (24));
- Thread.Sleep (1600);
+ Thread.Sleep (300);
Assert.IsTrue (act1);
Assert.IsTrue (act2);
}
[Test]
- public void TryReceiveBehavior ()
+ public void TryReceiveBehaviorTest ()
{
var block = new WriteOnceBlock<int> (null);
int foo;
Assert.IsFalse (block.TryReceive (i => i == 0, out foo));
IList<int> bar;
Assert.IsTrue (((IReceivableSourceBlock<int>)block).TryReceiveAll (out bar));
- Assert.IsNotNull (bar);
- Assert.AreEqual (1, bar.Count);
- Assert.AreEqual (42, bar[0]);
+ CollectionAssert.AreEqual (new[] { 42 }, bar);
+ }
+
+ [Test]
+ public void DontOfferTwiceTest ()
+ {
+ var scheduler = new TestScheduler ();
+ var block = new WriteOnceBlock<int> (null,
+ new DataflowBlockOptions { TaskScheduler = scheduler });
+ var target =
+ new TestTargetBlock<int> { Postpone = true };
+ block.LinkTo (target);
+
+ Assert.IsFalse (target.HasPostponed);
+
+ Assert.IsTrue (block.Post (1));
+
+ scheduler.ExecuteAll ();
+
+ Assert.IsTrue (target.HasPostponed);
+
+ target.Postpone = false;
+
+ int value;
+ Assert.IsTrue (target.RetryPostponed (out value));
+ Assert.AreEqual (1, value);
+
+ block.LinkTo (new BufferBlock<int> ());
+
+ scheduler.ExecuteAll ();
+
+ Assert.AreEqual (default(int), target.DirectlyAccepted);
}
}
-}
+}
\ No newline at end of file