// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
-//
-//
-
-using System;
-using System.Threading.Tasks;
-using System.Collections.Generic;
using System.Collections.Concurrent;
namespace System.Threading.Tasks.Dataflow
{
internal class ExecutingMessageBox<TInput> : MessageBox<TInput>
{
- readonly ExecutionDataflowBlockOptions dataflowBlockOptions;
- readonly BlockingCollection<TInput> messageQueue;
- readonly Action processQueue;
+ readonly ExecutionDataflowBlockOptions options;
+ readonly Func<bool> processItem;
+ readonly Action outgoingQueueComplete;
readonly CompletionHelper compHelper;
- AtomicBoolean started = new AtomicBoolean ();
-
- public ExecutingMessageBox (BlockingCollection<TInput> messageQueue,
- CompletionHelper compHelper,
- Func<bool> externalCompleteTester,
- Action processQueue,
- ExecutionDataflowBlockOptions dataflowBlockOptions) : base (messageQueue, compHelper, externalCompleteTester)
+ // even number: Task is waiting to run
+ // odd number: Task is not waiting to run
+ // invariant: dop / 2 Tasks are running or waiting
+ int degreeOfParallelism = 1;
+
+ public ExecutingMessageBox (
+ ITargetBlock<TInput> target, BlockingCollection<TInput> messageQueue,
+ CompletionHelper compHelper, Func<bool> externalCompleteTester,
+ Func<bool> processItem, Action outgoingQueueComplete,
+ ExecutionDataflowBlockOptions options)
+ : base (target, messageQueue, compHelper, externalCompleteTester, options)
{
- this.messageQueue = messageQueue;
- this.dataflowBlockOptions = dataflowBlockOptions;
- this.processQueue = processQueue;
+ this.options = options;
+ this.processItem = processItem;
+ this.outgoingQueueComplete = outgoingQueueComplete;
this.compHelper = compHelper;
}
- protected override void EnsureProcessing ()
+ protected override void EnsureProcessing (bool newItem)
{
- if (!started.TryRelaxedSet ())
- return;
+ StartProcessing ();
+ }
- Task[] tasks = new Task[dataflowBlockOptions.MaxDegreeOfParallelism];
- for (int i = 0; i < tasks.Length; ++i)
- tasks[i] = Task.Factory.StartNew (processQueue);
- Task.Factory.ContinueWhenAll (tasks, (_) => {
- started.Value = false;
- // Re-run ourselves in case of a race when data is available in the end
- if (messageQueue.Count > 0)
- EnsureProcessing ();
- else if (messageQueue.IsCompleted)
- compHelper.Complete ();
- });
+ void StartProcessing ()
+ {
+ // atomically increase degreeOfParallelism by 1 only if it's odd
+ // and low enough
+ int startDegreeOfParallelism;
+ int currentDegreeOfParallelism = degreeOfParallelism;
+ do {
+ startDegreeOfParallelism = currentDegreeOfParallelism;
+ if (startDegreeOfParallelism % 2 == 0
+ || (options.MaxDegreeOfParallelism != DataflowBlockOptions.Unbounded
+ && startDegreeOfParallelism / 2 >= options.MaxDegreeOfParallelism))
+ return;
+ currentDegreeOfParallelism =
+ Interlocked.CompareExchange (ref degreeOfParallelism,
+ startDegreeOfParallelism + 1, startDegreeOfParallelism);
+ } while (startDegreeOfParallelism != currentDegreeOfParallelism);
+
+ Task.Factory.StartNew (ProcessQueue, options.CancellationToken,
+ TaskCreationOptions.PreferFairness, options.TaskScheduler);
}
- }
-}
+ void ProcessQueue ()
+ {
+ compHelper.CanFaultOrCancelImmediatelly = false;
+
+ int incrementedDegreeOfParallelism =
+ Interlocked.Increment (ref degreeOfParallelism);
+ if ((options.MaxDegreeOfParallelism == DataflowBlockOptions.Unbounded
+ || incrementedDegreeOfParallelism / 2 < options.MaxDegreeOfParallelism)
+ && MessageQueue.Count > 0 && compHelper.CanRun)
+ StartProcessing ();
+
+ try {
+ int i = 0;
+ while (compHelper.CanRun
+ && (options.MaxMessagesPerTask == DataflowBlockOptions.Unbounded
+ || i++ < options.MaxMessagesPerTask)) {
+ if (!processItem ())
+ break;
+ }
+ } catch (Exception e) {
+ compHelper.RequestFault (e);
+ }
+
+ int decrementedDegreeOfParallelism =
+ Interlocked.Add (ref degreeOfParallelism, -2);
+
+ if (decrementedDegreeOfParallelism % 2 == 1) {
+ if (decrementedDegreeOfParallelism == 1) {
+ compHelper.CanFaultOrCancelImmediatelly = true;
+ base.VerifyCompleteness ();
+ if (MessageQueue.IsCompleted)
+ outgoingQueueComplete ();
+ }
+ if (MessageQueue.Count > 0)
+ EnsureProcessing (false);
+ }
+ }
+
+ protected override void OutgoingQueueComplete ()
+ {
+ if (MessageQueue.IsCompleted
+ && Thread.VolatileRead (ref degreeOfParallelism) == 1)
+ outgoingQueueComplete ();
+ }
+
+ protected override void VerifyCompleteness ()
+ {
+ if (Thread.VolatileRead (ref degreeOfParallelism) == 1)
+ base.VerifyCompleteness ();
+ }
+ }
+}
\ No newline at end of file