Started adding documentation for non-public types and methods
[mono.git] / mcs / class / System.Threading.Tasks.Dataflow / System.Threading.Tasks.Dataflow / BatchBlock.cs
1 // BatchBlock.cs
2 //
3 // Copyright (c) 2011 Jérémie "garuma" Laval
4 // Copyright (c) 2012 Petr Onderka
5 //
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
7 // of this software and associated documentation files (the "Software"), to deal
8 // in the Software without restriction, including without limitation the rights
9 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 // copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
12 //
13 // The above copyright notice and this permission notice shall be included in
14 // all copies or substantial portions of the Software.
15 //
16 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22 // THE SOFTWARE.
23
24 using System.Collections.Generic;
25 using System.Collections.Concurrent;
26
namespace System.Threading.Tasks.Dataflow {
        /// <summary>
        /// Dataflow block that collects incoming items of type <typeparamref name="T"/>
        /// into arrays ("batches") of <see cref="BatchSize"/> items and makes those arrays
        /// available to linked targets and to <see cref="TryReceive"/>.
        /// </summary>
        /// <remarks>
        /// The block works in one of two modes, selected by
        /// <see cref="GroupingDataflowBlockOptions.Greedy"/>:
        /// in greedy mode, items are counted in <c>batchCount</c> and a batch is taken
        /// from <c>messageQueue</c> as soon as enough of them were counted;
        /// in non-greedy mode, a separate task builds batches from postponed messages
        /// by reserving and then consuming them.
        /// </remarks>
        /// <typeparam name="T">Type of the items that are grouped into batches.</typeparam>
        public sealed class BatchBlock<T> : IPropagatorBlock<T, T[]>, IReceivableSourceBlock<T[]> {
                // Tracks completion and faulting of the block; backs the Completion property.
                readonly CompletionHelper compHelper;
                // Input items that batches are taken from in greedy mode.
                readonly BlockingCollection<T> messageQueue = new BlockingCollection<T> ();
                // Receives messages offered to this block and keeps track of postponed ones.
                readonly MessageBox<T> messageBox;
                readonly GroupingDataflowBlockOptions dataflowBlockOptions;
                readonly int batchSize;
                // Number of items counted towards batches that were not created yet.
                // Accessed under batchCountLock.
                int batchCount;
                // Number of batches created so far; compared against MaxNumberOfGroups.
                long numberOfGroups;
                // Guards batchCount and (in greedy mode) numberOfGroups.
                // Not readonly on purpose: SpinLock is a mutable struct.
                SpinLock batchCountLock;
                // Holds the finished batches until they are consumed.
                readonly OutgoingQueue<T[]> outgoing;
                // Serializes taking items out of messageQueue, see MakeBatch ().
                SpinLock batchLock;
                // Set while a non-greedy processing task is scheduled or running.
                readonly AtomicBoolean nonGreedyProcessing = new AtomicBoolean ();

                /// <summary>
                /// Creates a block with the given batch size and default options.
                /// </summary>
                /// <param name="batchSize">Number of items in a batch; must be positive.</param>
                /// <exception cref="ArgumentOutOfRangeException">
                /// <paramref name="batchSize"/> is not positive.
                /// </exception>
                public BatchBlock (int batchSize) : this (batchSize, GroupingDataflowBlockOptions.Default)
                {
                }

