Started adding documentation for non-public types and methods
authorPetr Onderka <gsvick@gmail.com>
Sat, 18 Aug 2012 21:24:31 +0000 (23:24 +0200)
committerPetr Onderka <gsvick@gmail.com>
Sun, 19 Aug 2012 22:08:17 +0000 (00:08 +0200)
Also some minor refactoring

12 files changed:
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ActionBlock.cs
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/AsyncExecutingMessageBox.cs
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchBlock.cs
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchedJoinBlock.cs
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchedJoinBlock`3.cs
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BroadcastBlock.cs
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BroadcastOutgoingQueue.cs
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BufferBlock.cs
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/CompletionHelper.cs
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ExecutingMessageBox.cs
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ExecutingMessageBoxBase.cs
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/MessageBox.cs

index 4547db0e8066e69c6d64d919122795694020b38e..1b8455c659bd82a80932b64b801325b31282e652 100644 (file)
@@ -91,6 +91,10 @@ namespace System.Threading.Tasks.Dataflow {
                               == DataflowMessageStatus.Accepted;
                }
 
+               /// <summary>
+               /// Processes one item from the queue if the action is synchronous.
+               /// </summary>
+               /// <returns>Returns whether an item was processed. Returns <c>false</c> if the queue is empty.</returns>
                bool ProcessItem ()
                {
                        TInput data;
@@ -100,6 +104,11 @@ namespace System.Threading.Tasks.Dataflow {
                        return dequeued;
                }
 
+               /// <summary>
+               /// Processes one item from the queue if the action is asynchronous.
+               /// </summary>
+               /// <param name="task">The Task that was returned by the synchronous part of the action.</param>
+               /// <returns>Returns whether an item was processed. Returns <c>false</c> if the queue was empty.</returns>
                bool AsyncProcessItem(out Task task)
                {
                        TInput data;
index dd97f084c8b8235691c0d1918d62f1cae42623d8..3cf41c3f29322b16701b49e6fc40edfd2e402251 100644 (file)
 using System.Collections.Concurrent;
 
 namespace System.Threading.Tasks.Dataflow {
+       /// <summary>
+       /// Message box for executing blocks with asynchrnous
+       /// (<see cref="Task"/>-returning) actions.
+       /// </summary>
+       /// <typeparam name="TInput">Type of the item the block is processing.</typeparam>
+       /// <typeparam name="TTask">Type of the Task the action is returning.</typeparam>
        class AsyncExecutingMessageBox<TInput, TTask>
                : ExecutingMessageBoxBase<TInput>
                where TTask : Task {
+               /// <summary>
+               /// Represents executing synchrnous part of the action.
+               /// </summary>
+               /// <param name="task">The Task that was returned by the synchronous part of the action.</param>
+               /// <returns>Returns whether an item was processed. Returns <c>false</c> if the queue was empty.</returns>
                public delegate bool AsyncProcessItem (out TTask task);
 
                readonly AsyncProcessItem processItem;
@@ -45,6 +56,9 @@ namespace System.Threading.Tasks.Dataflow {
                        this.processFinishedTask = processFinishedTask;
                }
 
+               /// <summary>
+               /// Processes the input queue of the block.
+               /// </summary>
                protected override void ProcessQueue ()
                {
                        StartProcessQueue ();
@@ -52,6 +66,11 @@ namespace System.Threading.Tasks.Dataflow {
                        ProcessQueueWithoutStart ();
                }
 
+               /// <summary>
+               /// The part of <see cref="ProcessQueue"/> specific to asynchronous execution.
+               /// Handles scheduling continuation on the Task returned by the block's action
+               /// (or continuing synchrnously if possible).
+               /// </summary>
                void ProcessQueueWithoutStart ()
                {
                        // catch is needed here, if the Task-returning delegate throws exception itself
@@ -82,6 +101,9 @@ namespace System.Threading.Tasks.Dataflow {
                        FinishProcessQueue ();
                }
 
+               /// <summary>
+               /// Handles asynchronously finished Task, continues processing the queue.
+               /// </summary>
                void TaskFinished (TTask task)
                {
                        if (task.IsFaulted) {
index 3b1c21368452901ee7b9679e6580a19f71d03078..20a3a23cf59fcaf59c08c27008c9280909259de8 100644 (file)
@@ -118,6 +118,10 @@ namespace System.Threading.Tasks.Dataflow {
                        return outgoing.TryReceiveAll (out items);
                }
 
+               /// <summary>
+               /// Verifies whether <see cref="GroupingDataflowBlockOptions.MaxNumberOfGroups"/>
+               /// has been reached. If it did, <see cref="Complete"/>s the block.
+               /// </summary>
                void VerifyMaxNumberOfGroups ()
                {
                        if (dataflowBlockOptions.MaxNumberOfGroups == -1)
@@ -139,6 +143,11 @@ namespace System.Threading.Tasks.Dataflow {
                                Complete ();
                }
 
+               /// <summary>
+               /// Returns whether a new item can be accepted, and increments a counter if it can.
+               /// Only makes sense when <see cref="GroupingDataflowBlockOptions.MaxNumberOfGroups"/>
+               /// is not unbounded.
+               /// </summary>
                bool TryAdd ()
                {
                        bool lockTaken = false;
@@ -185,6 +194,12 @@ namespace System.Threading.Tasks.Dataflow {
                        }
                }
 
+               /// <summary>
+               /// Decides whether to create a new batch or not.
+               /// </summary>
+               /// <param name="addedItems">
+               /// Number of newly added items. Used only with greedy processing.
+               /// </param>
                void BatchProcess (int addedItems = 0)
                {
                        if (dataflowBlockOptions.Greedy) {
@@ -214,6 +229,9 @@ namespace System.Threading.Tasks.Dataflow {
                        }
                }
 
+               /// <summary>
+               /// Returns whether non-greedy creation of a batch should be started.
+               /// </summary>
                bool ShouldProcessNonGreedy ()
                {
                        // do we have enough items waiting and would the new batch fit?
@@ -222,6 +240,9 @@ namespace System.Threading.Tasks.Dataflow {
                                   || outgoing.Count + batchSize <= dataflowBlockOptions.BoundedCapacity);
                }
 
+               /// <summary>
+               /// Creates a batch of the given size and adds the resulting batch to the output queue.
+               /// </summary>
                void MakeBatch (int size)
                {
                        T[] batch = new T[size];
@@ -243,6 +264,10 @@ namespace System.Threading.Tasks.Dataflow {
                        VerifyMaxNumberOfGroups ();
                }
 
+               /// <summary>
+               /// Starts non-greedy creation of batches, if one doesn't already run.
+               /// </summary>
+               /// <param name="manuallyTriggered">Whether the batch was triggered by <see cref="TriggerBatch"/>.</param>
                void EnsureNonGreedyProcessing (bool manuallyTriggered)
                {
                        if (nonGreedyProcessing.TrySet ())
@@ -252,6 +277,11 @@ namespace System.Threading.Tasks.Dataflow {
                                        dataflowBlockOptions.TaskScheduler);
                }
 
+               /// <summary>
+               /// Creates batches in non-greedy mode,
+               /// making sure the whole batch is available by using reservations.
+               /// </summary>
+               /// <param name="manuallyTriggered">Whether the batch was triggered by <see cref="TriggerBatch"/>.</param>
                void NonGreedyProcess (bool manuallyTriggered)
                {
                        bool first = true;
index 5492b681c565cae0b4196d236fa1aaeaf37f372c..cfafa02c14fa438eda23a4f843daa6f46c7630c8 100644 (file)
@@ -90,6 +90,9 @@ namespace System.Threading.Tasks.Dataflow {
                        get { return target2; }
                }
 
+               /// <summary>
+               /// Returns whether a new item can be accepted, and increments a counter if it can.
+               /// </summary>
                bool TryAdd ()
                {
                        bool lockTaken = false;
@@ -108,6 +111,9 @@ namespace System.Threading.Tasks.Dataflow {
                        }
                }
 
+               /// <summary>
+               /// Decides whether to create a new batch or not.
+               /// </summary>
                void SignalTarget ()
                {
                        bool lockTaken = false;
@@ -127,6 +133,9 @@ namespace System.Threading.Tasks.Dataflow {
                        MakeBatch (BatchSize);
                }
 
+               /// <summary>
+               /// Creates a batch of the given size and adds the resulting batch to the output queue.
+               /// </summary>
                void MakeBatch (int batchSize)
                {
                        if (batchSize == 0)
@@ -168,6 +177,10 @@ namespace System.Threading.Tasks.Dataflow {
                        VerifyMaxNumberOfGroups ();
                }
 
+               /// <summary>
+               /// Verifies whether <see cref="GroupingDataflowBlockOptions.MaxNumberOfGroups"/>
+               /// has been reached. If it did, <see cref="Complete"/>s the block.
+               /// </summary>
                void VerifyMaxNumberOfGroups ()
                {
                        if (options.MaxNumberOfGroups == -1)
index bddaf76fb03f87305518e91ae63911d9b854f291..5e8c402688decf5e33a696f6cdda299ae6e1cbd7 100644 (file)
@@ -100,6 +100,9 @@ namespace System.Threading.Tasks.Dataflow {
                        get { return target3; }
                }
 
+               /// <summary>
+               /// Returns whether a new item can be accepted, and increments a counter if it can.
+               /// </summary>
                bool TryAdd ()
                {
                        bool lockTaken = false;
@@ -118,6 +121,9 @@ namespace System.Threading.Tasks.Dataflow {
                        }
                }
 
+               /// <summary>
+               /// Decides whether to create a new batch or not.
+               /// </summary>
                void SignalTarget ()
                {
                        bool lockTaken = false;
@@ -137,6 +143,9 @@ namespace System.Threading.Tasks.Dataflow {
                        MakeBatch (BatchSize);
                }
 
+               /// <summary>
+               /// Creates a batch of the given size and adds the resulting batch to the output queue.
+               /// </summary>
                void MakeBatch (int batchSize)
                {
                        if (batchSize == 0)
@@ -186,6 +195,10 @@ namespace System.Threading.Tasks.Dataflow {
                        VerifyMaxNumberOfGroups ();
                }
 
+               /// <summary>
+               /// Verifies whether <see cref="GroupingDataflowBlockOptions.MaxNumberOfGroups"/>
+               /// has been reached. If it did, <see cref="Complete"/>s the block.
+               /// </summary>
                void VerifyMaxNumberOfGroups ()
                {
                        if (options.MaxNumberOfGroups == -1)
index c145b94af656be5e26f37a446adbaa6f80e82b76..b51d2b37c57dc6db627b4bd7dfce7247cf4370d3 100644 (file)
@@ -113,6 +113,9 @@ namespace System.Threading.Tasks.Dataflow {
                        return true;
                }
 
+               /// <summary>
+               /// Moves items from the input queue to the output queue.
+               /// </summary>
                void BroadcastProcess ()
                {
                        T item;
index 0a71aecdff053c3665011c1c1ea9b3b99bc406b0..e877c36134acd3b85e66c000a5495bf2899afc08 100644 (file)
@@ -24,8 +24,7 @@ using System.Collections.Concurrent;
 
 namespace System.Threading.Tasks.Dataflow {
        /// <summary>
-       /// Version of <see cref="OutgoingQueueBase{T}"/>
-       /// for broadcast blocks.
+       /// Version of <see cref="OutgoingQueueBase{T}"/> for broadcast blocks.
        /// </summary>
        class BroadcastOutgoingQueue<T> : OutgoingQueueBase<T> {
                volatile bool hasCurrentItem;
@@ -52,6 +51,9 @@ namespace System.Threading.Tasks.Dataflow {
                        targets = new BroadcastTargetCollection<T> (block, hasCloner);
                }
 
+               /// <summary>
+               /// The current item that is to be sent to taget blocks.
+               /// </summary>
                T CurrentItem {
                        get {
                                T item;
@@ -79,6 +81,9 @@ namespace System.Threading.Tasks.Dataflow {
                        }
                }
 
+               /// <summary>
+               /// Takes an item from the queue and sets it as <see cref="CurrentItem"/>.
+               /// </summary>
                public void DequeueItem()
                {
                        T item;
@@ -90,6 +95,9 @@ namespace System.Threading.Tasks.Dataflow {
                        }
                }
 
+               /// <summary>
+               /// Manages sending items to the target blocks.
+               /// </summary>
                protected override void Process ()
                {
                        do {
index 9bbb8c7f7dc68f03ce02d131ac14a6a5a6e08313..b6b60bf7259d657bd7733795a692c055ca24bded 100644 (file)
@@ -90,6 +90,9 @@ namespace System.Threading.Tasks.Dataflow {
                        return outgoing.TryReceiveAll (out items);
                }
 
+               /// <summary>
+               /// Moves items from the input queue to the output queue.
+               /// </summary>
                void ProcessQueue ()
                {
                        T item;
index b87aa2c9d517d717f24dbcecf60ef85c78d58084..cc6a0056d6d6e31270a88ae600be29e504e8796f 100644 (file)
@@ -27,7 +27,7 @@ using System.Linq;
 
 namespace System.Threading.Tasks.Dataflow {
        /// <summary>
-       /// This is used to implement a default behavior for Dataflow completion tracking
+       /// Used to implement Dataflow completion tracking,
        /// that is the Completion property, Complete/Fault method combo
        /// and the CancellationToken option.
        /// </summary>
@@ -45,8 +45,8 @@ namespace System.Threading.Tasks.Dataflow {
 
                public CompletionHelper (DataflowBlockOptions options)
                {
-                       if (options != null)
-                               SetOptions (options);
+                       if (options != null && options.CancellationToken != CancellationToken.None)
+                               options.CancellationToken.Register (RequestCancel);
                }
 
                [Obsolete ("Use ctor")]
@@ -59,6 +59,12 @@ namespace System.Threading.Tasks.Dataflow {
                        get { return source.Task; }
                }
 
+               /// <summary>
+               /// Whether <see cref="Completion"/> can be faulted or cancelled immediatelly.
+               /// It can't for example when a block is currently executing user action.
+               /// In that case, the fault (or cancellation) is queued,
+               /// and is actually acted upon when this property is set back to <c>true</c>.
+               /// </summary>
                public bool CanFaultOrCancelImmediatelly {
                        get { return canFaultOrCancelImmediatelly.Value; }
                        set {
@@ -94,18 +100,38 @@ namespace System.Threading.Tasks.Dataflow {
                        }
                }
 
+               /// <summary>
+               /// Whether the block can act as if it's not completed
+               /// (accept new items, start executing user action).
+               /// </summary>
                public bool CanRun {
-                       get {
-                               return source.Task.Status == TaskStatus.WaitingForActivation
-                                      && !requestedFaultOrCancel.Value;
-                       }
+                       get { return !Completion.IsCompleted && !requestedFaultOrCancel.Value; }
                }
 
+               /// <summary>
+               /// Sets the block as completed.
+               /// Should be called only when the block is really completed
+               /// (e.g. the output queue is empty) and not right after
+               /// the user calls <see cref="IDataflowBlock.Complete"/>.
+               /// </summary>
                public void Complete ()
                {
                        source.TrySetResult (null);
                }
 
+               /// <summary>
+               /// Requests faulting of the block using a given exception.
+               /// If the block can't be faulted immediatelly (see <see cref="CanFaultOrCancelImmediatelly"/>),
+               /// the exception will be queued, and the block will fault as soon as it can.
+               /// </summary>
+               /// <param name="exception">The exception that is the cause of the fault.</param>
+               /// <param name="canBeIgnored">Can this exception be ignored, if there are more exceptions?</param>
+               /// <remarks>
+               /// When calling <see cref="IDataflowBlock.Fault"/> repeatedly, only the first exception counts,
+               /// even in the cases where the block can't be faulted immediatelly.
+               /// But exceptions from user actions in execution blocks count always,
+               /// which is the reason for the <paramref name="canBeIgnored"/> parameter.
+               /// </remarks>
                public void RequestFault (Exception exception, bool canBeIgnored = true)
                {
                        if (exception == null)
@@ -121,16 +147,33 @@ namespace System.Threading.Tasks.Dataflow {
                        }
                }
 
+               /// <summary>
+               /// Actually faults the block with a single exception.
+               /// </summary>
+               /// <remarks>
+               /// Should be only called when <see cref="CanFaultOrCancelImmediatelly"/> is <c>true</c>.
+               /// </remarks>
                void Fault (Exception exception)
                {
                        source.TrySetException (exception);
                }
 
+               /// <summary>
+               /// Actually faults the block with a multiple exceptions.
+               /// </summary>
+               /// <remarks>
+               /// Should be only called when <see cref="CanFaultOrCancelImmediatelly"/> is <c>true</c>.
+               /// </remarks>
                void Fault (IEnumerable<Exception> exceptions)
                {
                        source.TrySetException (exceptions);
                }
 
+               /// <summary>
+               /// Requests cancellation of the block.
+               /// If the block can't be cancelled immediatelly (see <see cref="CanFaultOrCancelImmediatelly"/>),
+               /// the cancellation will be queued, and the block will cancel as soon as it can.
+               /// </summary>
                void RequestCancel ()
                {
                        if (CanFaultOrCancelImmediatelly)
@@ -142,15 +185,15 @@ namespace System.Threading.Tasks.Dataflow {
                        }
                }
 
+               /// <summary>
+               /// Actually cancels the block.
+               /// </summary>
+               /// <remarks>
+               /// Should be only called when <see cref="CanFaultOrCancelImmediatelly"/> is <c>true</c>.
+               /// </remarks>
                void Cancel ()
                {
                        source.TrySetCanceled ();
                }
-
-               void SetOptions (DataflowBlockOptions options)
-               {
-                       if (options.CancellationToken != CancellationToken.None)
-                               options.CancellationToken.Register (RequestCancel);
-               }
        }
 }
\ No newline at end of file
index a911b6252d854bbdd0203c4ed955ceec87303866..bbeccb560f96436f7283ff6adfcc5582d21b886e 100644 (file)
 using System.Collections.Concurrent;
 
 namespace System.Threading.Tasks.Dataflow {
+       /// <summary>
+       /// Message box for executing blocks with synchrnous actions.
+       /// </summary>
+       /// <typeparam name="TInput">Type of the item the block is processing.</typeparam>
        class ExecutingMessageBox<TInput> : ExecutingMessageBoxBase<TInput> {
                readonly Func<bool> processItem;
 
@@ -39,6 +43,9 @@ namespace System.Threading.Tasks.Dataflow {
                        this.processItem = processItem;
                }
 
+               /// <summary>
+               /// Processes the input queue of the block.
+               /// </summary>
                protected override void ProcessQueue ()
                {
                        StartProcessQueue ();
index 0fd91f60b6b218a84e897228f0eed32f9d482350..b87e8601f5e8d2a24e366c72bc6e40976ec18191 100644 (file)
 using System.Collections.Concurrent;
 
 namespace System.Threading.Tasks.Dataflow {
+       /// <summary>
+       /// Base message box for execution blocks (synchronous and asynchrnous).
+       /// </summary>
+       /// <typeparam name="TInput">Type of the item the block is processing.</typeparam>
        abstract class ExecutingMessageBoxBase<TInput> : MessageBox<TInput> {
                protected ExecutionDataflowBlockOptions Options { get; private set; }
                readonly Action outgoingQueueComplete;
@@ -45,11 +49,20 @@ namespace System.Threading.Tasks.Dataflow {
                        this.outgoingQueueComplete = outgoingQueueComplete;
                }
 
+               /// <summary>
+               /// Makes sure the input queue is processed the way it needs to.
+               /// </summary>
+               /// <param name="newItem">Was new item just added?</param>
                protected override void EnsureProcessing (bool newItem)
                {
                        StartProcessing ();
                }
 
+               /// <summary>
+               /// Starts processing queue on a task,
+               /// assuming <see cref="ExecutionDataflowBlockOptions.MaxDegreeOfParallelism"/>
+               /// was't reached yet.
+               /// </summary>
                void StartProcessing ()
                {
                        // atomically increase degreeOfParallelism by 1 only if it's odd
@@ -71,8 +84,19 @@ namespace System.Threading.Tasks.Dataflow {
                                TaskCreationOptions.PreferFairness, Options.TaskScheduler);
                }
 
+               /// <summary>
+               /// Processes the input queue of the block.
+               /// </summary>
+               /// <remarks>
+               /// Should first call <see cref="StartProcessQueue"/>,
+               /// then process the queue and finally call <see cref="FinishProcessQueue"/>.
+               /// </remarks>
                protected abstract void ProcessQueue ();
 
+               /// <summary>
+               /// Notifies that another processing task was started.
+               /// Should be called right after <see cref="ProcessQueue"/> is actually executed.
+               /// </summary>
                protected void StartProcessQueue ()
                {
                        CompHelper.CanFaultOrCancelImmediatelly = false;
@@ -85,6 +109,10 @@ namespace System.Threading.Tasks.Dataflow {
                                StartProcessing ();
                }
 
+               /// <summary>
+               /// Notifies that a processing task was finished.
+               /// Should be called after <see cref="ProcessQueue"/> actually finishes processing.
+               /// </summary>
                protected void FinishProcessQueue ()
                {
                        int decrementedDegreeOfParallelism =
@@ -102,6 +130,9 @@ namespace System.Threading.Tasks.Dataflow {
                        }
                }
 
+               /// <summary>
+               /// Notifies that outgoing queue should be completed, if possible.
+               /// </summary>
                protected override void OutgoingQueueComplete ()
                {
                        if (MessageQueue.IsCompleted
@@ -109,12 +140,19 @@ namespace System.Threading.Tasks.Dataflow {
                                outgoingQueueComplete ();
                }
 
+               /// <summary>
+               /// Makes sure the block is completed if it should be.
+               /// </summary>
                protected override void VerifyCompleteness ()
                {
                        if (Thread.VolatileRead (ref degreeOfParallelism) == 1)
                                base.VerifyCompleteness ();
                }
 
+               /// <summary>
+               /// Indicates whether a processing task can continue executing.
+               /// </summary>
+               /// <param name="iteration">The number of the iteration of the task, starting from 0.</param>
                protected bool CanRun (int iteration)
                {
                        return CompHelper.CanRun
index 551fadbb7e1d845f12905a1d031b3713239d6516..14ab8541fe806af2bbc9913828c5f59b32b0c70d 100644 (file)
@@ -171,6 +171,7 @@ namespace System.Threading.Tasks.Dataflow {
                public Tuple<ISourceBlock<TInput>, DataflowMessageHeader> ReserveMessage()
                {
                        while (!postponedMessages.IsEmpty) {
+                               // KeyValuePair is a struct, so default value is not null
                                var block = postponedMessages.FirstOrDefault () .Key;
 
                                // collection is empty
@@ -254,9 +255,11 @@ namespace System.Threading.Tasks.Dataflow {
                                EnsurePostponedProcessing ();
                }
 
-               protected virtual void EnsureProcessing (bool newItem)
-               {
-               }
+               /// <summary>
+               /// Makes sure the input queue is processed the way it needs to.
+               /// </summary>
+               /// <param name="newItem">Was new item just added?</param>
+               protected abstract void EnsureProcessing (bool newItem);
 
                public void Complete ()
                {
@@ -269,11 +272,17 @@ namespace System.Threading.Tasks.Dataflow {
                                EnsurePostponedProcessing ();
                }
 
+               /// <summary>
+               /// Notifies that outgoing queue should be completed, if possible.
+               /// </summary>
                protected virtual void OutgoingQueueComplete ()
                {
                }
 
-               protected  virtual void VerifyCompleteness ()
+               /// <summary>
+               /// Makes sure the block is completed if it should be.
+               /// </summary>
+               protected virtual void VerifyCompleteness ()
                {
                        if (MessageQueue.IsCompleted && externalCompleteTester ())
                                CompHelper.Complete ();