1 // BatchedJoinBlock.cs
3 // Copyright (c) 2012 Petr Onderka
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
12 // The above copyright notice and this permission notice shall be included in
13 // all copies or substantial portions of the Software.
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23 using System.Collections.Generic;
25 namespace System.Threading.Tasks.Dataflow {
26 public sealed class BatchedJoinBlock<T1, T2> :
27 IReceivableSourceBlock<Tuple<IList<T1>, IList<T2>>> {
28 readonly GroupingDataflowBlockOptions options;
30 readonly CompletionHelper completionHelper;
31 readonly OutgoingQueue<Tuple<IList<T1>, IList<T2>>> outgoing;
34 readonly JoinTarget<T1> target1;
35 readonly JoinTarget<T2> target2;
39 SpinLock batchCountLock;
41 public BatchedJoinBlock (int batchSize)
42 : this (batchSize, GroupingDataflowBlockOptions.Default)
46 public BatchedJoinBlock (int batchSize,
47 GroupingDataflowBlockOptions dataflowBlockOptions)
50 throw new ArgumentOutOfRangeException (
51 "batchSize", batchSize, "The batchSize must be positive.");
52 if (dataflowBlockOptions == null)
53 throw new ArgumentNullException ("dataflowBlockOptions");
54 if (!dataflowBlockOptions.Greedy)
55 throw new ArgumentException (
56 "Greedy must be true for this dataflow block.", "dataflowBlockOptions");
57 if (dataflowBlockOptions.BoundedCapacity != DataflowBlockOptions.Unbounded)
58 throw new ArgumentException (
59 "BoundedCapacity must be Unbounded or -1 for this dataflow block.",
60 "dataflowBlockOptions");
62 BatchSize = batchSize;
63 options = dataflowBlockOptions;
64 completionHelper = CompletionHelper.GetNew (dataflowBlockOptions);
66 target1 = new JoinTarget<T1> (
67 this, SignalTarget, completionHelper, () => outgoing.IsCompleted,
68 dataflowBlockOptions, true, TryAdd);
69 target2 = new JoinTarget<T2> (
70 this, SignalTarget, completionHelper, () => outgoing.IsCompleted,
71 dataflowBlockOptions, true, TryAdd);
73 outgoing = new OutgoingQueue<Tuple<IList<T1>, IList<T2>>> (
74 this, completionHelper,
75 () => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted,
78 target1.DecreaseCount ();
79 target2.DecreaseCount ();
83 public int BatchSize { get; private set; }
85 public ITargetBlock<T1> Target1 {
86 get { return target1; }
89 public ITargetBlock<T2> Target2 {
90 get { return target2; }
94 /// Returns whether a new item can be accepted, and increments a counter if it can.
98 bool lockTaken = false;
100 batchCountLock.Enter (ref lockTaken);
102 if (options.MaxNumberOfGroups != -1
103 && numberOfGroups + batchCount / BatchSize >= options.MaxNumberOfGroups)
110 batchCountLock.Exit();
115 /// Decides whether to create a new batch or not.
119 bool lockTaken = false;
121 batchCountLock.Enter (ref lockTaken);
123 if (batchCount < BatchSize)
126 batchCount -= BatchSize;
130 batchCountLock.Exit();
133 MakeBatch (BatchSize);
137 /// Creates a batch of the given size and adds the resulting batch to the output queue.
139 void MakeBatch (int batchSize)
144 var list1 = new List<T1> ();
145 var list2 = new List<T2> ();
147 // lock is necessary here to make sure items are in the correct order
150 batchLock.Enter (ref taken);
155 while (i < batchSize && target1.Buffer.TryTake (out item1)) {
161 while (i < batchSize && target2.Buffer.TryTake (out item2)) {
167 throw new InvalidOperationException("Unexpected count of items.");
173 var batch = Tuple.Create<IList<T1>, IList<T2>> (list1, list2);
175 outgoing.AddData (batch);
177 VerifyMaxNumberOfGroups ();
181 /// Verifies whether <see cref="GroupingDataflowBlockOptions.MaxNumberOfGroups"/>
182 /// has been reached. If it has, <see cref="Complete"/> is called on the block.
184 void VerifyMaxNumberOfGroups ()
186 if (options.MaxNumberOfGroups == -1)
191 bool lockTaken = false;
193 batchCountLock.Enter (ref lockTaken);
195 shouldComplete = numberOfGroups >= options.MaxNumberOfGroups;
198 batchCountLock.Exit ();
205 public Task Completion {
206 get { return completionHelper.Completion; }
209 public void Complete ()
213 MakeBatch (batchCount);
214 outgoing.Complete ();
217 void IDataflowBlock.Fault (Exception exception)
219 completionHelper.RequestFault (exception);
222 Tuple<IList<T1>, IList<T2>> ISourceBlock<Tuple<IList<T1>, IList<T2>>>.ConsumeMessage (
223 DataflowMessageHeader messageHeader,
224 ITargetBlock<Tuple<IList<T1>, IList<T2>>> target,
225 out bool messageConsumed)
227 return outgoing.ConsumeMessage (messageHeader, target, out messageConsumed);
230 public IDisposable LinkTo (ITargetBlock<Tuple<IList<T1>, IList<T2>>> target,
231 DataflowLinkOptions linkOptions)
233 return outgoing.AddTarget(target, linkOptions);
236 void ISourceBlock<Tuple<IList<T1>, IList<T2>>>.ReleaseReservation (
237 DataflowMessageHeader messageHeader,
238 ITargetBlock<Tuple<IList<T1>, IList<T2>>> target)
240 outgoing.ReleaseReservation (messageHeader, target);
243 bool ISourceBlock<Tuple<IList<T1>, IList<T2>>>.ReserveMessage (
244 DataflowMessageHeader messageHeader,
245 ITargetBlock<Tuple<IList<T1>, IList<T2>>> target)
247 return outgoing.ReserveMessage (messageHeader, target);
250 public bool TryReceive (Predicate<Tuple<IList<T1>, IList<T2>>> filter,
251 out Tuple<IList<T1>, IList<T2>> item)
253 return outgoing.TryReceive (filter, out item);
256 public bool TryReceiveAll (out IList<Tuple<IList<T1>, IList<T2>>> items)
258 return outgoing.TryReceiveAll (out items);
261 public int OutputCount {
262 get { return outgoing.Count; }
265 public override string ToString ()
267 return NameHelper.GetName (this, options);