                /// <summary>
                /// Creates a block with the given batch size and options.
                /// </summary>
                /// <param name="batchSize">Number of items in a batch; must be positive.</param>
                /// <param name="dataflowBlockOptions">Options that configure the block.</param>
                /// <exception cref="ArgumentOutOfRangeException">
                /// <paramref name="batchSize"/> is not positive, or it is greater than
                /// a bounded <see cref="DataflowBlockOptions.BoundedCapacity"/>.
                /// </exception>
                /// <exception cref="ArgumentNullException">
                /// <paramref name="dataflowBlockOptions"/> is <c>null</c>.
                /// </exception>
                public BatchBlock (int batchSize, GroupingDataflowBlockOptions dataflowBlockOptions)
                {
                        if (batchSize <= 0)
                                throw new ArgumentOutOfRangeException ("batchSize", batchSize,
                                        "The batchSize must be positive.");
                        if (dataflowBlockOptions == null)
                                throw new ArgumentNullException ("dataflowBlockOptions");
                        // -1 means unbounded; a batch that is exactly as big as the capacity is allowed
                        if (dataflowBlockOptions.BoundedCapacity != -1
                            && batchSize > dataflowBlockOptions.BoundedCapacity)
                                throw new ArgumentOutOfRangeException ("batchSize",
                                        "The batchSize must be less than or equal to the value of BoundedCapacity.");

                        this.batchSize = batchSize;
                        this.dataflowBlockOptions = dataflowBlockOptions;
                        this.compHelper = CompletionHelper.GetNew (dataflowBlockOptions);

                        Action<bool> processQueue;
                        Func<bool> canAccept;
                        if (dataflowBlockOptions.MaxNumberOfGroups == -1) {
                                // unlimited number of groups: items are counted in BatchProcess ()
                                processQueue = newItem => BatchProcess (newItem ? 1 : 0);
                                canAccept = null;
                        } else {
                                // limited number of groups: items are counted in TryAdd (),
                                // which also rejects items that would not fit into any group
                                processQueue = _ => BatchProcess ();
                                canAccept = TryAdd;
                        }

                        // The lambda reads the outgoing field only when it is invoked,
                        // so the field can be assigned after the message box is created
                        // (assumes the message box does not invoke it from its constructor).
                        this.messageBox = new PassingMessageBox<T> (this, messageQueue, compHelper,
                                () => outgoing.IsCompleted, processQueue, dataflowBlockOptions,
                                dataflowBlockOptions.Greedy, canAccept);
                        // the last argument makes a batch count as many items as it contains
                        this.outgoing = new OutgoingQueue<T[]> (this, compHelper,
                                () => messageQueue.IsCompleted, messageBox.DecreaseCount,
                                dataflowBlockOptions, batch => batch.Length);
                }

                /// <summary>
                /// Offers a message to this block; the decision is made by the message box.
                /// </summary>
                DataflowMessageStatus ITargetBlock<T>.OfferMessage (
                        DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source,
                        bool consumeToAccept)
                {
                        return messageBox.OfferMessage (
                                messageHeader, messageValue, source, consumeToAccept);
                }

                /// <summary>
                /// Links this block to a target that will be offered the created batches.
                /// </summary>
                /// <param name="target">The block that receives the batches.</param>
                /// <param name="linkOptions">Options of the link.</param>
                /// <returns>An object that removes the link when disposed.</returns>
                public IDisposable LinkTo (ITargetBlock<T[]> target, DataflowLinkOptions linkOptions)
                {
                        return outgoing.AddTarget (target, linkOptions);
                }

                /// <summary>
                /// Called by a linked target to consume a batch that was offered to it.
                /// </summary>
                T[] ISourceBlock<T[]>.ConsumeMessage (
                        DataflowMessageHeader messageHeader, ITargetBlock<T[]> target,
                        out bool messageConsumed)
                {
                        return outgoing.ConsumeMessage (messageHeader, target, out messageConsumed);
                }

                /// <summary>
                /// Called by a linked target to release a reservation it holds on a batch.
                /// </summary>
                void ISourceBlock<T[]>.ReleaseReservation (
                        DataflowMessageHeader messageHeader, ITargetBlock<T[]> target)
                {
                        outgoing.ReleaseReservation (messageHeader, target);
                }

                /// <summary>
                /// Called by a linked target to reserve a batch that was offered to it.
                /// </summary>
                bool ISourceBlock<T[]>.ReserveMessage (
                        DataflowMessageHeader messageHeader, ITargetBlock<T[]> target)
                {
                        return outgoing.ReserveMessage (messageHeader, target);
                }

                /// <summary>
                /// Attempts to receive a single batch from the output queue.
                /// </summary>
                /// <param name="filter">Predicate passed on to the output queue.</param>
                /// <param name="item">The received batch, if the call succeeded.</param>
                /// <returns>Whether a batch was received.</returns>
                public bool TryReceive (Predicate<T[]> filter, out T[] item)
                {
                        return outgoing.TryReceive (filter, out item);
                }

                /// <summary>
                /// Attempts to receive all batches that are currently in the output queue.
                /// </summary>
                /// <param name="items">The received batches, if the call succeeded.</param>
                /// <returns>Whether any batches were received.</returns>
                public bool TryReceiveAll (out IList<T[]> items)
                {
                        return outgoing.TryReceiveAll (out items);
                }

                /// <summary>
                /// Verifies whether <see cref="GroupingDataflowBlockOptions.MaxNumberOfGroups"/>
                /// has been reached. If it did, <see cref="Complete"/>s the block.
                /// </summary>
                void VerifyMaxNumberOfGroups ()
                {
                        if (dataflowBlockOptions.MaxNumberOfGroups == -1)
                                return;

                        bool shouldComplete;

                        bool lockTaken = false;
                        try {
                                batchCountLock.Enter (ref lockTaken);

                                shouldComplete = numberOfGroups >= dataflowBlockOptions.MaxNumberOfGroups;
                        } finally {
                                if (lockTaken)
                                        batchCountLock.Exit ();
                        }

                        // Complete () is called outside of the lock,
                        // because it can take batchCountLock again through TriggerBatch ()
                        if (shouldComplete)
                                Complete ();
                }

