Merge pull request #409 from Alkarex/patch-1
[mono.git] / mcs / class / System.Threading.Tasks.Dataflow / System.Threading.Tasks.Dataflow / BatchBlock.cs
index 963024709d1944bd767c469b71fda07db98560ed..312e5f2803895f485f61b59e859f3545d6e7770a 100644 (file)
@@ -1,6 +1,7 @@
 // BatchBlock.cs
 //
 // Copyright (c) 2011 Jérémie "garuma" Laval
+// Copyright (c) 2012 Petr Onderka
 //
 // Permission is hereby granted, free of charge, to any person obtaining a copy
 // of this software and associated documentation files (the "Software"), to deal
 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 // THE SOFTWARE.
-//
-//
 
-
-using System;
-using System.Threading.Tasks;
 using System.Collections.Generic;
 using System.Collections.Concurrent;
 
-namespace System.Threading.Tasks.Dataflow
-{
-       public sealed class BatchBlock<T> : IPropagatorBlock<T, T[]>, ITargetBlock<T>, IDataflowBlock, ISourceBlock<T[]>, IReceivableSourceBlock<T[]>
-       {
-               static readonly DataflowBlockOptions defaultOptions = new DataflowBlockOptions ();
-
-               CompletionHelper compHelper = CompletionHelper.GetNew ();
-               BlockingCollection<T> messageQueue = new BlockingCollection<T> ();
-               MessageBox<T> messageBox;
-               MessageVault<T[]> vault;
-               DataflowBlockOptions dataflowBlockOptions;
+namespace System.Threading.Tasks.Dataflow {
+       public sealed class BatchBlock<T> : IPropagatorBlock<T, T[]>, IReceivableSourceBlock<T[]> {
+               readonly CompletionHelper compHelper;
+               readonly BlockingCollection<T> messageQueue = new BlockingCollection<T> ();
+               readonly MessageBox<T> messageBox;
+               readonly GroupingDataflowBlockOptions dataflowBlockOptions;
                readonly int batchSize;
                int batchCount;
-               MessageOutgoingQueue<T[]> outgoing;
-               TargetBuffer<T[]> targets = new TargetBuffer<T[]> ();
-               DataflowMessageHeader headers = DataflowMessageHeader.NewValid ();
+               long numberOfGroups;
+               SpinLock batchCountLock;
+               readonly OutgoingQueue<T[]> outgoing;
+               SpinLock batchLock;
+               readonly AtomicBoolean nonGreedyProcessing = new AtomicBoolean ();
 
-               public BatchBlock (int batchSize) : this (batchSize, defaultOptions)
+               public BatchBlock (int batchSize) : this (batchSize, GroupingDataflowBlockOptions.Default)
                {
-
                }
 
-               public BatchBlock (int batchSize, DataflowBlockOptions dataflowBlockOptions)
+               public BatchBlock (int batchSize, GroupingDataflowBlockOptions dataflowBlockOptions)
                {
+                       if (batchSize <= 0)
+                               throw new ArgumentOutOfRangeException ("batchSize", batchSize,
+                                       "The batchSize must be positive.");
                        if (dataflowBlockOptions == null)
                                throw new ArgumentNullException ("dataflowBlockOptions");
+                       if (dataflowBlockOptions.BoundedCapacity != -1
+                           && batchSize > dataflowBlockOptions.BoundedCapacity)
+                               throw new ArgumentOutOfRangeException ("batchSize",
+                                       "The batchSize must not be larger than the value of BoundedCapacity.");
 
                        this.batchSize = batchSize;
                        this.dataflowBlockOptions = dataflowBlockOptions;
-                       this.messageBox = new PassingMessageBox<T> (messageQueue, compHelper, () => outgoing.IsCompleted, BatchProcess, dataflowBlockOptions);
-                       this.outgoing = new MessageOutgoingQueue<T[]> (compHelper, () => messageQueue.IsCompleted);
-                       this.vault = new MessageVault<T[]> ();
+                       this.compHelper = CompletionHelper.GetNew (dataflowBlockOptions);
+
+                       Action<bool> processQueue;
+                       Func<bool> canAccept;
+                       if (dataflowBlockOptions.MaxNumberOfGroups == -1) {
+                               processQueue = newItem => BatchProcess (newItem ? 1 : 0);
+                               canAccept = null;
+                       } else {
+                               processQueue = _ => BatchProcess ();
+                               canAccept = TryAdd;
+                       }
+
+                       this.messageBox = new PassingMessageBox<T> (this, messageQueue, compHelper,
+                               () => outgoing.IsCompleted, processQueue, dataflowBlockOptions,
+                               dataflowBlockOptions.Greedy, canAccept);
+                       this.outgoing = new OutgoingQueue<T[]> (this, compHelper,
+                               () => messageQueue.IsCompleted, messageBox.DecreaseCount,
+                               dataflowBlockOptions, batch => batch.Length);
                }
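+
+               // Illustrative only, not part of the implementation: given the checks in the
+               // constructor above,
+               //     new BatchBlock<int> (0)
+               // throws ArgumentOutOfRangeException, and so does
+               //     new BatchBlock<int> (10,
+               //             new GroupingDataflowBlockOptions { BoundedCapacity = 5 })
+               // because a full batch could never fit into the bounded buffer.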
 
-               public DataflowMessageStatus OfferMessage (DataflowMessageHeader messageHeader,
-                                                          T messageValue,
-                                                          ISourceBlock<T> source,
-                                                          bool consumeToAccept)
+               DataflowMessageStatus ITargetBlock<T>.OfferMessage (
+                       DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source,
+                       bool consumeToAccept)
                {
-                       return messageBox.OfferMessage (this, messageHeader, messageValue, source, consumeToAccept);
+                       return messageBox.OfferMessage (
+                               messageHeader, messageValue, source, consumeToAccept);
                }
 
-               public IDisposable LinkTo (ITargetBlock<T[]> target, bool unlinkAfterOne)
+               public IDisposable LinkTo (ITargetBlock<T[]> target, DataflowLinkOptions linkOptions)
                {
-                       var result = targets.AddTarget (target, unlinkAfterOne);
-                       outgoing.ProcessForTarget (target, this, false, ref headers);
-
-                       return result;
+                       return outgoing.AddTarget (target, linkOptions);
                }
 
-               public T[] ConsumeMessage (DataflowMessageHeader messageHeader, ITargetBlock<T[]> target, out bool messageConsumed)
+               T[] ISourceBlock<T[]>.ConsumeMessage (
+                       DataflowMessageHeader messageHeader, ITargetBlock<T[]> target,
+                       out bool messageConsumed)
                {
-                       return vault.ConsumeMessage (messageHeader, target, out messageConsumed);
+                       return outgoing.ConsumeMessage (messageHeader, target, out messageConsumed);
                }
 
-               public void ReleaseReservation (DataflowMessageHeader messageHeader, ITargetBlock<T[]> target)
+               void ISourceBlock<T[]>.ReleaseReservation (
+                       DataflowMessageHeader messageHeader, ITargetBlock<T[]> target)
                {
-                       vault.ReleaseReservation (messageHeader, target);
+                       outgoing.ReleaseReservation (messageHeader, target);
                }
 
-               public bool ReserveMessage (DataflowMessageHeader messageHeader, ITargetBlock<T[]> target)
+               bool ISourceBlock<T[]>.ReserveMessage (
+                       DataflowMessageHeader messageHeader, ITargetBlock<T[]> target)
                {
-                       return vault.ReserveMessage (messageHeader, target);
+                       return outgoing.ReserveMessage (messageHeader, target);
                }
 
                public bool TryReceive (Predicate<T[]> filter, out T[] item)
@@ -103,69 +118,259 @@ namespace System.Threading.Tasks.Dataflow
                        return outgoing.TryReceiveAll (out items);
                }
 
+               /// <summary>
+               /// Verifies whether <see cref="GroupingDataflowBlockOptions.MaxNumberOfGroups"/>
+               /// has been reached. If it has, <see cref="Complete"/>s the block.
+               /// </summary>
+               void VerifyMaxNumberOfGroups ()
+               {
+                       if (dataflowBlockOptions.MaxNumberOfGroups == -1)
+                               return;
+
+                       bool shouldComplete;
+
+                       bool lockTaken = false;
+                       try {
+                               batchCountLock.Enter (ref lockTaken);
+
+                               shouldComplete = numberOfGroups >= dataflowBlockOptions.MaxNumberOfGroups;
+                       } finally {
+                               if (lockTaken)
+                                       batchCountLock.Exit ();
+                       }
+
+                       if (shouldComplete)
+                               Complete ();
+               }
+
+               /// <summary>
+               /// Returns whether a new item can be accepted and, if it can, increments the pending item count.
+               /// Only used when <see cref="GroupingDataflowBlockOptions.MaxNumberOfGroups"/>
+               /// is not unbounded.
+               /// </summary>
+               bool TryAdd ()
+               {
+                       bool lockTaken = false;
+                       try {
+                               batchCountLock.Enter (ref lockTaken);
+
+                               if (numberOfGroups + batchCount / batchSize
+                                   >= dataflowBlockOptions.MaxNumberOfGroups)
+                                       return false;
+
+                               batchCount++;
+                               return true;
+                       } finally {
+                               if (lockTaken)
+                                       batchCountLock.Exit ();
+                       }
+               }
+
                public void TriggerBatch ()
                {
-                       int earlyBatchSize;
-                       do {
-                               earlyBatchSize = batchCount;
-                               if (earlyBatchSize == 0)
-                                       return;
-                       } while (Interlocked.CompareExchange (ref batchCount, 0, earlyBatchSize) != earlyBatchSize);
+                       if (dataflowBlockOptions.Greedy) {
+                               int earlyBatchSize;
 
-                       MakeBatch (targets.Current, earlyBatchSize);
+                               bool lockTaken = false;
+                               try {
+                                       batchCountLock.Enter (ref lockTaken);
+
+                                       if (batchCount == 0)
+                                               return;
+
+                                       earlyBatchSize = batchCount;
+                                       batchCount = 0;
+                                       numberOfGroups++;
+                               } finally {
+                                       if (lockTaken)
+                                               batchCountLock.Exit ();
+                               }
+
+                               MakeBatch (earlyBatchSize);
+                       } else {
+                               if (dataflowBlockOptions.BoundedCapacity == -1
+                                   || outgoing.Count <= dataflowBlockOptions.BoundedCapacity)
+                                       EnsureNonGreedyProcessing (true);
+                       }
                }
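+
+               // Sketch of typical usage (illustrative, the variable names are assumptions):
+               // TriggerBatch lets a caller flush a partial batch without waiting for
+               // batchSize items, for example from a timer:
+               //
+               //     var batches = new BatchBlock<int> (100);
+               //     var timer = new System.Threading.Timer (
+               //             _ => batches.TriggerBatch (), null,
+               //             TimeSpan.FromSeconds (1), TimeSpan.FromSeconds (1));
+               //
+               // In greedy mode this emits whatever has accumulated so far; in non-greedy
+               // mode it tries to assemble a smaller batch from the postponed messages.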
 
-               // TODO: there can be out-of-order processing of message elements if two collections
-               // are triggered and work side by side. See if it's a problem or not.
-               void BatchProcess ()
+               /// <summary>
+               /// Decides whether a new batch should be created.
+               /// </summary>
+               /// <param name="addedItems">
+               /// Number of newly added items. Used only with greedy processing.
+               /// </param>
+               void BatchProcess (int addedItems = 0)
                {
-                       ITargetBlock<T[]> target = targets.Current;
-                       int current = Interlocked.Increment (ref batchCount);
+                       if (dataflowBlockOptions.Greedy) {
+                               bool makeBatch = false;
 
-                       if (current % batchSize != 0)
-                               return;
+                               bool lockTaken = false;
+                               try {
+                                       batchCountLock.Enter (ref lockTaken);
 
-                       Interlocked.Add (ref batchCount, -current);
+                                       batchCount += addedItems;
+
+                                       if (batchCount >= batchSize) {
+                                               batchCount -= batchSize;
+                                               numberOfGroups++;
+                                               makeBatch = true;
+                                       }
+                               } finally {
+                                       if (lockTaken)
+                                               batchCountLock.Exit ();
+                               }
+
+                               if (makeBatch)
+                                       MakeBatch (batchSize);
+                       } else {
+                               if (ShouldProcessNonGreedy ())
+                                       EnsureNonGreedyProcessing (false);
+                       }
+               }
 
-                       MakeBatch (target, batchSize);
+               /// <summary>
+               /// Returns whether non-greedy creation of a batch should be started.
+               /// </summary>
+               bool ShouldProcessNonGreedy ()
+               {
+                       // do we have enough items waiting and would the new batch fit?
+                       return messageBox.PostponedMessagesCount >= batchSize
+                              && (dataflowBlockOptions.BoundedCapacity == -1
+                                  || outgoing.Count + batchSize <= dataflowBlockOptions.BoundedCapacity);
                }
 
-               void MakeBatch (ITargetBlock<T[]> target, int size)
+               /// <summary>
+               /// Creates a batch of the given size and adds the result to the output queue.
+               /// </summary>
+               void MakeBatch (int size)
                {
                        T[] batch = new T[size];
-                       for (int i = 0; i < size; ++i)
-                               messageQueue.TryTake (out batch[i]);
 
-                       if (target == null)
-                               outgoing.AddData (batch);
-                       else
-                               target.OfferMessage (headers.Increment (), batch, this, false);
+                       // the lock is necessary here so that concurrently built batches don't interleave items
+                       bool taken = false;
+                       try {
+                               batchLock.Enter (ref taken);
+
+                               for (int i = 0; i < size; ++i)
+                                       messageQueue.TryTake (out batch [i]);
+                       } finally {
+                               if (taken)
+                                       batchLock.Exit ();
+                       }
+
+                       outgoing.AddData (batch);
+
+                       VerifyMaxNumberOfGroups ();
+               }
+
+               /// <summary>
+               /// Starts non-greedy creation of batches, if it isn't already running.
+               /// </summary>
+               /// <param name="manuallyTriggered">Whether processing was triggered manually by <see cref="TriggerBatch"/>.</param>
+               void EnsureNonGreedyProcessing (bool manuallyTriggered)
+               {
+                       if (nonGreedyProcessing.TrySet ())
+                               Task.Factory.StartNew (() => NonGreedyProcess (manuallyTriggered),
+                                       dataflowBlockOptions.CancellationToken,
+                                       TaskCreationOptions.PreferFairness,
+                                       dataflowBlockOptions.TaskScheduler);
+               }
+
+               /// <summary>
+               /// Creates batches in non-greedy mode,
+               /// making sure the whole batch is available by using reservations.
+               /// </summary>
+               /// <param name="manuallyTriggered">Whether processing was triggered manually by <see cref="TriggerBatch"/>.</param>
+               void NonGreedyProcess (bool manuallyTriggered)
+               {
+                       bool first = true;
+
+                       do {
+                               var reservations =
+                                       new List<Tuple<ISourceBlock<T>, DataflowMessageHeader>> ();
+
+                               int expectedReservationsCount = messageBox.PostponedMessagesCount;
+
+                               if (expectedReservationsCount == 0)
+                                       break;
+
+                               bool gotReservation;
+                               do {
+                                       var reservation = messageBox.ReserveMessage ();
+                                       gotReservation = reservation != null;
+                                       if (gotReservation)
+                                               reservations.Add (reservation);
+                               } while (gotReservation && reservations.Count < batchSize);
 
-                       if (!outgoing.IsEmpty && targets.Current != null)
-                               outgoing.ProcessForTarget (targets.Current, this, false, ref headers);
+                               int expectedSize = manuallyTriggered && first
+                                                          ? Math.Min (expectedReservationsCount, batchSize)
+                                                          : batchSize;
+
+                               if (reservations.Count < expectedSize) {
+                                       foreach (var reservation in reservations)
+                                               messageBox.RelaseReservation (reservation);
+
+                                       // some reservations failed, which most likely means a message was
+                                       // consumed by another target and a new one will be offered soon;
+                                       // postpone the batch so that the other block has time to do so
+                                       // (MS .NET does something similar)
+                                       if (manuallyTriggered && first) {
+                                               Task.Factory.StartNew (() => NonGreedyProcess (true),
+                                                       dataflowBlockOptions.CancellationToken,
+                                                       TaskCreationOptions.PreferFairness,
+                                                       dataflowBlockOptions.TaskScheduler);
+                                               return;
+                                       }
+                               } else {
+                                       T[] batch = new T[reservations.Count];
+
+                                       for (int i = 0; i < reservations.Count; i++)
+                                               batch [i] = messageBox.ConsumeReserved (reservations [i]);
+
+                                       outgoing.AddData (batch);
+
+                                       // only one non-greedy task runs at a time, so no lock is needed here
+                                       numberOfGroups++;
+
+                                       VerifyMaxNumberOfGroups ();
+                               }
+
+                               first = false;
+                       } while (ShouldProcessNonGreedy ());
+
+                       nonGreedyProcessing.Value = false;
+                       if (ShouldProcessNonGreedy ())
+                               EnsureNonGreedyProcessing (false);
                }
 
                public void Complete ()
                {
                        messageBox.Complete ();
+                       TriggerBatch ();
+                       outgoing.Complete ();
                }
 
-               public void Fault (Exception ex)
+               void IDataflowBlock.Fault (Exception exception)
                {
-                       compHelper.Fault (ex);
+                       compHelper.RequestFault (exception);
                }
 
                public Task Completion {
-                       get {
-                               return compHelper.Completion;
-                       }
+                       get { return compHelper.Completion; }
                }
 
                public int OutputCount {
-                       get {
-                               return outgoing.Count;
-                       }
+                       get { return outgoing.Count; }
+               }
+
+               public int BatchSize {
+                       get { return batchSize; }
                }
-       }
-}
 
+               public override string ToString ()
+               {
+                       return NameHelper.GetName (this, dataflowBlockOptions);
+               }
+       }
+}
\ No newline at end of file
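
A minimal usage sketch of the block implemented above (the pipeline and the names BatchBlockExample, batcher and printer are illustrative assumptions, not part of this patch): items posted to a BatchBlock<T> come out as T[] arrays of batchSize once enough have accumulated, and Complete () flushes the remaining partial batch.

using System;
using System.Threading.Tasks.Dataflow;

class BatchBlockExample
{
	static void Main ()
	{
		// Groups posted integers into arrays of three.
		var batcher = new BatchBlock<int> (3);

		// Print each batch as it is produced and propagate completion.
		var printer = new ActionBlock<int[]> (
			batch => Console.WriteLine (string.Join (", ", batch)));
		batcher.LinkTo (printer, new DataflowLinkOptions { PropagateCompletion = true });

		for (int i = 0; i < 7; i++)
			batcher.Post (i);

		// Complete () flushes the remaining partial batch (here: { 6 }).
		batcher.Complete ();
		printer.Completion.Wait ();
	}
}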