Merge pull request #485 from mtausig/master
[mono.git] / mcs / class / System.Threading.Tasks.Dataflow / System.Threading.Tasks.Dataflow / BatchedJoinBlock.cs
1 // BatchedJoinBlock.cs
2 //
3 // Copyright (c) 2012 Petr Onderka
4 //
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
11 //
12 // The above copyright notice and this permission notice shall be included in
13 // all copies or substantial portions of the Software.
14 //
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21 // THE SOFTWARE.
22
23 using System.Collections.Generic;
24
25 namespace System.Threading.Tasks.Dataflow {
26         public sealed class BatchedJoinBlock<T1, T2> :
27                 IReceivableSourceBlock<Tuple<IList<T1>, IList<T2>>> {
28                 readonly GroupingDataflowBlockOptions options;
29
30                 readonly CompletionHelper completionHelper;
31                 readonly OutgoingQueue<Tuple<IList<T1>, IList<T2>>> outgoing;
32                 SpinLock batchLock;
33
34                 readonly JoinTarget<T1> target1;
35                 readonly JoinTarget<T2> target2;
36
37                 int batchCount;
38                 long numberOfGroups;
39                 SpinLock batchCountLock;
40
41                 public BatchedJoinBlock (int batchSize)
42                         : this (batchSize, GroupingDataflowBlockOptions.Default)
43                 {
44                 }
45
46                 public BatchedJoinBlock (int batchSize,
47                                          GroupingDataflowBlockOptions dataflowBlockOptions)
48                 {
49                         if (batchSize <= 0)
50                                 throw new ArgumentOutOfRangeException (
51                                         "batchSize", batchSize, "The batchSize must be positive.");
52                         if (dataflowBlockOptions == null)
53                                 throw new ArgumentNullException ("dataflowBlockOptions");
54                         if (!dataflowBlockOptions.Greedy)
55                                 throw new ArgumentException (
56                                         "Greedy must be true for this dataflow block.", "dataflowBlockOptions");
57                         if (dataflowBlockOptions.BoundedCapacity != DataflowBlockOptions.Unbounded)
58                                 throw new ArgumentException (
59                                         "BoundedCapacity must be Unbounded or -1 for this dataflow block.",
60                                         "dataflowBlockOptions");
61
62                         BatchSize = batchSize;
63                         options = dataflowBlockOptions;
64                         completionHelper = CompletionHelper.GetNew (dataflowBlockOptions);
65
66                         target1 = new JoinTarget<T1> (
67                                 this, SignalTarget, completionHelper, () => outgoing.IsCompleted,
68                                 dataflowBlockOptions, true, TryAdd);
69                         target2 = new JoinTarget<T2> (
70                                 this, SignalTarget, completionHelper, () => outgoing.IsCompleted,
71                                 dataflowBlockOptions, true, TryAdd);
72
73                         outgoing = new OutgoingQueue<Tuple<IList<T1>, IList<T2>>> (
74                                 this, completionHelper,
75                                 () => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted,
76                                 _ =>
77                                 {
78                                         target1.DecreaseCount ();
79                                         target2.DecreaseCount ();
80                                 }, options);
81                 }
82
83                 public int BatchSize { get; private set; }
84
85                 public ITargetBlock<T1> Target1 {
86                         get { return target1; }
87                 }
88
89                 public ITargetBlock<T2> Target2 {
90                         get { return target2; }
91                 }
92
93                 /// <summary>
94                 /// Returns whether a new item can be accepted, and increments a counter if it can.
95                 /// </summary>
96                 bool TryAdd ()
97                 {
98                         bool lockTaken = false;
99                         try {
100                                 batchCountLock.Enter (ref lockTaken);
101
102                                 if (options.MaxNumberOfGroups != -1
103                                     && numberOfGroups + batchCount / BatchSize >= options.MaxNumberOfGroups)
104                                         return false;
105
106                                 batchCount++;
107                                 return true;
108                         } finally {
109                                 if (lockTaken)
110                                         batchCountLock.Exit();
111                         }
112                 }
113
114                 /// <summary>
115                 /// Decides whether to create a new batch or not.
116                 /// </summary>
117                 void SignalTarget ()
118                 {
119                         bool lockTaken = false;
120                         try {
121                                 batchCountLock.Enter (ref lockTaken);
122
123                                 if (batchCount < BatchSize)
124                                         return;
125
126                                 batchCount -= BatchSize;
127                                 numberOfGroups++;
128                         } finally {
129                                 if (lockTaken)
130                                         batchCountLock.Exit();
131                         }
132
133                         MakeBatch (BatchSize);
134                 }
135
136                 /// <summary>
137                 /// Creates a batch of the given size and adds the resulting batch to the output queue.
138                 /// </summary>
139                 void MakeBatch (int batchSize)
140                 {
141                         if (batchSize == 0)
142                                 return;
143
144                         var list1 = new List<T1> ();
145                         var list2 = new List<T2> ();
146                         
147                         // lock is necessary here to make sure items are in the correct order
148                         bool taken = false;
149                         try {
150                                 batchLock.Enter (ref taken);
151
152                                 int i = 0;
153
154                                 T1 item1;
155                                 while (i < batchSize && target1.Buffer.TryTake (out item1)) {
156                                         list1.Add (item1);
157                                         i++;
158                                 }
159
160                                 T2 item2;
161                                 while (i < batchSize && target2.Buffer.TryTake (out item2)) {
162                                         list2.Add (item2);
163                                         i++;
164                                 }
165
166                                 if (i < batchSize)
167                                         throw new InvalidOperationException("Unexpected count of items.");
168                         } finally {
169                                 if (taken)
170                                         batchLock.Exit ();
171                         }
172
173                         var batch = Tuple.Create<IList<T1>, IList<T2>> (list1, list2);
174
175                         outgoing.AddData (batch);
176
177                         VerifyMaxNumberOfGroups ();
178                 }
179
180                 /// <summary>
181                 /// Verifies whether <see cref="GroupingDataflowBlockOptions.MaxNumberOfGroups"/>
182                 /// has been reached. If it did, <see cref="Complete"/>s the block.
183                 /// </summary>
184                 void VerifyMaxNumberOfGroups ()
185                 {
186                         if (options.MaxNumberOfGroups == -1)
187                                 return;
188
189                         bool shouldComplete;
190
191                         bool lockTaken = false;
192                         try {
193                                 batchCountLock.Enter (ref lockTaken);
194
195                                 shouldComplete = numberOfGroups >= options.MaxNumberOfGroups;
196                         } finally {
197                                 if (lockTaken)
198                                         batchCountLock.Exit ();
199                         }
200
201                         if (shouldComplete)
202                                 Complete ();
203                 }
204
205                 public Task Completion {
206                         get { return completionHelper.Completion; }
207                 }
208
209                 public void Complete ()
210                 {
211                         target1.Complete ();
212                         target2.Complete ();
213                         MakeBatch (batchCount);
214                         outgoing.Complete ();
215                 }
216
217                 void IDataflowBlock.Fault (Exception exception)
218                 {
219                         completionHelper.RequestFault (exception);
220                 }
221
222                 Tuple<IList<T1>, IList<T2>> ISourceBlock<Tuple<IList<T1>, IList<T2>>>.ConsumeMessage (
223                         DataflowMessageHeader messageHeader,
224                         ITargetBlock<Tuple<IList<T1>, IList<T2>>> target,
225                         out bool messageConsumed)
226                 {
227                         return outgoing.ConsumeMessage (messageHeader, target, out messageConsumed);
228                 }
229
230                 public IDisposable LinkTo (ITargetBlock<Tuple<IList<T1>, IList<T2>>> target,
231                                            DataflowLinkOptions linkOptions)
232                 {
233                         return outgoing.AddTarget(target, linkOptions);
234                 }
235
236                 void ISourceBlock<Tuple<IList<T1>, IList<T2>>>.ReleaseReservation (
237                         DataflowMessageHeader messageHeader,
238                         ITargetBlock<Tuple<IList<T1>, IList<T2>>> target)
239                 {
240                         outgoing.ReleaseReservation (messageHeader, target);
241                 }
242
243                 bool ISourceBlock<Tuple<IList<T1>, IList<T2>>>.ReserveMessage (
244                         DataflowMessageHeader messageHeader,
245                         ITargetBlock<Tuple<IList<T1>, IList<T2>>> target)
246                 {
247                         return outgoing.ReserveMessage (messageHeader, target);
248                 }
249
250                 public bool TryReceive (Predicate<Tuple<IList<T1>, IList<T2>>> filter,
251                                         out Tuple<IList<T1>, IList<T2>> item)
252                 {
253                         return outgoing.TryReceive (filter, out item);
254                 }
255
256                 public bool TryReceiveAll (out IList<Tuple<IList<T1>, IList<T2>>> items)
257                 {
258                         return outgoing.TryReceiveAll (out items);
259                 }
260
261                 public int OutputCount {
262                         get { return outgoing.Count; }
263                 }
264
265                 public override string ToString ()
266                 {
267                         return NameHelper.GetName (this, options);
268                 }
269         }
270 }