                /// <summary>
                /// Returns whether a new item can be accepted, and increments a counter if it can.
                /// Only makes sense when <see cref="GroupingDataflowBlockOptions.MaxNumberOfGroups"/>
                /// is not unbounded.
                /// </summary>
                bool TryAdd ()
                {
                        bool lockTaken = false;
                        try {
                                batchCountLock.Enter (ref lockTaken);

                                // groups already created + full groups the counted items will form
                                if (numberOfGroups + batchCount / batchSize
                                    >= dataflowBlockOptions.MaxNumberOfGroups)
                                        return false;

                                batchCount++;
                                return true;
                        } finally {
                                if (lockTaken)
                                        batchCountLock.Exit ();
                        }
                }

                /// <summary>
                /// Requests creation of a batch even if fewer than <see cref="BatchSize"/>
                /// items are available.
                /// </summary>
                /// <remarks>
                /// In greedy mode, a batch of all currently counted items is created
                /// synchronously; nothing happens if no items are counted.
                /// In non-greedy mode, the non-greedy processing task is started instead.
                /// </remarks>
                public void TriggerBatch ()
                {
                        if (dataflowBlockOptions.Greedy) {
                                int earlyBatchSize;

                                bool lockTaken = false;
                                try {
                                        batchCountLock.Enter (ref lockTaken);

                                        if (batchCount == 0)
                                                return;

                                        earlyBatchSize = batchCount;
                                        batchCount = 0;
                                        numberOfGroups++;
                                } finally {
                                        if (lockTaken)
                                                batchCountLock.Exit ();
                                }

                                MakeBatch (earlyBatchSize);
                        } else {
                                // NOTE(review): this lets a batch be triggered when the output
                                // already holds exactly BoundedCapacity items; confirm whether
                                // "<" was intended here.
                                if (dataflowBlockOptions.BoundedCapacity == -1
                                    || outgoing.Count <= dataflowBlockOptions.BoundedCapacity)
                                        EnsureNonGreedyProcessing (true);
                        }
                }

                /// <summary>
                /// Decides whether to create a new batch or not.
                /// </summary>
                /// <param name="addedItems">
                /// Number of newly added items. Used only with greedy processing.
                /// </param>
                void BatchProcess (int addedItems = 0)
                {
                        if (dataflowBlockOptions.Greedy) {
                                bool makeBatch = false;

                                bool lockTaken = false;
                                try {
                                        batchCountLock.Enter (ref lockTaken);

                                        batchCount += addedItems;

                                        // at most one batch is created per call
                                        if (batchCount >= batchSize) {
                                                batchCount -= batchSize;
                                                numberOfGroups++;
                                                makeBatch = true;
                                        }
                                } finally {
                                        if (lockTaken)
                                                batchCountLock.Exit ();
                                }

                                // the batch itself is created outside of batchCountLock
                                if (makeBatch)
                                        MakeBatch (batchSize);
                        } else {
                                if (ShouldProcessNonGreedy ())
                                        EnsureNonGreedyProcessing (false);
                        }
                }

                /// <summary>
                /// Returns whether non-greedy creation of a batch should be started.
                /// </summary>
                bool ShouldProcessNonGreedy ()
                {
                        // do we have enough items waiting and would the new batch fit?
                        return messageBox.PostponedMessagesCount >= batchSize
                               && (dataflowBlockOptions.BoundedCapacity == -1
                                   || outgoing.Count + batchSize <= dataflowBlockOptions.BoundedCapacity);
                }

                /// <summary>
                /// Creates a batch of the given size and adds the resulting batch to the output queue.
                /// </summary>
                /// <param name="size">Number of items to take from the input queue.</param>
                void MakeBatch (int size)
                {
                        T[] batch = new T[size];

                        // lock is necessary here to make sure items are in the correct order
                        bool taken = false;
                        try {
                                batchLock.Enter (ref taken);

                                // The result of TryTake () is not checked; callers pass a size
                                // that was counted in batchCount, so the items are presumably
                                // already in the queue.
                                for (int i = 0; i < size; ++i)
                                        messageQueue.TryTake (out batch [i]);
                        } finally {
                                if (taken)
                                        batchLock.Exit ();
                        }

                        outgoing.AddData (batch);

                        VerifyMaxNumberOfGroups ();
                }

