Merge pull request #409 from Alkarex/patch-1
[mono.git] / mcs / class / System.Threading.Tasks.Dataflow / System.Threading.Tasks.Dataflow / BatchedJoinBlock`3.cs
1 // BatchedJoinBlock.cs
2 //
3 // Copyright (c) 2012 Petr Onderka
4 //
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
11 //
12 // The above copyright notice and this permission notice shall be included in
13 // all copies or substantial portions of the Software.
14 //
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21 // THE SOFTWARE.
22
23 using System.Collections.Generic;
24
namespace System.Threading.Tasks.Dataflow {
        /// <summary>
        /// Dataflow block with three targets that groups accepted items into batches.
        /// A batch is produced whenever the total number of items accepted across
        /// all three targets reaches <see cref="BatchSize"/>, and is emitted as
        /// a tuple holding one list per target.
        /// </summary>
        public sealed class BatchedJoinBlock<T1, T2, T3> :
                IReceivableSourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> {
                readonly GroupingDataflowBlockOptions options;

                readonly CompletionHelper completionHelper;
                readonly OutgoingQueue<Tuple<IList<T1>, IList<T2>, IList<T3>>> outgoing;
                // Serializes draining of the target buffers in MakeBatch.
                SpinLock batchLock;

                readonly JoinTarget<T1> target1;
                readonly JoinTarget<T2> target2;
                readonly JoinTarget<T3> target3;

                // Number of accepted items not yet assigned to a batch.
                int batchCount;
                // Number of full batches created so far by SignalTarget.
                long numberOfGroups;
                // Guards batchCount and numberOfGroups.
                SpinLock batchCountLock;

                /// <summary>
                /// Creates a block with the given batch size and default options.
                /// </summary>
                /// <param name="batchSize">Total number of items that form one batch.</param>
                /// <exception cref="ArgumentOutOfRangeException">
                /// <paramref name="batchSize"/> is not positive.
                /// </exception>
                public BatchedJoinBlock (int batchSize)
                        : this (batchSize, GroupingDataflowBlockOptions.Default)
                {
                }

                /// <summary>
                /// Creates a block with the given batch size and options.
                /// </summary>
                /// <param name="batchSize">Total number of items that form one batch.</param>
                /// <param name="dataflowBlockOptions">
                /// Options of the block; must be greedy and have unbounded capacity.
                /// </param>
                /// <exception cref="ArgumentOutOfRangeException">
                /// <paramref name="batchSize"/> is not positive.
                /// </exception>
                /// <exception cref="ArgumentNullException">
                /// <paramref name="dataflowBlockOptions"/> is null.
                /// </exception>
                /// <exception cref="ArgumentException">
                /// <paramref name="dataflowBlockOptions"/> is not greedy
                /// or its bounded capacity is not unbounded.
                /// </exception>
                public BatchedJoinBlock (int batchSize,
                                         GroupingDataflowBlockOptions dataflowBlockOptions)
                {
                        if (batchSize <= 0)
                                throw new ArgumentOutOfRangeException (
                                        "batchSize", batchSize, "The batchSize must be positive.");
                        if (dataflowBlockOptions == null)
                                throw new ArgumentNullException ("dataflowBlockOptions");
                        if (!dataflowBlockOptions.Greedy)
                                throw new ArgumentException (
                                        "Greedy must be true for this dataflow block.", "dataflowBlockOptions");
                        if (dataflowBlockOptions.BoundedCapacity != DataflowBlockOptions.Unbounded)
                                throw new ArgumentException (
                                        "BoundedCapacity must be Unbounded or -1 for this dataflow block.",
                                        "dataflowBlockOptions");

                        BatchSize = batchSize;
                        options = dataflowBlockOptions;
                        completionHelper = CompletionHelper.GetNew (options);

                        // The lambdas below read the 'outgoing' field only when invoked,
                        // so it is fine that the field is assigned after the targets are created.
                        target1 = new JoinTarget<T1> (
                                this, SignalTarget, completionHelper, () => outgoing.IsCompleted,
                                dataflowBlockOptions, true, TryAdd);
                        target2 = new JoinTarget<T2> (
                                this, SignalTarget, completionHelper, () => outgoing.IsCompleted,
                                dataflowBlockOptions, true, TryAdd);
                        target3 = new JoinTarget<T3> (
                                this, SignalTarget, completionHelper, () => outgoing.IsCompleted,
                                dataflowBlockOptions, true, TryAdd);

                        // NOTE(review): the first delegate reports whether any target buffer
                        // is completed and the second decreases the count of every target;
                        // when OutgoingQueue invokes them is defined elsewhere - confirm there.
                        outgoing = new OutgoingQueue<Tuple<IList<T1>, IList<T2>, IList<T3>>> (
                                this, completionHelper,
                                () => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted
                                      || target3.Buffer.IsCompleted,
                                _ =>
                                {
                                        target1.DecreaseCount ();
                                        target2.DecreaseCount ();
                                        target3.DecreaseCount ();
                                }, options);
                }

                /// <summary>
                /// Total number of items, counted across all targets, that form one batch.
                /// </summary>
                public int BatchSize { get; private set; }

                /// <summary>
                /// Target that receives items of type <typeparamref name="T1"/>.
                /// </summary>
                public ITargetBlock<T1> Target1 {
                        get { return target1; }
                }

                /// <summary>
                /// Target that receives items of type <typeparamref name="T2"/>.
                /// </summary>
                public ITargetBlock<T2> Target2 {
                        get { return target2; }
                }

                /// <summary>
                /// Target that receives items of type <typeparamref name="T3"/>.
                /// </summary>
                public ITargetBlock<T3> Target3 {
                        get { return target3; }
                }

                /// <summary>
                /// Returns whether a new item can be accepted, and increments a counter if it can.
                /// </summary>
                /// <returns>
                /// <c>false</c> if <see cref="GroupingDataflowBlockOptions.MaxNumberOfGroups"/>
                /// is set and the batches already created, together with the full batches
                /// that pending items will form, reach that limit; <c>true</c> otherwise.
                /// </returns>
                bool TryAdd ()
                {
                        bool lockTaken = false;
                        try {
                                batchCountLock.Enter (ref lockTaken);

                                // batchCount / BatchSize is the number of full batches
                                // that are pending but were not created yet.
                                if (options.MaxNumberOfGroups != -1
                                    && numberOfGroups + batchCount / BatchSize >= options.MaxNumberOfGroups)
                                        return false;

                                batchCount++;
                                return true;
                        } finally {
                                if (lockTaken)
                                        batchCountLock.Exit ();
                        }
                }

                /// <summary>
                /// Decides whether to create a new batch or not.
                /// </summary>
                void SignalTarget ()
                {
                        bool lockTaken = false;
                        try {
                                batchCountLock.Enter (ref lockTaken);

                                if (batchCount < BatchSize)
                                        return;

                                // claim one batch worth of items while holding the lock
                                batchCount -= BatchSize;
                                numberOfGroups++;
                        } finally {
                                if (lockTaken)
                                        batchCountLock.Exit ();
                        }

                        // the batch itself is built outside of batchCountLock
                        MakeBatch (BatchSize);
                }

                /// <summary>
                /// Creates a batch of the given size and adds the resulting batch to the output queue.
                /// </summary>
                /// <param name="batchSize">
                /// Total number of items to take from the target buffers;
                /// nothing is done when it is zero.
                /// </param>
                /// <exception cref="InvalidOperationException">
                /// The target buffers together contain fewer than
                /// <paramref name="batchSize"/> items.
                /// </exception>
                void MakeBatch (int batchSize)
                {
                        if (batchSize == 0)
                                return;

                        var list1 = new List<T1> ();
                        var list2 = new List<T2> ();
                        var list3 = new List<T3> ();

                        // lock is necessary here to make sure items are in the correct order
                        bool taken = false;
                        try {
                                batchLock.Enter (ref taken);

                                // 'i' counts items taken from all targets together;
                                // the buffers are drained in order: target1, target2, target3.
                                int i = 0;

                                T1 item1;
                                while (i < batchSize && target1.Buffer.TryTake (out item1)) {
                                        list1.Add (item1);
                                        i++;
                                }

                                T2 item2;
                                while (i < batchSize && target2.Buffer.TryTake (out item2)) {
                                        list2.Add (item2);
                                        i++;
                                }

                                T3 item3;
                                while (i < batchSize && target3.Buffer.TryTake (out item3)) {
                                        list3.Add (item3);
                                        i++;
                                }

                                if (i < batchSize)
                                        throw new InvalidOperationException ("Unexpected count of items.");
                        } finally {
                                if (taken)
                                        batchLock.Exit ();
                        }

                        var batch = Tuple.Create<IList<T1>, IList<T2>, IList<T3>> (list1, list2,
                                list3);

                        outgoing.AddData (batch);

                        VerifyMaxNumberOfGroups ();
                }

                /// <summary>
                /// Verifies whether <see cref="GroupingDataflowBlockOptions.MaxNumberOfGroups"/>
                /// has been reached. If it did, <see cref="Complete"/>s the block.
                /// </summary>
                void VerifyMaxNumberOfGroups ()
                {
                        if (options.MaxNumberOfGroups == -1)
                                return;

                        bool shouldComplete;

                        bool lockTaken = false;
                        try {
                                batchCountLock.Enter (ref lockTaken);

                                shouldComplete = numberOfGroups >= options.MaxNumberOfGroups;
                        } finally {
                                if (lockTaken)
                                        batchCountLock.Exit ();
                        }

                        // Complete () takes batchCountLock itself,
                        // so it has to be called after the lock is released.
                        if (shouldComplete)
                                Complete ();
                }

                /// <summary>
                /// Task representing the completion of this block.
                /// </summary>
                public Task Completion
                {
                        get { return completionHelper.Completion; }
                }

                /// <summary>
                /// Completes all three targets, emits the items that have not been
                /// batched yet as one last (possibly smaller) batch
                /// and completes the output queue.
                /// </summary>
                public void Complete ()
                {
                        target1.Complete ();
                        target2.Complete ();
                        target3.Complete ();

                        // Claim the leftover items atomically and reset the counter.
                        // Reading batchCount without the lock and leaving it unchanged meant
                        // that a repeated call (Complete is also invoked from
                        // VerifyMaxNumberOfGroups) or a concurrent SignalTarget would try
                        // to take the same items again, and MakeBatch would throw
                        // InvalidOperationException on the already drained buffers.
                        int remaining;
                        bool lockTaken = false;
                        try {
                                batchCountLock.Enter (ref lockTaken);

                                remaining = batchCount;
                                batchCount = 0;
                        } finally {
                                if (lockTaken)
                                        batchCountLock.Exit ();
                        }

                        MakeBatch (remaining);
                        outgoing.Complete ();
                }

                /// <summary>
                /// Requests faulting of this block with the given exception.
                /// </summary>
                void IDataflowBlock.Fault (Exception exception)
                {
                        completionHelper.RequestFault (exception);
                }

                /// <summary>
                /// Forwards the request to consume a message to the output queue.
                /// </summary>
                Tuple<IList<T1>, IList<T2>, IList<T3>>
                        ISourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>.ConsumeMessage (
                        DataflowMessageHeader messageHeader,
                        ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target,
                        out bool messageConsumed)
                {
                        return outgoing.ConsumeMessage (messageHeader, target, out messageConsumed);
                }

                /// <summary>
                /// Links the output of this block to the given target.
                /// </summary>
                /// <returns>
                /// The <see cref="IDisposable"/> returned by the output queue for the new link.
                /// </returns>
                public IDisposable LinkTo (
                        ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target,
                        DataflowLinkOptions linkOptions)
                {
                        return outgoing.AddTarget (target, linkOptions);
                }

                /// <summary>
                /// Forwards the request to release a reservation to the output queue.
                /// </summary>
                void ISourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>.ReleaseReservation (
                        DataflowMessageHeader messageHeader,
                        ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target)
                {
                        outgoing.ReleaseReservation (messageHeader, target);
                }

                /// <summary>
                /// Forwards the request to reserve a message to the output queue.
                /// </summary>
                bool ISourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>.ReserveMessage (
                        DataflowMessageHeader messageHeader,
                        ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target)
                {
                        return outgoing.ReserveMessage (messageHeader, target);
                }

                /// <summary>
                /// Attempts to receive a single batch from the output queue,
                /// passing the given filter to the queue.
                /// </summary>
                public bool TryReceive (
                        Predicate<Tuple<IList<T1>, IList<T2>, IList<T3>>> filter,
                        out Tuple<IList<T1>, IList<T2>, IList<T3>> item)
                {
                        return outgoing.TryReceive (filter, out item);
                }

                /// <summary>
                /// Attempts to receive all batches from the output queue.
                /// </summary>
                public bool TryReceiveAll (
                        out IList<Tuple<IList<T1>, IList<T2>, IList<T3>>> items)
                {
                        return outgoing.TryReceiveAll (out items);
                }

                /// <summary>
                /// Count reported by the output queue.
                /// </summary>
                public int OutputCount {
                        get { return outgoing.Count; }
                }

                /// <summary>
                /// Returns the name of this block, as computed by <c>NameHelper</c>
                /// from the block and its options.
                /// </summary>
                public override string ToString ()
                {
                        return NameHelper.GetName (this, options);
                }
        }
}