1 // BatchedJoinBlock.cs
3 // Copyright (c) 2012 Petr Onderka
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
12 // The above copyright notice and this permission notice shall be included in
13 // all copies or substantial portions of the Software.
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
25 using System.Collections.Generic;
27 namespace System.Threading.Tasks.Dataflow {
28 public sealed class BatchedJoinBlock<T1, T2> :
29 IReceivableSourceBlock<Tuple<IList<T1>, IList<T2>>> {
30 GroupingDataflowBlockOptions options;
32 CompletionHelper completionHelper;
33 readonly MessageOutgoingQueue<Tuple<IList<T1>, IList<T2>>> outgoing;
34 DataflowMessageHeader headers;
37 readonly JoinTarget<T1> target1;
38 readonly JoinTarget<T2> target2;
42 public BatchedJoinBlock (int batchSize)
43 : this (batchSize, GroupingDataflowBlockOptions.Default)
47 public BatchedJoinBlock (int batchSize,
48 GroupingDataflowBlockOptions dataflowBlockOptions)
51 throw new ArgumentOutOfRangeException (
52 "batchSize", batchSize, "The batchSize must be positive.");
53 if (dataflowBlockOptions == null)
54 throw new ArgumentNullException ("dataflowBlockOptions");
56 BatchSize = batchSize;
57 options = dataflowBlockOptions;
58 completionHelper = CompletionHelper.GetNew (dataflowBlockOptions);
60 target1 = new JoinTarget<T1> (
61 this, SignalTarget, completionHelper, () => outgoing.IsCompleted);
62 target2 = new JoinTarget<T2> (
63 this, SignalTarget, completionHelper, () => outgoing.IsCompleted);
65 outgoing = new MessageOutgoingQueue<Tuple<IList<T1>, IList<T2>>> (
66 this, completionHelper,
67 () => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted, options);
70 public int BatchSize { get; private set; }
72 public ITargetBlock<T1> Target1 {
73 get { return target1; }
76 public ITargetBlock<T2> Target2 {
77 get { return target2; }
80 private void SignalTarget()
82 int current = Interlocked.Increment (ref batchCount);
84 if (current % BatchSize != 0)
87 Interlocked.Add (ref batchCount, -current);
89 MakeBatch (BatchSize);
92 void MakeBatch (int batchSize)
94 var list1 = new List<T1> ();
95 var list2 = new List<T2> ();
97 // lock is necessary here to make sure items are in the correct order
100 batchLock.Enter (ref taken);
105 while (i < batchSize && target1.Buffer.TryTake (out item1)) {
111 while (i < batchSize && target2.Buffer.TryTake (out item2)) {
117 throw new InvalidOperationException("Unexpected count of items.");
123 var batch = Tuple.Create<IList<T1>, IList<T2>> (list1, list2);
125 outgoing.AddData (batch);
128 public Task Completion {
129 get { return completionHelper.Completion; }
132 public void Complete ()
134 outgoing.Complete ();
137 void IDataflowBlock.Fault (Exception exception)
139 completionHelper.RequestFault (exception);
142 Tuple<IList<T1>, IList<T2>> ISourceBlock<Tuple<IList<T1>, IList<T2>>>.ConsumeMessage (
143 DataflowMessageHeader messageHeader,
144 ITargetBlock<Tuple<IList<T1>, IList<T2>>> target,
145 out bool messageConsumed)
147 return outgoing.ConsumeMessage (messageHeader, target, out messageConsumed);
150 public IDisposable LinkTo (ITargetBlock<Tuple<IList<T1>, IList<T2>>> target,
151 DataflowLinkOptions linkOptions)
153 return outgoing.AddTarget(target, linkOptions);
156 void ISourceBlock<Tuple<IList<T1>, IList<T2>>>.ReleaseReservation (
157 DataflowMessageHeader messageHeader,
158 ITargetBlock<Tuple<IList<T1>, IList<T2>>> target)
160 outgoing.ReleaseReservation (messageHeader, target);
163 bool ISourceBlock<Tuple<IList<T1>, IList<T2>>>.ReserveMessage (
164 DataflowMessageHeader messageHeader,
165 ITargetBlock<Tuple<IList<T1>, IList<T2>>> target)
167 return outgoing.ReserveMessage (messageHeader, target);
170 public bool TryReceive (Predicate<Tuple<IList<T1>, IList<T2>>> filter,
171 out Tuple<IList<T1>, IList<T2>> item)
173 return outgoing.TryReceive (filter, out item);
176 public bool TryReceiveAll (out IList<Tuple<IList<T1>, IList<T2>>> items)
178 return outgoing.TryReceiveAll (out items);
181 public override string ToString ()
183 return NameHelper.GetName (this, options);