                /// <summary>
                /// Starts non-greedy creation of batches, if one doesn't already run.
                /// </summary>
                /// <param name="manuallyTriggered">Whether the batch was triggered by <see cref="TriggerBatch"/>.</param>
                void EnsureNonGreedyProcessing (bool manuallyTriggered)
                {
                        if (nonGreedyProcessing.TrySet ())
                                StartNonGreedyProcessing (manuallyTriggered);
                }

                /// <summary>
                /// Schedules <see cref="NonGreedyProcess"/> as a new task that uses
                /// the cancellation token and the scheduler from the options of this block.
                /// Does not touch the <c>nonGreedyProcessing</c> flag.
                /// </summary>
                /// <param name="manuallyTriggered">Whether the batch was triggered by <see cref="TriggerBatch"/>.</param>
                void StartNonGreedyProcessing (bool manuallyTriggered)
                {
                        Task.Factory.StartNew (() => NonGreedyProcess (manuallyTriggered),
                                dataflowBlockOptions.CancellationToken,
                                TaskCreationOptions.PreferFairness,
                                dataflowBlockOptions.TaskScheduler);
                }

                /// <summary>
                /// Creates batches in non-greedy mode,
                /// making sure the whole batch is available by using reservations.
                /// </summary>
                /// <param name="manuallyTriggered">Whether the batch was triggered by <see cref="TriggerBatch"/>.</param>
                void NonGreedyProcess (bool manuallyTriggered)
                {
                        bool first = true;

                        do {
                                var reservations =
                                        new List<Tuple<ISourceBlock<T>, DataflowMessageHeader>> ();

                                int expectedReservationsCount = messageBox.PostponedMessagesCount;

                                if (expectedReservationsCount == 0)
                                        break;

                                // reserve messages until a reservation fails or the batch is full
                                bool gotReservation;
                                do {
                                        var reservation = messageBox.ReserveMessage ();
                                        gotReservation = reservation != null;
                                        if (gotReservation)
                                                reservations.Add (reservation);
                                } while (gotReservation && reservations.Count < batchSize);

                                // only the first batch of a manually triggered run can be smaller
                                int expectedSize = manuallyTriggered && first
                                                           ? Math.Min (expectedReservationsCount, batchSize)
                                                           : batchSize;

                                if (reservations.Count < expectedSize) {
                                        foreach (var reservation in reservations)
                                                messageBox.RelaseReservation (reservation);

                                        // some reservations failed, which most likely means the message
                                        // was consumed by someone else and a new one will be offered soon;
                                        // so postpone the batch, so that the other block has time to do that
                                        // (MS .Net does something like this too)
                                        if (manuallyTriggered && first) {
                                                // nonGreedyProcessing stays set,
                                                // the new task continues on behalf of this one
                                                StartNonGreedyProcessing (true);
                                                return;
                                        }
                                } else {
                                        T[] batch = new T[reservations.Count];

                                        for (int i = 0; i < reservations.Count; i++)
                                                batch [i] = messageBox.ConsumeReserved (reservations [i]);

                                        outgoing.AddData (batch);

                                        // non-greedy doesn't need lock
                                        numberOfGroups++;

                                        VerifyMaxNumberOfGroups ();
                                }

                                first = false;
                        } while (ShouldProcessNonGreedy ());

                        nonGreedyProcessing.Value = false;
                        // check again, enough messages could have been postponed
                        // between the last check of the loop and resetting the flag
                        if (ShouldProcessNonGreedy ())
                                EnsureNonGreedyProcessing (false);
                }

                /// <summary>
                /// Completes the message box, triggers a batch from the remaining items
                /// and then completes the output queue.
                /// </summary>
                public void Complete ()
                {
                        messageBox.Complete ();
                        TriggerBatch ();
                        outgoing.Complete ();
                }

                /// <summary>
                /// Requests that the block is faulted with the given exception.
                /// </summary>
                void IDataflowBlock.Fault (Exception exception)
                {
                        compHelper.RequestFault (exception);
                }

                /// <summary>
                /// Task that represents completion of this block.
                /// </summary>
                public Task Completion {
                        get { return compHelper.Completion; }
                }

                /// <summary>
                /// Count reported by the output queue. The queue is created with a function
                /// that counts each batch by its length, so this is presumably the number of
                /// items in the waiting batches, not the number of batches.
                /// </summary>
                public int OutputCount {
                        get { return outgoing.Count; }
                }

                /// <summary>
                /// Size of the batches this block creates, as given to the constructor.
                /// </summary>
                public int BatchSize {
                        get { return batchSize; }
                }

                /// <summary>
                /// Returns the name of this block, as formatted by <c>NameHelper</c>.
                /// </summary>
                public override string ToString ()
                {
                        return NameHelper.GetName (this, dataflowBlockOptions);
                }
        }
}