== DataflowMessageStatus.Accepted;
}
+ /// <summary>
+ /// Processes one item from the queue if the action is synchronous.
+ /// </summary>
+ /// <returns>Returns whether an item was processed. Returns <c>false</c> if the queue is empty.</returns>
bool ProcessItem ()
{
TInput data;
return dequeued;
}
+ /// <summary>
+ /// Processes one item from the queue if the action is asynchronous.
+ /// </summary>
+ /// <param name="task">The Task that was returned by the synchronous part of the action.</param>
+ /// <returns>Returns whether an item was processed. Returns <c>false</c> if the queue was empty.</returns>
bool AsyncProcessItem(out Task task)
{
TInput data;
using System.Collections.Concurrent;
namespace System.Threading.Tasks.Dataflow {
+ /// <summary>
+ /// Message box for executing blocks with asynchrnous
+ /// (<see cref="Task"/>-returning) actions.
+ /// </summary>
+ /// <typeparam name="TInput">Type of the item the block is processing.</typeparam>
+ /// <typeparam name="TTask">Type of the Task the action is returning.</typeparam>
class AsyncExecutingMessageBox<TInput, TTask>
: ExecutingMessageBoxBase<TInput>
where TTask : Task {
+ /// <summary>
+ /// Represents executing synchrnous part of the action.
+ /// </summary>
+ /// <param name="task">The Task that was returned by the synchronous part of the action.</param>
+ /// <returns>Returns whether an item was processed. Returns <c>false</c> if the queue was empty.</returns>
public delegate bool AsyncProcessItem (out TTask task);
readonly AsyncProcessItem processItem;
this.processFinishedTask = processFinishedTask;
}
+ /// <summary>
+ /// Processes the input queue of the block.
+ /// </summary>
protected override void ProcessQueue ()
{
StartProcessQueue ();
ProcessQueueWithoutStart ();
}
+ /// <summary>
+ /// The part of <see cref="ProcessQueue"/> specific to asynchronous execution.
+ /// Handles scheduling continuation on the Task returned by the block's action
+ /// (or continuing synchrnously if possible).
+ /// </summary>
void ProcessQueueWithoutStart ()
{
// catch is needed here, if the Task-returning delegate throws exception itself
FinishProcessQueue ();
}
+ /// <summary>
+ /// Handles asynchronously finished Task, continues processing the queue.
+ /// </summary>
void TaskFinished (TTask task)
{
if (task.IsFaulted) {
return outgoing.TryReceiveAll (out items);
}
+ /// <summary>
+ /// Verifies whether <see cref="GroupingDataflowBlockOptions.MaxNumberOfGroups"/>
+ /// has been reached. If it did, <see cref="Complete"/>s the block.
+ /// </summary>
void VerifyMaxNumberOfGroups ()
{
if (dataflowBlockOptions.MaxNumberOfGroups == -1)
Complete ();
}
+ /// <summary>
+ /// Returns whether a new item can be accepted, and increments a counter if it can.
+ /// Only makes sense when <see cref="GroupingDataflowBlockOptions.MaxNumberOfGroups"/>
+ /// is not unbounded.
+ /// </summary>
bool TryAdd ()
{
bool lockTaken = false;
}
}
+ /// <summary>
+ /// Decides whether to create a new batch or not.
+ /// </summary>
+ /// <param name="addedItems">
+ /// Number of newly added items. Used only with greedy processing.
+ /// </param>
void BatchProcess (int addedItems = 0)
{
if (dataflowBlockOptions.Greedy) {
}
}
+ /// <summary>
+ /// Returns whether non-greedy creation of a batch should be started.
+ /// </summary>
bool ShouldProcessNonGreedy ()
{
// do we have enough items waiting and would the new batch fit?
|| outgoing.Count + batchSize <= dataflowBlockOptions.BoundedCapacity);
}
+ /// <summary>
+ /// Creates a batch of the given size and adds the resulting batch to the output queue.
+ /// </summary>
void MakeBatch (int size)
{
T[] batch = new T[size];
VerifyMaxNumberOfGroups ();
}
+ /// <summary>
+ /// Starts non-greedy creation of batches, if one doesn't already run.
+ /// </summary>
+ /// <param name="manuallyTriggered">Whether the batch was triggered by <see cref="TriggerBatch"/>.</param>
void EnsureNonGreedyProcessing (bool manuallyTriggered)
{
if (nonGreedyProcessing.TrySet ())
dataflowBlockOptions.TaskScheduler);
}
+ /// <summary>
+ /// Creates batches in non-greedy mode,
+ /// making sure the whole batch is available by using reservations.
+ /// </summary>
+ /// <param name="manuallyTriggered">Whether the batch was triggered by <see cref="TriggerBatch"/>.</param>
void NonGreedyProcess (bool manuallyTriggered)
{
bool first = true;
get { return target2; }
}
+ /// <summary>
+ /// Returns whether a new item can be accepted, and increments a counter if it can.
+ /// </summary>
bool TryAdd ()
{
bool lockTaken = false;
}
}
+ /// <summary>
+ /// Decides whether to create a new batch or not.
+ /// </summary>
void SignalTarget ()
{
bool lockTaken = false;
MakeBatch (BatchSize);
}
+ /// <summary>
+ /// Creates a batch of the given size and adds the resulting batch to the output queue.
+ /// </summary>
void MakeBatch (int batchSize)
{
if (batchSize == 0)
VerifyMaxNumberOfGroups ();
}
+ /// <summary>
+ /// Verifies whether <see cref="GroupingDataflowBlockOptions.MaxNumberOfGroups"/>
+ /// has been reached. If it did, <see cref="Complete"/>s the block.
+ /// </summary>
void VerifyMaxNumberOfGroups ()
{
if (options.MaxNumberOfGroups == -1)
get { return target3; }
}
+ /// <summary>
+ /// Returns whether a new item can be accepted, and increments a counter if it can.
+ /// </summary>
bool TryAdd ()
{
bool lockTaken = false;
}
}
+ /// <summary>
+ /// Decides whether to create a new batch or not.
+ /// </summary>
void SignalTarget ()
{
bool lockTaken = false;
MakeBatch (BatchSize);
}
+ /// <summary>
+ /// Creates a batch of the given size and adds the resulting batch to the output queue.
+ /// </summary>
void MakeBatch (int batchSize)
{
if (batchSize == 0)
VerifyMaxNumberOfGroups ();
}
+ /// <summary>
+ /// Verifies whether <see cref="GroupingDataflowBlockOptions.MaxNumberOfGroups"/>
+ /// has been reached. If it did, <see cref="Complete"/>s the block.
+ /// </summary>
void VerifyMaxNumberOfGroups ()
{
if (options.MaxNumberOfGroups == -1)
return true;
}
+ /// <summary>
+ /// Moves items from the input queue to the output queue.
+ /// </summary>
void BroadcastProcess ()
{
T item;
namespace System.Threading.Tasks.Dataflow {
/// <summary>
- /// Version of <see cref="OutgoingQueueBase{T}"/>
- /// for broadcast blocks.
+ /// Version of <see cref="OutgoingQueueBase{T}"/> for broadcast blocks.
/// </summary>
class BroadcastOutgoingQueue<T> : OutgoingQueueBase<T> {
volatile bool hasCurrentItem;
targets = new BroadcastTargetCollection<T> (block, hasCloner);
}
+ /// <summary>
+ /// The current item that is to be sent to taget blocks.
+ /// </summary>
T CurrentItem {
get {
T item;
}
}
+ /// <summary>
+ /// Takes an item from the queue and sets it as <see cref="CurrentItem"/>.
+ /// </summary>
public void DequeueItem()
{
T item;
}
}
+ /// <summary>
+ /// Manages sending items to the target blocks.
+ /// </summary>
protected override void Process ()
{
do {
return outgoing.TryReceiveAll (out items);
}
+ /// <summary>
+ /// Moves items from the input queue to the output queue.
+ /// </summary>
void ProcessQueue ()
{
T item;
namespace System.Threading.Tasks.Dataflow {
/// <summary>
- /// This is used to implement a default behavior for Dataflow completion tracking
+ /// Used to implement Dataflow completion tracking,
/// that is the Completion property, Complete/Fault method combo
/// and the CancellationToken option.
/// </summary>
public CompletionHelper (DataflowBlockOptions options)
{
- if (options != null)
- SetOptions (options);
+ if (options != null && options.CancellationToken != CancellationToken.None)
+ options.CancellationToken.Register (RequestCancel);
}
[Obsolete ("Use ctor")]
get { return source.Task; }
}
+ /// <summary>
+ /// Whether <see cref="Completion"/> can be faulted or cancelled immediatelly.
+ /// It can't for example when a block is currently executing user action.
+ /// In that case, the fault (or cancellation) is queued,
+ /// and is actually acted upon when this property is set back to <c>true</c>.
+ /// </summary>
public bool CanFaultOrCancelImmediatelly {
get { return canFaultOrCancelImmediatelly.Value; }
set {
}
}
+ /// <summary>
+ /// Whether the block can act as if it's not completed
+ /// (accept new items, start executing user action).
+ /// </summary>
public bool CanRun {
- get {
- return source.Task.Status == TaskStatus.WaitingForActivation
- && !requestedFaultOrCancel.Value;
- }
+ get { return !Completion.IsCompleted && !requestedFaultOrCancel.Value; }
}
+ /// <summary>
+ /// Sets the block as completed.
+ /// Should be called only when the block is really completed
+ /// (e.g. the output queue is empty) and not right after
+ /// the user calls <see cref="IDataflowBlock.Complete"/>.
+ /// </summary>
public void Complete ()
{
source.TrySetResult (null);
}
+ /// <summary>
+ /// Requests faulting of the block using a given exception.
+ /// If the block can't be faulted immediatelly (see <see cref="CanFaultOrCancelImmediatelly"/>),
+ /// the exception will be queued, and the block will fault as soon as it can.
+ /// </summary>
+ /// <param name="exception">The exception that is the cause of the fault.</param>
+ /// <param name="canBeIgnored">Can this exception be ignored, if there are more exceptions?</param>
+ /// <remarks>
+ /// When calling <see cref="IDataflowBlock.Fault"/> repeatedly, only the first exception counts,
+ /// even in the cases where the block can't be faulted immediatelly.
+ /// But exceptions from user actions in execution blocks count always,
+ /// which is the reason for the <paramref name="canBeIgnored"/> parameter.
+ /// </remarks>
public void RequestFault (Exception exception, bool canBeIgnored = true)
{
if (exception == null)
}
}
+ /// <summary>
+ /// Actually faults the block with a single exception.
+ /// </summary>
+ /// <remarks>
+ /// Should be only called when <see cref="CanFaultOrCancelImmediatelly"/> is <c>true</c>.
+ /// </remarks>
void Fault (Exception exception)
{
source.TrySetException (exception);
}
+ /// <summary>
+ /// Actually faults the block with a multiple exceptions.
+ /// </summary>
+ /// <remarks>
+ /// Should be only called when <see cref="CanFaultOrCancelImmediatelly"/> is <c>true</c>.
+ /// </remarks>
void Fault (IEnumerable<Exception> exceptions)
{
source.TrySetException (exceptions);
}
+ /// <summary>
+ /// Requests cancellation of the block.
+ /// If the block can't be cancelled immediatelly (see <see cref="CanFaultOrCancelImmediatelly"/>),
+ /// the cancellation will be queued, and the block will cancel as soon as it can.
+ /// </summary>
void RequestCancel ()
{
if (CanFaultOrCancelImmediatelly)
}
}
+ /// <summary>
+ /// Actually cancels the block.
+ /// </summary>
+ /// <remarks>
+ /// Should be only called when <see cref="CanFaultOrCancelImmediatelly"/> is <c>true</c>.
+ /// </remarks>
void Cancel ()
{
source.TrySetCanceled ();
}
-
- void SetOptions (DataflowBlockOptions options)
- {
- if (options.CancellationToken != CancellationToken.None)
- options.CancellationToken.Register (RequestCancel);
- }
}
}
\ No newline at end of file
using System.Collections.Concurrent;
namespace System.Threading.Tasks.Dataflow {
+ /// <summary>
+ /// Message box for executing blocks with synchrnous actions.
+ /// </summary>
+ /// <typeparam name="TInput">Type of the item the block is processing.</typeparam>
class ExecutingMessageBox<TInput> : ExecutingMessageBoxBase<TInput> {
readonly Func<bool> processItem;
this.processItem = processItem;
}
+ /// <summary>
+ /// Processes the input queue of the block.
+ /// </summary>
protected override void ProcessQueue ()
{
StartProcessQueue ();
using System.Collections.Concurrent;
namespace System.Threading.Tasks.Dataflow {
+ /// <summary>
+ /// Base message box for execution blocks (synchronous and asynchrnous).
+ /// </summary>
+ /// <typeparam name="TInput">Type of the item the block is processing.</typeparam>
abstract class ExecutingMessageBoxBase<TInput> : MessageBox<TInput> {
protected ExecutionDataflowBlockOptions Options { get; private set; }
readonly Action outgoingQueueComplete;
this.outgoingQueueComplete = outgoingQueueComplete;
}
+ /// <summary>
+ /// Makes sure the input queue is processed the way it needs to.
+ /// </summary>
+ /// <param name="newItem">Was new item just added?</param>
protected override void EnsureProcessing (bool newItem)
{
StartProcessing ();
}
+ /// <summary>
+ /// Starts processing queue on a task,
+ /// assuming <see cref="ExecutionDataflowBlockOptions.MaxDegreeOfParallelism"/>
+ /// was't reached yet.
+ /// </summary>
void StartProcessing ()
{
// atomically increase degreeOfParallelism by 1 only if it's odd
TaskCreationOptions.PreferFairness, Options.TaskScheduler);
}
+ /// <summary>
+ /// Processes the input queue of the block.
+ /// </summary>
+ /// <remarks>
+ /// Should first call <see cref="StartProcessQueue"/>,
+ /// then process the queue and finally call <see cref="FinishProcessQueue"/>.
+ /// </remarks>
protected abstract void ProcessQueue ();
+ /// <summary>
+ /// Notifies that another processing task was started.
+ /// Should be called right after <see cref="ProcessQueue"/> is actually executed.
+ /// </summary>
protected void StartProcessQueue ()
{
CompHelper.CanFaultOrCancelImmediatelly = false;
StartProcessing ();
}
+ /// <summary>
+ /// Notifies that a processing task was finished.
+ /// Should be called after <see cref="ProcessQueue"/> actually finishes processing.
+ /// </summary>
protected void FinishProcessQueue ()
{
int decrementedDegreeOfParallelism =
}
}
+ /// <summary>
+ /// Notifies that outgoing queue should be completed, if possible.
+ /// </summary>
protected override void OutgoingQueueComplete ()
{
if (MessageQueue.IsCompleted
outgoingQueueComplete ();
}
+ /// <summary>
+ /// Makes sure the block is completed if it should be.
+ /// </summary>
protected override void VerifyCompleteness ()
{
if (Thread.VolatileRead (ref degreeOfParallelism) == 1)
base.VerifyCompleteness ();
}
+ /// <summary>
+ /// Indicates whether a processing task can continue executing.
+ /// </summary>
+ /// <param name="iteration">The number of the iteration of the task, starting from 0.</param>
protected bool CanRun (int iteration)
{
return CompHelper.CanRun
public Tuple<ISourceBlock<TInput>, DataflowMessageHeader> ReserveMessage()
{
while (!postponedMessages.IsEmpty) {
+ // KeyValuePair is a struct, so default value is not null
var block = postponedMessages.FirstOrDefault () .Key;
// collection is empty
EnsurePostponedProcessing ();
}
- protected virtual void EnsureProcessing (bool newItem)
- {
- }
+ /// <summary>
+ /// Makes sure the input queue is processed the way it needs to.
+ /// </summary>
+ /// <param name="newItem">Was new item just added?</param>
+ protected abstract void EnsureProcessing (bool newItem);
public void Complete ()
{
EnsurePostponedProcessing ();
}
+ /// <summary>
+ /// Notifies that outgoing queue should be completed, if possible.
+ /// </summary>
protected virtual void OutgoingQueueComplete ()
{
}
- protected virtual void VerifyCompleteness ()
+ /// <summary>
+ /// Makes sure the block is completed if it should be.
+ /// </summary>
+ protected virtual void VerifyCompleteness ()
{
if (MessageQueue.IsCompleted && externalCompleteTester ())
CompHelper.Complete ();