Fixed non-greedy blocks with BoundedCapacity
[mono.git] / mcs / class / System.Threading.Tasks.Dataflow / System.Threading.Tasks.Dataflow / ExecutingMessageBox.cs
index 953f4d88c21e079d289dc737e2b517426e72550f..e60f215408b616f66473fe717be52cfae57d6bb1 100644 (file)
 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 // THE SOFTWARE.
-//
-//
-
 
-using System;
-using System.Threading.Tasks;
-using System.Collections.Generic;
 using System.Collections.Concurrent;
 
 namespace System.Threading.Tasks.Dataflow
 {
        internal class ExecutingMessageBox<TInput> : MessageBox<TInput>
        {
-               readonly ExecutionDataflowBlockOptions dataflowBlockOptions;
-               readonly BlockingCollection<TInput> messageQueue;
-               readonly Action processQueue;
+               readonly ExecutionDataflowBlockOptions options;
+               readonly Func<bool> processItem;
+               readonly Action outgoingQueueComplete;
                readonly CompletionHelper compHelper;
 
-               AtomicBoolean started = new AtomicBoolean ();
-               
-               public ExecutingMessageBox (BlockingCollection<TInput> messageQueue,
-                                           CompletionHelper compHelper,
-                                           Func<bool> externalCompleteTester,
-                                           Action processQueue,
-                                           ExecutionDataflowBlockOptions dataflowBlockOptions) : base (messageQueue, compHelper, externalCompleteTester)
+               // even number: Task is waiting to run
+               // odd number: Task is not waiting to run
+               // invariant: dop / 2 Tasks are running or waiting
+               int degreeOfParallelism = 1;
+
+               public ExecutingMessageBox (
+                       ITargetBlock<TInput> target, BlockingCollection<TInput> messageQueue,
+                       CompletionHelper compHelper, Func<bool> externalCompleteTester,
+                       Func<bool> processItem, Action outgoingQueueComplete,
+                       ExecutionDataflowBlockOptions options)
+                       : base (target, messageQueue, compHelper, externalCompleteTester, options)
                {
-                       this.messageQueue = messageQueue;
-                       this.dataflowBlockOptions = dataflowBlockOptions;
-                       this.processQueue = processQueue;
+                       this.options = options;
+                       this.processItem = processItem;
+                       this.outgoingQueueComplete = outgoingQueueComplete;
                        this.compHelper = compHelper;
                }
 
-               protected override void EnsureProcessing ()
+               protected override void EnsureProcessing (bool newItem)
                {
-                       if (!started.TryRelaxedSet ())
-                               return;
+                       StartProcessing ();
+               }
 
-                       Task[] tasks = new Task[dataflowBlockOptions.MaxDegreeOfParallelism];
-                       for (int i = 0; i < tasks.Length; ++i)
-                               tasks[i] = Task.Factory.StartNew (processQueue);
-                       Task.Factory.ContinueWhenAll (tasks, (_) => {
-                               started.Value = false;
-                               // Re-run ourselves in case of a race when data is available in the end
-                               if (messageQueue.Count > 0)
-                                       EnsureProcessing ();
-                               else if (messageQueue.IsCompleted)
-                                       compHelper.Complete ();
-                       });
+               void StartProcessing ()
+               {
+                       // atomically increase degreeOfParallelism by 1 only if it's odd
+                       // and low enough
+                       int startDegreeOfParallelism;
+                       int currentDegreeOfParallelism = degreeOfParallelism;
+                       do {
+                               startDegreeOfParallelism = currentDegreeOfParallelism;
+                               if (startDegreeOfParallelism % 2 == 0
+                                   || (options.MaxDegreeOfParallelism != DataflowBlockOptions.Unbounded
+                                       && startDegreeOfParallelism / 2 >= options.MaxDegreeOfParallelism))
+                                       return;
+                               currentDegreeOfParallelism =
+                                       Interlocked.CompareExchange (ref degreeOfParallelism,
+                                               startDegreeOfParallelism + 1, startDegreeOfParallelism);
+                       } while (startDegreeOfParallelism != currentDegreeOfParallelism);
+
+                       Task.Factory.StartNew (ProcessQueue, options.CancellationToken,
+                               TaskCreationOptions.PreferFairness, options.TaskScheduler);
                }
-       }
-}
 
+               void ProcessQueue ()
+               {
+                       compHelper.CanFaultOrCancelImmediatelly = false;
+
+                       int incrementedDegreeOfParallelism =
+                               Interlocked.Increment (ref degreeOfParallelism);
+                       if ((options.MaxDegreeOfParallelism == DataflowBlockOptions.Unbounded
+                            || incrementedDegreeOfParallelism / 2 < options.MaxDegreeOfParallelism)
+                           && MessageQueue.Count > 0 && compHelper.CanRun)
+                               StartProcessing ();
+
+                       try {
+                               int i = 0;
+                               while (compHelper.CanRun
+                                      && (options.MaxMessagesPerTask == DataflowBlockOptions.Unbounded
+                                          || i++ < options.MaxMessagesPerTask)) {
+                                       if (!processItem ())
+                                               break;
+                               }
+                       } catch (Exception e) {
+                               compHelper.RequestFault (e);
+                       }
+
+                       int decrementedDegreeOfParallelism =
+                               Interlocked.Add (ref degreeOfParallelism, -2);
+
+                       if (decrementedDegreeOfParallelism % 2 == 1) {
+                               if (decrementedDegreeOfParallelism == 1) {
+                                       compHelper.CanFaultOrCancelImmediatelly = true;
+                                       base.VerifyCompleteness ();
+                                       if (MessageQueue.IsCompleted)
+                                               outgoingQueueComplete ();
+                               }
+                               if (MessageQueue.Count > 0)
+                                       EnsureProcessing (false);
+                       }
+               }
+
+               protected override void OutgoingQueueComplete ()
+               {
+                       if (MessageQueue.IsCompleted
+                           && Thread.VolatileRead (ref degreeOfParallelism) == 1)
+                               outgoingQueueComplete ();
+               }
+
+               protected override void VerifyCompleteness ()
+               {
+                       if (Thread.VolatileRead (ref degreeOfParallelism) == 1)
+                               base.VerifyCompleteness ();
+               }
+       }
+}
\ No newline at end of file