1 // BatchedJoinBlock.cs
3 // Copyright (c) 2012 Petr Onderka
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
12 // The above copyright notice and this permission notice shall be included in
13 // all copies or substantial portions of the Software.
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23 using System.Collections.Generic;
25 namespace System.Threading.Tasks.Dataflow {
// NOTE(review): this listing carries stray numeric prefixes on every line and
// is missing lines (braces and some statements). Restore it from the upstream
// file before attempting to compile.
/// <summary>
/// A sealed source block whose output items are tuples of three lists
/// (<c>IList&lt;T1&gt;</c>, <c>IList&lt;T2&gt;</c>, <c>IList&lt;T3&gt;</c>),
/// fed through three separate targets, one per type parameter.
/// </summary>
26 public sealed class BatchedJoinBlock<T1, T2, T3> :
27 IReceivableSourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> {
// Options handed to the constructor; read later for MaxNumberOfGroups and
// when building the block's name in ToString.
28 readonly GroupingDataflowBlockOptions options;
// Supplies the Completion task and receives fault requests.
30 readonly CompletionHelper completionHelper;
// Holds finished batches; the source-side members of this class delegate to it.
31 readonly OutgoingQueue<Tuple<IList<T1>, IList<T2>, IList<T3>>> outgoing;
// One input target per type parameter, exposed through Target1..Target3.
34 readonly JoinTarget<T1> target1;
35 readonly JoinTarget<T2> target2;
36 readonly JoinTarget<T3> target3;
// Entered around the accesses to batchCount and numberOfGroups in the
// counting helpers further down.
// NOTE(review): batchCount, numberOfGroups and batchLock are used below but
// their declarations do not appear in this listing.
40 SpinLock batchCountLock;
/// <summary>
/// Creates the block with the given batch size by forwarding to the
/// two-argument constructor with <see cref="GroupingDataflowBlockOptions.Default"/>.
/// </summary>
/// <param name="batchSize">Batch size, passed through unchanged.</param>
42 public BatchedJoinBlock (int batchSize)
43 : this (batchSize, GroupingDataflowBlockOptions.Default)
/// <summary>
/// Validates the arguments, stores the batch size and options, and creates the
/// completion helper, the three join targets and the outgoing queue.
/// </summary>
/// <param name="batchSize">Value stored in <see cref="BatchSize"/>.</param>
/// <param name="dataflowBlockOptions">
/// Options for the block; must be non-null, have Greedy set to true and have
/// BoundedCapacity equal to DataflowBlockOptions.Unbounded.
/// </param>
/// <exception cref="ArgumentNullException">
/// <paramref name="dataflowBlockOptions"/> is null.
/// </exception>
/// <exception cref="ArgumentException">
/// Greedy is false, or BoundedCapacity is not DataflowBlockOptions.Unbounded.
/// </exception>
47 public BatchedJoinBlock (int batchSize,
48 GroupingDataflowBlockOptions dataflowBlockOptions)
// NOTE(review): the condition guarding this throw is missing from the listing;
// going by the message it presumably rejects a non-positive batchSize - confirm.
51 throw new ArgumentOutOfRangeException (
52 "batchSize", batchSize, "The batchSize must be positive.");
53 if (dataflowBlockOptions == null)
54 throw new ArgumentNullException ("dataflowBlockOptions");
55 if (!dataflowBlockOptions.Greedy)
56 throw new ArgumentException (
57 "Greedy must be true for this dataflow block.", "dataflowBlockOptions");
58 if (dataflowBlockOptions.BoundedCapacity != DataflowBlockOptions.Unbounded)
59 throw new ArgumentException (
60 "BoundedCapacity must be Unbounded or -1 for this dataflow block.",
61 "dataflowBlockOptions");
63 BatchSize = batchSize;
64 options = dataflowBlockOptions;
65 completionHelper = CompletionHelper.GetNew (options);
// The three targets are built with identical arguments apart from the type
// parameter. The lambda reads the outgoing field only when it is invoked;
// outgoing itself is assigned further down, so this assumes JoinTarget does
// not invoke the lambda during construction - TODO confirm.
67 target1 = new JoinTarget<T1> (
68 this, SignalTarget, completionHelper, () => outgoing.IsCompleted,
69 dataflowBlockOptions, true, TryAdd);
70 target2 = new JoinTarget<T2> (
71 this, SignalTarget, completionHelper, () => outgoing.IsCompleted,
72 dataflowBlockOptions, true, TryAdd);
73 target3 = new JoinTarget<T3> (
74 this, SignalTarget, completionHelper, () => outgoing.IsCompleted,
75 dataflowBlockOptions, true, TryAdd);
// The predicate passed here is true as soon as any one of the three target
// buffers reports IsCompleted.
77 outgoing = new OutgoingQueue<Tuple<IList<T1>, IList<T2>, IList<T3>>> (
78 this, completionHelper,
79 () => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted
80 || target3.Buffer.IsCompleted,
// NOTE(review): the lines that introduce and close the following three calls
// are missing from the listing; they appear to form the body of a callback
// passed as a further OutgoingQueue constructor argument - confirm.
83 target1.DecreaseCount ();
84 target2.DecreaseCount ();
85 target3.DecreaseCount ();
/// <summary>
/// The batch size supplied to the constructor. Publicly readable; the setter
/// is private.
/// </summary>
89 public int BatchSize { get; private set; }
/// <summary>
/// Gets the target for items of type <typeparamref name="T1"/>
/// (the internal target1 instance).
/// </summary>
91 public ITargetBlock<T1> Target1 {
92 get { return target1; }
/// <summary>
/// Gets the target for items of type <typeparamref name="T2"/>
/// (the internal target2 instance).
/// </summary>
95 public ITargetBlock<T2> Target2 {
96 get { return target2; }
/// <summary>
/// Gets the target for items of type <typeparamref name="T3"/>
/// (the internal target3 instance).
/// </summary>
99 public ITargetBlock<T3> Target3 {
100 get { return target3; }
// NOTE(review): the method signature is missing from this listing. The
// constructor passes a method named TryAdd to each JoinTarget and this body is
// presumably that method - confirm against the original file.
104 /// Returns whether a new item can be accepted, and increments a counter if it can.
108 bool lockTaken = false;
110 batchCountLock.Enter (ref lockTaken);
// The limit check applies only when MaxNumberOfGroups is not -1. It adds the
// groups already produced (numberOfGroups) to the number of whole batches
// represented by batchCount and compares the sum against the limit.
// NOTE(review): the statement executed when the condition holds, and the
// counter increment mentioned in the summary, are not visible here.
112 if (options.MaxNumberOfGroups != -1
113 && numberOfGroups + batchCount / BatchSize >= options.MaxNumberOfGroups)
120 batchCountLock.Exit ();
// NOTE(review): the method signature is missing from this listing. The
// constructor passes a method named SignalTarget to each JoinTarget and this
// body is presumably that method - confirm against the original file.
125 /// Decides whether to create a new batch or not.
129 bool lockTaken = false;
131 batchCountLock.Enter (ref lockTaken);
// Fewer than BatchSize items have been counted so far.
// NOTE(review): the statement executed in that case is not visible in this
// listing (presumably an early return).
133 if (batchCount < BatchSize)
// Take one full batch worth of items out of the pending count.
136 batchCount -= BatchSize;
140 batchCountLock.Exit ();
// In source order the batch is built after the Exit call above.
143 MakeBatch (BatchSize);
147 /// Creates a batch of the given size and adds the resulting batch to the output queue.
149 void MakeBatch (int batchSize)
// One result list per target; together they become the three tuple members.
154 var list1 = new List<T1> ();
155 var list2 = new List<T2> ();
156 var list3 = new List<T3> ();
158 // lock is necessary here to make sure items are in the correct order
161 batchLock.Enter (ref taken);
// The buffers are drained in fixed order: target1, then target2, then target3.
// Each loop stops when i reaches batchSize or its buffer has nothing to take.
// NOTE(review): the loop bodies and the declarations of i, taken and the item
// variables are missing from this listing; i is presumably a single counter
// shared by all three loops - confirm.
166 while (i < batchSize && target1.Buffer.TryTake (out item1)) {
172 while (i < batchSize && target2.Buffer.TryTake (out item2)) {
178 while (i < batchSize && target3.Buffer.TryTake (out item3)) {
// NOTE(review): the condition guarding this throw is not visible; going by
// the message it fires when the number of items taken differs from batchSize.
184 throw new InvalidOperationException ("Unexpected count of items.");
// Explicit type arguments make the tuple members IList<T> rather than List<T>.
190 var batch = Tuple.Create<IList<T1>, IList<T2>, IList<T3>> (list1, list2,
193 outgoing.AddData (batch);
// After queuing the batch, check the group limit.
195 VerifyMaxNumberOfGroups ();
199 /// Verifies whether <see cref="GroupingDataflowBlockOptions.MaxNumberOfGroups"/>
200 /// has been reached. If it did, <see cref="Complete"/>s the block.
202 void VerifyMaxNumberOfGroups ()
// -1 is presumably the "no limit" value.
// NOTE(review): the statement executed in that case is not visible in this
// listing (presumably an early return).
204 if (options.MaxNumberOfGroups == -1)
209 bool lockTaken = false;
211 batchCountLock.Enter (ref lockTaken);
// Record whether the number of groups produced has reached the limit.
213 shouldComplete = numberOfGroups >= options.MaxNumberOfGroups;
216 batchCountLock.Exit ();
// NOTE(review): the declaration of shouldComplete and the code that acts on it
// are missing from this listing.
/// <summary>
/// Gets the task exposed by the completion helper (completionHelper.Completion).
/// </summary>
223 public Task Completion
225 get { return completionHelper.Completion; }
/// <summary>
/// Builds a final batch from the currently counted items and then completes
/// the outgoing queue.
/// </summary>
228 public void Complete ()
// NOTE(review): lines between the signature and this call are missing from
// the listing.
// The final batch is sized by batchCount rather than BatchSize.
233 MakeBatch (batchCount);
234 outgoing.Complete ();
/// <summary>
/// Explicit <c>IDataflowBlock.Fault</c> implementation; forwards the exception
/// to completionHelper.RequestFault.
/// </summary>
/// <param name="exception">The exception passed on to the completion helper.</param>
237 void IDataflowBlock.Fault (Exception exception)
239 completionHelper.RequestFault (exception);
/// <summary>
/// Explicit <c>ISourceBlock.ConsumeMessage</c> implementation; delegates to
/// the outgoing queue with the same arguments and returns its result.
/// </summary>
/// <param name="messageHeader">Header passed through to the outgoing queue.</param>
/// <param name="target">Target passed through to the outgoing queue.</param>
/// <param name="messageConsumed">Set by the outgoing queue's ConsumeMessage.</param>
/// <returns>The value returned by outgoing.ConsumeMessage.</returns>
242 Tuple<IList<T1>, IList<T2>, IList<T3>>
243 ISourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>.ConsumeMessage (
244 DataflowMessageHeader messageHeader,
245 ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target,
246 out bool messageConsumed)
248 return outgoing.ConsumeMessage (messageHeader, target, out messageConsumed);
/// <summary>
/// Registers <paramref name="target"/> with the outgoing queue using the given
/// link options.
/// </summary>
/// <param name="target">Target block to add to the outgoing queue.</param>
/// <param name="linkOptions">Options passed through to outgoing.AddTarget.</param>
/// <returns>The <see cref="IDisposable"/> returned by outgoing.AddTarget.</returns>
251 public IDisposable LinkTo (
252 ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target,
253 DataflowLinkOptions linkOptions)
255 return outgoing.AddTarget (target, linkOptions);
/// <summary>
/// Explicit <c>ISourceBlock.ReleaseReservation</c> implementation; delegates
/// to the outgoing queue with the same arguments.
/// </summary>
/// <param name="messageHeader">Header passed through to the outgoing queue.</param>
/// <param name="target">Target passed through to the outgoing queue.</param>
258 void ISourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>.ReleaseReservation (
259 DataflowMessageHeader messageHeader,
260 ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target)
262 outgoing.ReleaseReservation (messageHeader, target);
/// <summary>
/// Explicit <c>ISourceBlock.ReserveMessage</c> implementation; delegates to
/// the outgoing queue with the same arguments.
/// </summary>
/// <param name="messageHeader">Header passed through to the outgoing queue.</param>
/// <param name="target">Target passed through to the outgoing queue.</param>
/// <returns>The value returned by outgoing.ReserveMessage.</returns>
265 bool ISourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>.ReserveMessage (
266 DataflowMessageHeader messageHeader,
267 ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target)
269 return outgoing.ReserveMessage (messageHeader, target);
/// <summary>
/// Delegates to the outgoing queue's TryReceive with the same filter and
/// output argument.
/// </summary>
/// <param name="filter">Predicate passed through to the outgoing queue.</param>
/// <param name="item">Set by outgoing.TryReceive.</param>
/// <returns>The value returned by outgoing.TryReceive.</returns>
272 public bool TryReceive (
273 Predicate<Tuple<IList<T1>, IList<T2>, IList<T3>>> filter,
274 out Tuple<IList<T1>, IList<T2>, IList<T3>> item)
276 return outgoing.TryReceive (filter, out item);
/// <summary>
/// Delegates to the outgoing queue's TryReceiveAll.
/// </summary>
/// <param name="items">Set by outgoing.TryReceiveAll.</param>
/// <returns>The value returned by outgoing.TryReceiveAll.</returns>
279 public bool TryReceiveAll (
280 out IList<Tuple<IList<T1>, IList<T2>, IList<T3>>> items)
282 return outgoing.TryReceiveAll (out items);
/// <summary>
/// Gets the Count reported by the outgoing queue.
/// </summary>
285 public int OutputCount {
286 get { return outgoing.Count; }
/// <summary>
/// Returns the name produced by NameHelper.GetName for this block and its options.
/// </summary>
289 public override string ToString ()
291 return NameHelper.GetName (this, options);