// BroadcastBlock.cs
//
// Copyright (c) 2011 Jérémie "garuma" Laval
+// Copyright (c) 2012 Petr Onderka
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
-//
-//
using System.Collections.Generic;
using System.Collections.Concurrent;
-namespace System.Threading.Tasks.Dataflow
-{
- public sealed class BroadcastBlock<T> : IPropagatorBlock<T, T>, ITargetBlock<T>, IDataflowBlock, ISourceBlock<T>, IReceivableSourceBlock<T>
- {
- static readonly DataflowBlockOptions defaultOptions = new DataflowBlockOptions ();
-
- CompletionHelper compHelper;
- BlockingCollection<T> messageQueue = new BlockingCollection<T> ();
- MessageBox<T> messageBox;
- MessageVault<T> vault;
- DataflowBlockOptions dataflowBlockOptions;
- readonly Func<T, T> cloner;
- MessageOutgoingQueue<T> outgoing;
- TargetBuffer<T> targets = new TargetBuffer<T> ();
- DataflowMessageHeader headers = DataflowMessageHeader.NewValid ();
-
- public BroadcastBlock (Func<T, T> cloner) : this (cloner, defaultOptions)
+namespace System.Threading.Tasks.Dataflow {
+ public sealed class BroadcastBlock<T> : IPropagatorBlock<T, T>, IReceivableSourceBlock<T> {
+ readonly CompletionHelper compHelper;
+ readonly BlockingCollection<T> messageQueue = new BlockingCollection<T> ();
+ readonly MessageBox<T> messageBox;
+ readonly DataflowBlockOptions dataflowBlockOptions;
+ readonly Func<T, T> cloningFunction;
+ readonly BroadcastOutgoingQueue<T> outgoing;
+
+ public BroadcastBlock (Func<T, T> cloningFunction)
+ : this (cloningFunction, DataflowBlockOptions.Default)
{
-
}
- public BroadcastBlock (Func<T, T> cloner, DataflowBlockOptions dataflowBlockOptions)
+ public BroadcastBlock (Func<T, T> cloningFunction,
+ DataflowBlockOptions dataflowBlockOptions)
{
if (dataflowBlockOptions == null)
throw new ArgumentNullException ("dataflowBlockOptions");
- this.cloner = cloner;
+ this.cloningFunction = cloningFunction;
this.dataflowBlockOptions = dataflowBlockOptions;
this.compHelper = CompletionHelper.GetNew (dataflowBlockOptions);
- this.messageBox = new PassingMessageBox<T> (messageQueue, compHelper, () => outgoing.IsCompleted, BroadcastProcess, dataflowBlockOptions);
- this.outgoing = new MessageOutgoingQueue<T> (compHelper, () => messageQueue.IsCompleted);
- this.vault = new MessageVault<T> ();
+ this.messageBox = new PassingMessageBox<T> (this, messageQueue, compHelper,
+ () => outgoing.IsCompleted, _ => BroadcastProcess (), dataflowBlockOptions);
+ this.outgoing = new BroadcastOutgoingQueue<T> (this, compHelper,
+ () => messageQueue.IsCompleted, messageBox.DecreaseCount,
+ dataflowBlockOptions, cloningFunction != null);
}
- public DataflowMessageStatus OfferMessage (DataflowMessageHeader messageHeader,
- T messageValue,
- ISourceBlock<T> source,
- bool consumeToAccept)
+ DataflowMessageStatus ITargetBlock<T>.OfferMessage (
+ DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source,
+ bool consumeToAccept)
{
- return messageBox.OfferMessage (this, messageHeader, messageValue, source, consumeToAccept);
+ return messageBox.OfferMessage (messageHeader, messageValue, source,
+ consumeToAccept);
}
- public IDisposable LinkTo (ITargetBlock<T> target, bool unlinkAfterOne)
+ public IDisposable LinkTo (ITargetBlock<T> target, DataflowLinkOptions linkOptions)
{
- return targets.AddTarget (target, unlinkAfterOne);
+ if (linkOptions == null)
+ throw new ArgumentNullException("linkOptions");
+
+ return outgoing.AddTarget (target, linkOptions);
}
- public T ConsumeMessage (DataflowMessageHeader messageHeader, ITargetBlock<T> target, out bool messageConsumed)
+ T ISourceBlock<T>.ConsumeMessage (DataflowMessageHeader messageHeader,
+ ITargetBlock<T> target,
+ out bool messageConsumed)
{
- return cloner(vault.ConsumeMessage (messageHeader, target, out messageConsumed));
+ T message = outgoing.ConsumeMessage (
+ messageHeader, target, out messageConsumed);
+ if (messageConsumed && cloningFunction != null)
+ message = cloningFunction (message);
+ return message;
}
- public void ReleaseReservation (DataflowMessageHeader messageHeader, ITargetBlock<T> target)
+ bool ISourceBlock<T>.ReserveMessage (DataflowMessageHeader messageHeader,
+ ITargetBlock<T> target)
{
- vault.ReleaseReservation (messageHeader, target);
+ return outgoing.ReserveMessage (messageHeader, target);
}
- public bool ReserveMessage (DataflowMessageHeader messageHeader, ITargetBlock<T> target)
+ void ISourceBlock<T>.ReleaseReservation (DataflowMessageHeader messageHeader,
+ ITargetBlock<T> target)
{
- return vault.ReserveMessage (messageHeader, target);
+ outgoing.ReleaseReservation (messageHeader, target);
}
public bool TryReceive (Predicate<T> filter, out T item)
{
- return outgoing.TryReceive (filter, out item);
+ var received = outgoing.TryReceive (filter, out item);
+ if (received && cloningFunction != null)
+ item = cloningFunction (item);
+ return received;
}
- public bool TryReceiveAll (out IList<T> items)
+ bool IReceivableSourceBlock<T>.TryReceiveAll (out IList<T> items)
{
- return outgoing.TryReceiveAll (out items);
+ T item;
+ if (!TryReceive (null, out item)) {
+ items = null;
+ return false;
+ }
+
+ items = new[] { item };
+ return true;
}
+ /// <summary>
+ /// Moves items from the input queue to the output queue.
+ /// </summary>
void BroadcastProcess ()
{
- T input;
-
- if (!messageQueue.TryTake (out input) || targets.Current == null)
- return;
-
- foreach (var target in targets) {
- DataflowMessageHeader header = headers.Increment ();
- if (cloner != null)
- vault.StoreMessage (header, input);
- target.OfferMessage (header, input, this, cloner != null);
- // TODO: verify if it's the correct semantic
- T save = input;
- if (!messageQueue.TryTake (out input))
- input = save;
- }
+ T item;
+ while (messageQueue.TryTake (out item))
+ outgoing.AddData (item);
}
public void Complete ()
{
messageBox.Complete ();
+ outgoing.Complete ();
}
- public void Fault (Exception ex)
+ void IDataflowBlock.Fault (Exception exception)
{
- compHelper.RequestFault (ex);
+ compHelper.RequestFault (exception);
}
public Task Completion {
- get {
- return compHelper.Completion;
- }
+ get { return compHelper.Completion; }
}
public override string ToString ()