3 // Copyright (c) 2011 Jérémie "garuma" Laval
4 // Copyright (c) 2012 Petr Onderka
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
7 // of this software and associated documentation files (the "Software"), to deal
8 // in the Software without restriction, including without limitation the rights
9 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 // copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
13 // The above copyright notice and this permission notice shall be included in
14 // all copies or substantial portions of the Software.
16 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
24 using System.Collections.Generic;
25 using System.Collections.Concurrent;
27 namespace System.Threading.Tasks.Dataflow {
// Propagator block that groups individual items of type T into arrays of a
// fixed size (batchSize) and offers those arrays to linked targets.
28 public sealed class BatchBlock<T> : IPropagatorBlock<T, T[]>, IReceivableSourceBlock<T[]> {
// Tracks the block's completion/fault state (see Completion and Fault below).
29 readonly CompletionHelper compHelper;
// Items accepted but not yet emitted as part of a batch; drained by MakeBatch.
30 readonly BlockingCollection<T> messageQueue = new BlockingCollection<T> ();
// Receives offered messages and feeds accepted items into messageQueue.
31 readonly MessageBox<T> messageBox;
32 readonly GroupingDataflowBlockOptions dataflowBlockOptions;
// Number of items that make up one emitted batch.
33 readonly int batchSize;
// Guards the batch counters used by TriggerBatch/BatchProcess.
// NOTE(review): the fields `batchCount`, `numberOfGroups` and `batchLock`,
// referenced by methods below, are not visible in this excerpt — presumably
// declared on elided lines; confirm against the full file.
36 SpinLock batchCountLock;
// Completed batches waiting to be consumed by linked targets / TryReceive.
37 readonly OutgoingQueue<T[]> outgoing;
// Set while a non-greedy processing task runs (see EnsureNonGreedyProcessing).
39 readonly AtomicBoolean nonGreedyProcessing = new AtomicBoolean ();
41 public BatchBlock (int batchSize) : this (batchSize, GroupingDataflowBlockOptions.Default)
45 public BatchBlock (int batchSize, GroupingDataflowBlockOptions dataflowBlockOptions)
// NOTE(review): the opening brace and the `if (batchSize <= 0)` guard that the
// following throw belongs to are on lines elided from this excerpt.
48 throw new ArgumentOutOfRangeException ("batchSize", batchSize,
49 "The batchSize must be positive.");
50 if (dataflowBlockOptions == null)
51 throw new ArgumentNullException ("dataflowBlockOptions");
// A batch could never be emitted if a whole batch cannot fit under the
// configured bounded capacity (-1 means unbounded).
52 if (dataflowBlockOptions.BoundedCapacity != -1
53 && batchSize > dataflowBlockOptions.BoundedCapacity)
54 throw new ArgumentOutOfRangeException ("batchSize",
55 "The batchSize must be smaller than the value of BoundedCapacity.");
57 this.batchSize = batchSize;
58 this.dataflowBlockOptions = dataflowBlockOptions;
59 this.compHelper = CompletionHelper.GetNew (dataflowBlockOptions);
// processQueue is invoked by the message box after items arrive; the bool
// tells whether a new item was actually added.
61 Action<bool> processQueue;
63 if (dataflowBlockOptions.MaxNumberOfGroups == -1) {
// Unbounded number of groups: each accepted item counts toward the batch.
64 processQueue = newItem => BatchProcess (newItem ? 1 : 0);
// NOTE(review): the else-branch header and the assignment of `canAccept`
// (passed to the message box below) are on elided lines — confirm against
// the full file.
67 processQueue = _ => BatchProcess ();
71 this.messageBox = new PassingMessageBox<T> (this, messageQueue, compHelper,
72 () => outgoing.IsCompleted, processQueue, dataflowBlockOptions,
73 dataflowBlockOptions.Greedy, canAccept);
74 this.outgoing = new OutgoingQueue<T[]> (this, compHelper,
75 () => messageQueue.IsCompleted, messageBox.DecreaseCount,
76 dataflowBlockOptions, batch => batch.Length);
/// <summary>
/// Offers a message to this block; part of the target-block contract.
/// </summary>
DataflowMessageStatus ITargetBlock<T>.OfferMessage (
	DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source,
	bool consumeToAccept)
{
	// All accept/postpone/decline logic lives in the shared message box.
	return messageBox.OfferMessage (
		messageHeader, messageValue, source, consumeToAccept);
}
/// <summary>
/// Links this block to the given target; completed batches will be offered to it.
/// </summary>
public IDisposable LinkTo (ITargetBlock<T[]> target, DataflowLinkOptions linkOptions)
{
	// The outgoing queue owns link management; disposing the returned
	// object removes the link.
	var link = outgoing.AddTarget (target, linkOptions);
	return link;
}
/// <summary>
/// Called by a linked target to consume a previously offered batch.
/// </summary>
T[] ISourceBlock<T[]>.ConsumeMessage (
	DataflowMessageHeader messageHeader, ITargetBlock<T[]> target,
	out bool messageConsumed)
{
	// Batches are handed out by the outgoing queue.
	return outgoing.ConsumeMessage (messageHeader, target, out messageConsumed);
}
/// <summary>
/// Called by a linked target to release its reservation on a batch.
/// </summary>
void ISourceBlock<T[]>.ReleaseReservation (
	DataflowMessageHeader messageHeader, ITargetBlock<T[]> target)
{
	outgoing.ReleaseReservation (messageHeader, target);
}
/// <summary>
/// Called by a linked target to reserve an offered batch for later consumption.
/// </summary>
bool ISourceBlock<T[]>.ReserveMessage (
	DataflowMessageHeader messageHeader, ITargetBlock<T[]> target)
{
	return outgoing.ReserveMessage (messageHeader, target);
}
/// <summary>
/// Tries to synchronously receive one completed batch, optionally filtered.
/// </summary>
public bool TryReceive (Predicate<T[]> filter, out T[] item)
{
	return outgoing.TryReceive (filter, out item);
}
/// <summary>
/// Tries to synchronously receive all batches currently available in the output queue.
/// </summary>
public bool TryReceiveAll (out IList<T[]> items)
{
	return outgoing.TryReceiveAll (out items);
}
122 /// Verifies whether <see cref="GroupingDataflowBlockOptions.MaxNumberOfGroups"/>
123 /// has been reached. If it did, <see cref="Complete"/>s the block.
125 void VerifyMaxNumberOfGroups ()
// -1 means the number of produced groups is unbounded: nothing to verify.
127 if (dataflowBlockOptions.MaxNumberOfGroups == -1)
// NOTE(review): the early return, the declaration of `shouldComplete` and a
// presumed increment of `numberOfGroups` sit on lines elided from this excerpt.
132 bool lockTaken = false;
134 batchCountLock.Enter (ref lockTaken);
// Decide under the lock; the actual Complete() call presumably happens after
// the lock is released (elided lines) — confirm against the full file.
136 shouldComplete = numberOfGroups >= dataflowBlockOptions.MaxNumberOfGroups;
// NOTE(review): the try/finally that pairs with this Exit is elided.
139 batchCountLock.Exit ();
147 /// Returns whether a new item can be accepted, and increments a counter if it can.
148 /// Only makes sense when <see cref="GroupingDataflowBlockOptions.MaxNumberOfGroups"/>
149 /// is not unbounded.
// NOTE(review): the method signature itself is elided from this excerpt —
// presumably this is the `canAccept` callback handed to the message box in
// the constructor (likely `bool TryAdd ()`); confirm against the full file.
153 bool lockTaken = false;
155 batchCountLock.Enter (ref lockTaken);
// Items already counted (batchCount) represent future groups; refuse the item
// if accepting it could push the group count past MaxNumberOfGroups.
157 if (numberOfGroups + batchCount / batchSize
158 >= dataflowBlockOptions.MaxNumberOfGroups)
// NOTE(review): the `return false`, the counter increment and the `return true`
// path, plus the try/finally pairing with this Exit, are on elided lines.
165 batchCountLock.Exit ();
// Forces the block to emit a batch from the items collected so far, even if
// fewer than batchSize are available.
169 public void TriggerBatch ()
// Greedy mode: cut a batch out of whatever has been queued up to now.
171 if (dataflowBlockOptions.Greedy) {
174 bool lockTaken = false;
176 batchCountLock.Enter (ref lockTaken);
// NOTE(review): elided lines presumably declare `earlyBatchSize`, bail out when
// batchCount is 0, and reset batchCount after copying it — confirm against the
// full file; the try/finally pairing with Exit below is also elided.
181 earlyBatchSize = batchCount;
186 batchCountLock.Exit ();
// Materialize the (possibly short) batch outside the lock.
189 MakeBatch (earlyBatchSize);
// Non-greedy mode: kick off the reservation-based processing task, unless a
// new batch would overflow the bounded capacity (-1 means unbounded).
191 if (dataflowBlockOptions.BoundedCapacity == -1
192 || outgoing.Count <= dataflowBlockOptions.BoundedCapacity)
193 EnsureNonGreedyProcessing (true);
198 /// Decides whether to create a new batch or not.
200 /// <param name="addedItems">
201 /// Number of newly added items. Used only with greedy processing.
203 void BatchProcess (int addedItems = 0)
205 if (dataflowBlockOptions.Greedy) {
// Whether a full batch accumulated; materialized outside the lock.
206 bool makeBatch = false;
208 bool lockTaken = false;
210 batchCountLock.Enter (ref lockTaken);
212 batchCount += addedItems;
// A full batch worth of items is available: consume their count now,
// build the batch after releasing the lock.
214 if (batchCount >= batchSize) {
215 batchCount -= batchSize;
// NOTE(review): `makeBatch = true`, the closing braces and the try/finally
// around the spin lock sit on elided lines; MakeBatch below is presumably
// guarded by `if (makeBatch)` — confirm against the full file.
221 batchCountLock.Exit ();
225 MakeBatch (batchSize);
// Non-greedy mode: start background reservation-based batching if warranted.
227 if (ShouldProcessNonGreedy ())
228 EnsureNonGreedyProcessing (false);
/// <summary>
/// Returns whether non-greedy creation of a batch should be started.
/// </summary>
bool ShouldProcessNonGreedy ()
{
	// A batch is worth assembling only when enough messages are postponed at
	// sources, and (with bounded capacity; -1 means unbounded) when a whole
	// new batch would still fit into the output queue.
	bool enoughPostponed = messageBox.PostponedMessagesCount >= batchSize;
	bool batchWouldFit = dataflowBlockOptions.BoundedCapacity == -1
		|| outgoing.Count + batchSize <= dataflowBlockOptions.BoundedCapacity;
	return enoughPostponed && batchWouldFit;
}
/// <summary>
/// Creates a batch of the given size and adds the resulting batch to the output queue.
/// </summary>
void MakeBatch (int size)
{
	T[] batch = new T[size];

	// lock is necessary here to make sure items are in the correct order
	bool taken = false;
	try {
		batchLock.Enter (ref taken);

		for (int i = 0; i < size; ++i)
			messageQueue.TryTake (out batch [i]);
	} finally {
		if (taken)
			batchLock.Exit ();
	}

	outgoing.AddData (batch);

	VerifyMaxNumberOfGroups ();
}
/// <summary>
/// Starts non-greedy creation of batches, if one doesn't already run.
/// </summary>
/// <param name="manuallyTriggered">Whether the batch was triggered by <see cref="TriggerBatch"/>.</param>
void EnsureNonGreedyProcessing (bool manuallyTriggered)
{
	// TrySet is an atomic test-and-set on the flag, so at most one
	// processing task is started at a time.
	if (nonGreedyProcessing.TrySet ())
		Task.Factory.StartNew (() => NonGreedyProcess (manuallyTriggered),
			dataflowBlockOptions.CancellationToken,
			TaskCreationOptions.PreferFairness,
			dataflowBlockOptions.TaskScheduler);
}
281 /// Creates batches in non-greedy mode,
282 /// making sure the whole batch is available by using reservations.
284 /// <param name="manuallyTriggered">Whether the batch was triggered by <see cref="TriggerBatch"/>.</param>
285 void NonGreedyProcess (bool manuallyTriggered)
// NOTE(review): several lines are elided from this excerpt (the original
// numbering skips around): the method's opening brace, a presumed `first`
// flag, the outer `do {` loop header, and the declarations of `reservations`
// and `gotReservation` are among them — confirm against the full file.
// Reserve postponed messages at their sources until a full batch is held.
291 new List<Tuple<ISourceBlock<T>, DataflowMessageHeader>> ();
293 int expectedReservationsCount = messageBox.PostponedMessagesCount;
// Nothing is postponed, so there is nothing to batch.
295 if (expectedReservationsCount == 0)
300 var reservation = messageBox.ReserveMessage ();
301 gotReservation = reservation != null;
303 reservations.Add (reservation);
304 } while (gotReservation && reservations.Count < batchSize);
// A manually triggered first round may accept a short batch.
306 int expectedSize = manuallyTriggered && first
307 ? Math.Min (expectedReservationsCount, batchSize)
// Could not reserve enough messages: give every reservation back.
310 if (reservations.Count < expectedSize) {
311 foreach (var reservation in reservations)
// NOTE(review): "Relase" [sic] — presumably matches a misspelled method name
// declared on MessageBox<T>; confirm the declaration before renaming either side.
312 messageBox.RelaseReservation (reservation);
314 // some reservations failed, which most likely means the message
315 // was consumed by someone else and a new one will be offered soon;
316 // so postpone the batch, so that the other block has time to do that
317 // (MS .Net does something like this too)
318 if (manuallyTriggered && first) {
319 Task.Factory.StartNew (() => NonGreedyProcess (true),
320 dataflowBlockOptions.CancellationToken,
321 TaskCreationOptions.PreferFairness,
322 dataflowBlockOptions.TaskScheduler);
// Enough reservations were obtained: consume them all into one batch.
326 T[] batch = new T[reservations.Count];
328 for (int i = 0; i < reservations.Count; i++)
329 batch [i] = messageBox.ConsumeReserved (reservations [i]);
331 outgoing.AddData (batch);
333 // non-greedy doesn't need lock
336 VerifyMaxNumberOfGroups ();
// Keep producing batches while enough postponed messages remain.
340 } while (ShouldProcessNonGreedy ());
// Clear the processing flag, then re-check: another batch may have become
// possible in the window between the loop condition and the flag reset.
342 nonGreedyProcessing.Value = false;
343 if (ShouldProcessNonGreedy ())
344 EnsureNonGreedyProcessing (false);
/// <summary>
/// Signals to the block that no more items will be accepted.
/// Items collected so far are flushed as a final, possibly short, batch.
/// </summary>
public void Complete ()
{
	// Stop accepting or postponing new messages first.
	messageBox.Complete ();
	// Flush the remaining items: without this, fewer than batchSize leftover
	// items would never be emitted and the block could not drain.
	// (The original numbering shows one elided line exactly here.)
	TriggerBatch ();
	outgoing.Complete ();
}
/// <summary>
/// Requests that the block transition to a faulted state with the given exception.
/// </summary>
void IDataflowBlock.Fault (Exception exception)
{
	compHelper.RequestFault (exception);
}
/// <summary>
/// Task that completes when the block finishes processing or faults.
/// </summary>
public Task Completion {
	get { return compHelper.Completion; }
}
/// <summary>
/// Number of completed batches currently waiting in the output queue.
/// </summary>
public int OutputCount {
	get { return outgoing.Count; }
}
/// <summary>
/// Number of items that form one batch.
/// </summary>
public int BatchSize {
	get { return batchSize; }
}
/// <summary>
/// Returns a display name for this block (delegates to NameHelper with the block's options).
/// </summary>
public override string ToString ()
{
	return NameHelper.GetName (this, dataflowBlockOptions);
}