1 // ExecutingMessageBoxBase.cs
3 // Copyright (c) 2011 Jérémie "garuma" Laval
4 // Copyright (c) 2012 Petr Onderka
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
7 // of this software and associated documentation files (the "Software"), to deal
8 // in the Software without restriction, including without limitation the rights
9 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 // copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
13 // The above copyright notice and this permission notice shall be included in
14 // all copies or substantial portions of the Software.
16 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
24 using System.Collections.Concurrent;
26 namespace System.Threading.Tasks.Dataflow {
28 /// Base message box for execution blocks (synchronous and asynchronous).
30 /// <typeparam name="TInput">Type of the item the block is processing.</typeparam>
31 abstract class ExecutingMessageBoxBase<TInput> : MessageBox<TInput> {
// Execution options handed to the constructor. This class reads
// MaxDegreeOfParallelism, MaxMessagesPerTask and TaskScheduler from it.
32 protected ExecutionDataflowBlockOptions Options { get; private set; }
// Callback supplied by the owner of this message box. It is invoked when the
// message queue is completed and the counter below is back at 1.
33 readonly Action outgoingQueueComplete;
// Packed counter describing the processing tasks of this block:
35 // even number: Task is waiting to run
36 // odd number: Task is not waiting to run
37 // invariant: dop / 2 Tasks are running or waiting
// It starts at 1 (odd, zero tasks). StartProcessing moves it from odd to even
// when it schedules a task, StartProcessQueue adds 1 when that task begins to
// run, and FinishProcessQueue subtracts 2 when a task ends.
38 int degreeOfParallelism = 1;
// Creates the message box, storing the execution options and the callback
// used to complete the outgoing queue.
// NOTE(review): target, messageQueue, compHelper and externalCompleteTester
// are presumably passed on to the MessageBox<TInput> base constructor; that
// call is not fully visible in this excerpt -- confirm.
40 protected ExecutingMessageBoxBase (
41 ITargetBlock<TInput> target, BlockingCollection<TInput> messageQueue,
42 CompletionHelper compHelper, Func<bool> externalCompleteTester,
43 Action outgoingQueueComplete, ExecutionDataflowBlockOptions options)
45 target, messageQueue, compHelper, externalCompleteTester,
48 this.Options = options;
49 this.outgoingQueueComplete = outgoingQueueComplete;
// NOTE(review): the body of this override is not visible in this excerpt;
// presumably it delegates to StartProcessing -- confirm before relying on it.
53 /// Makes sure the input queue is being processed, starting processing if needed.
55 /// <param name="newItem">Whether a new item was just added to the queue.</param>
56 protected override void EnsureProcessing (bool newItem)
62 /// Starts processing the queue on a new task,
63 /// assuming <see cref="ExecutionDataflowBlockOptions.MaxDegreeOfParallelism"/>
64 /// wasn't reached yet.
66 void StartProcessing ()
68 // atomically increase degreeOfParallelism by 1 only if it's odd
// (an even value means a task has already been scheduled and is waiting to run)
70 int startDegreeOfParallelism;
71 int currentDegreeOfParallelism = degreeOfParallelism;
73 startDegreeOfParallelism = currentDegreeOfParallelism;
// Condition: a task is already waiting (even value), or the number of
// running/waiting tasks (value / 2) has reached a bounded
// MaxDegreeOfParallelism.
// NOTE(review): the statement guarded by this condition is not visible in
// this excerpt; presumably the method returns without starting a task -- confirm.
74 if (startDegreeOfParallelism % 2 == 0
75 || (Options.MaxDegreeOfParallelism != DataflowBlockOptions.Unbounded
76 && startDegreeOfParallelism / 2 >= Options.MaxDegreeOfParallelism))
// CompareExchange stores start + 1 only if the field still holds the value
// the decision above was based on, and returns the value it observed;
// the loop repeats until the two match.
78 currentDegreeOfParallelism =
79 Interlocked.CompareExchange (ref degreeOfParallelism,
80 startDegreeOfParallelism + 1, startDegreeOfParallelism);
81 } while (startDegreeOfParallelism != currentDegreeOfParallelism);
// Schedule ProcessQueue on the scheduler from the options; CancellationToken.None
// is passed, so no cancellation token is attached to the task itself.
83 Task.Factory.StartNew (ProcessQueue, CancellationToken.None,
84 TaskCreationOptions.PreferFairness, Options.TaskScheduler);
// Abstract: derived classes provide the actual processing loop.
// StartProcessing schedules this method through Task.Factory.StartNew.
88 /// Processes the input queue of the block.
91 /// Implementations should first call <see cref="StartProcessQueue"/>,
92 /// then process the queue and finally call <see cref="FinishProcessQueue"/>.
94 protected abstract void ProcessQueue ();
97 /// Notifies that another processing task was started.
98 /// Should be called right after <see cref="ProcessQueue"/> actually starts executing.
100 protected void StartProcessQueue ()
// This flag is cleared while a task is processing; FinishProcessQueue sets it
// back to true once the counter returns to 1.
102 CompHelper.CanFaultOrCancelImmediatelly = false;
// Adding 1 turns the even "task waiting" value into an odd one:
// the scheduled task is now running rather than waiting.
104 int incrementedDegreeOfParallelism =
105 Interlocked.Increment (ref degreeOfParallelism);
// Condition: parallelism is unbounded or still below the limit, more than one
// item is queued, and the completion helper allows running.
// NOTE(review): the guarded statement is not visible in this excerpt;
// presumably it starts an additional processing task -- confirm.
106 if ((Options.MaxDegreeOfParallelism == DataflowBlockOptions.Unbounded
107 || incrementedDegreeOfParallelism / 2 < Options.MaxDegreeOfParallelism)
108 && MessageQueue.Count > 1 && CompHelper.CanRun)
113 /// Notifies that a processing task was finished.
114 /// Should be called after <see cref="ProcessQueue"/> actually finishes processing.
116 protected void FinishProcessQueue ()
// Subtracting 2 removes one task from the count (value / 2) without
// changing the parity of the counter.
118 int decrementedDegreeOfParallelism =
119 Interlocked.Add (ref degreeOfParallelism, -2);
// Odd result: no task is currently scheduled and waiting to run.
121 if (decrementedDegreeOfParallelism % 2 == 1) {
// Exactly 1: no task is running or waiting any more, so re-enable immediate
// fault/cancel, run the base completeness check and, if the message queue
// is completed, invoke the outgoing-queue completion callback.
122 if (decrementedDegreeOfParallelism == 1) {
123 CompHelper.CanFaultOrCancelImmediatelly = true;
124 base.VerifyCompleteness ();
125 if (MessageQueue.IsCompleted)
126 outgoingQueueComplete ();
// Condition: items remain in the queue and the completion helper allows running.
// NOTE(review): the guarded statement and the closing braces are not visible
// in this excerpt; presumably processing is restarted here -- confirm,
// including which enclosing "if" this check belongs to.
128 if (MessageQueue.Count > 0 && CompHelper.CanRun)
134 /// Notifies that outgoing queue should be completed, if possible.
136 protected override void OutgoingQueueComplete ()
// Invoke the callback (the lower-case field, not this method) only when the
// message queue is completed and the counter reads 1, which per the counter's
// encoding means no task is running or waiting.
138 if (MessageQueue.IsCompleted
139 && Volatile.Read (ref degreeOfParallelism) == 1)
140 outgoingQueueComplete ();
144 /// Makes sure the block is completed if it should be.
146 protected override void VerifyCompleteness ()
// Defer to the base check only when the counter reads 1 (no task running or
// waiting). FinishProcessQueue calls the base check itself when the counter
// returns to 1.
148 if (Volatile.Read (ref degreeOfParallelism) == 1)
149 base.VerifyCompleteness ();
153 /// Indicates whether a processing task can continue executing.
155 /// <param name="iteration">The number of the iteration of the task, starting from 0.</param>
/// <returns>
/// <c>true</c> when <c>CompHelper.CanRun</c> is true and either
/// <c>Options.MaxMessagesPerTask</c> is unbounded or <paramref name="iteration"/>
/// is still below it; otherwise <c>false</c>.
/// </returns>
156 protected bool CanRun (int iteration)
158 return CompHelper.CanRun
159 && (Options.MaxMessagesPerTask == DataflowBlockOptions.Unbounded
160 || iteration < Options.MaxMessagesPerTask);