1 // BatchedJoinBlock.cs
3 // Copyright (c) 2012 Petr Onderka
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
12 // The above copyright notice and this permission notice shall be included in
13 // all copies or substantial portions of the Software.
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
25 using System.Collections.Generic;
27 namespace System.Threading.Tasks.Dataflow {
// Dataflow source block with three typed targets (Target1, Target2, Target3).
// Its output messages are tuples holding one list per target.
// NOTE(review): members used further down (batchLock, batchCount) have no
// visible declaration in this view of the file; confirm against the full source.
28 public sealed class BatchedJoinBlock<T1, T2, T3> :
29 IReceivableSourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> {
// Options supplied to the constructor; also passed to CompletionHelper.GetNew
// and NameHelper.GetName.
30 readonly GroupingDataflowBlockOptions options;
// Backs the Completion property and receives fault requests.
32 CompletionHelper completionHelper;
// Queue of produced batches; TryReceive, TryReceiveAll and LinkTo go through it.
33 readonly MessageOutgoingQueue<Tuple<IList<T1>, IList<T2>, IList<T3>>> outgoing;
// ConsumeMessage, ReserveMessage and ReleaseReservation are forwarded to this vault.
35 readonly MessageVault<Tuple<IList<T1>, IList<T2>, IList<T3>>> vault =
36 new MessageVault<Tuple<IList<T1>, IList<T2>, IList<T3>>> ();
// Linked targets; LinkTo adds to it and MakeBatch reads targets.Current.
38 readonly TargetBuffer<Tuple<IList<T1>, IList<T2>, IList<T3>>> targets =
39 new TargetBuffer<Tuple<IList<T1>, IList<T2>, IList<T3>>> ();
// Message header state: incremented when a batch is offered directly and
// passed by ref to outgoing.ProcessForTarget.
41 DataflowMessageHeader headers;
// The three input targets, exposed through Target1, Target2 and Target3.
44 readonly JoinTarget<T1> target1;
45 readonly JoinTarget<T2> target2;
46 readonly JoinTarget<T3> target3;
// Creates a block with the given batch size by delegating to the two-argument
// constructor with GroupingDataflowBlockOptions.Default.
50 public BatchedJoinBlock (int batchSize)
51 : this (batchSize, GroupingDataflowBlockOptions.Default)
// Creates a block with the given batch size and options.
// Throws ArgumentOutOfRangeException for a rejected batchSize and
// ArgumentNullException when dataflowBlockOptions is null.
55 public BatchedJoinBlock (int batchSize,
56 GroupingDataflowBlockOptions dataflowBlockOptions)
// NOTE(review): the condition guarding this throw is not visible in this view;
// judging by the message it is presumably `batchSize <= 0`. Confirm.
59 throw new ArgumentOutOfRangeException (
60 "batchSize", batchSize, "The batchSize must be positive.");
61 if (dataflowBlockOptions == null)
62 throw new ArgumentNullException ("dataflowBlockOptions");
64 BatchSize = batchSize;
65 options = dataflowBlockOptions;
66 completionHelper = CompletionHelper.GetNew (options);
// Each join target receives this block, the SignalTarget callback, the shared
// completion helper and a delegate reporting outgoing.IsCompleted. The
// delegates read the `outgoing` field only when invoked, so it is fine that
// the field is assigned after the targets are created.
68 target1 = new JoinTarget<T1> (
69 this, SignalTarget, completionHelper, () => outgoing.IsCompleted);
70 target2 = new JoinTarget<T2> (
71 this, SignalTarget, completionHelper, () => outgoing.IsCompleted);
72 target3 = new JoinTarget<T3>(
73 this, SignalTarget, completionHelper, () => outgoing.IsCompleted);
// NOTE(review): only part of this argument list is visible in this view.
// The visible delegate yields true as soon as any one of the three target
// buffers reports IsCompleted.
75 outgoing = new MessageOutgoingQueue<Tuple<IList<T1>, IList<T2>, IList<T3>>> (
77 () => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted || target3.Buffer.IsCompleted);
// Batch size passed to the constructor; publicly readable, settable only
// from inside this class.
80 public int BatchSize { get; private set; }
// First input of the block: returns the JoinTarget<T1> created in the constructor.
82 public ITargetBlock<T1> Target1 {
83 get { return target1; }
// Second input of the block: returns the JoinTarget<T2> created in the constructor.
86 public ITargetBlock<T2> Target2 {
87 get { return target2; }
// Third input of the block: returns the JoinTarget<T3> created in the constructor.
90 public ITargetBlock<T3> Target3 {
91 get { return target3; }
// NOTE(review): the method header is not visible in this view. These
// statements are presumably the body of SignalTarget, the callback handed to
// every JoinTarget in the constructor. Confirm against the full source.
// Atomically increments the shared counter and keeps the resulting value.
96 int current = Interlocked.Increment (ref batchCount);
// NOTE(review): the statement controlled by this check is not visible here;
// presumably an early return while the count is not a multiple of BatchSize.
98 if (current % BatchSize != 0)
// Subtracts the value observed above instead of storing zero, so increments
// made by other threads in the meantime stay in the counter.
101 Interlocked.Add (ref batchCount, -current);
103 MakeBatch (BatchSize);
// Takes items from the three target buffers into three lists, wraps the lists
// in a tuple, and passes that tuple to the outgoing queue and/or the current
// linked target.
// NOTE(review): several lines of this method are not visible in this view:
// the declarations of i, item1..item3 and taken, the loop bodies, the release
// of batchLock, the condition guarding the throw, and the branch around
// AddData / OfferMessage. The comments below cover only what is shown.
106 void MakeBatch (int batchSize)
108 var list1 = new List<T1> ();
109 var list2 = new List<T2> ();
110 var list3 = new List<T3> ();
112 // lock is necessary here to make sure items are in the correct order
115 batchLock.Enter (ref taken);
// Each loop keeps taking from one target buffer while i is below batchSize
// and the buffer still yields an item.
// NOTE(review): presumably i is shared and incremented across all three
// loops, so at most batchSize items are taken in total. Confirm.
120 while (i < batchSize && target1.Buffer.TryTake (out item1)) {
126 while (i < batchSize && target2.Buffer.TryTake (out item2)) {
132 while (i < batchSize && target3.Buffer.TryTake (out item3)) {
// NOTE(review): guarding condition not visible; presumably raised when fewer
// than batchSize items could be taken.
138 throw new InvalidOperationException ("Unexpected count of items.");
144 var batch = Tuple.Create<IList<T1>, IList<T2>, IList<T3>> (list1, list2,
147 var target = targets.Current;
// NOTE(review): the branch choosing between the next two calls is not visible;
// presumably the batch is queued when no target is linked and offered
// directly (with a freshly incremented header) otherwise.
149 outgoing.AddData (batch);
151 target.OfferMessage (headers.Increment (), batch, this, false);
// If batches are queued and a target is currently available, let the
// outgoing queue process its content for that target.
153 if (!outgoing.IsEmpty && targets.Current != null)
154 outgoing.ProcessForTarget (targets.Current, this, false, ref headers);
// Completion task of this block, taken from the completion helper.
157 public Task Completion
159 get { return completionHelper.Completion; }
// Requests completion by calling Complete on the outgoing queue.
162 public void Complete ()
164 outgoing.Complete ();
// Explicit IDataflowBlock.Fault implementation: forwards the exception to
// the completion helper as a fault request.
167 void IDataflowBlock.Fault (Exception exception)
169 completionHelper.RequestFault (exception);
// Explicit ISourceBlock.ConsumeMessage implementation: forwards to the
// message vault and returns its result; messageConsumed is set by the vault.
172 Tuple<IList<T1>, IList<T2>, IList<T3>>
173 ISourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>.ConsumeMessage (
174 DataflowMessageHeader messageHeader,
175 ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target,
176 out bool messageConsumed)
178 return vault.ConsumeMessage (messageHeader, target, out messageConsumed);
// Links a target to this block: adds it to the target buffer, then lets the
// outgoing queue process its content for the new target.
// NOTE(review): the unlinkAfterOne parameter declaration and the return
// statement are not visible in this view; presumably `result` is returned
// as the IDisposable. Confirm.
181 public IDisposable LinkTo (
182 ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target,
185 var result = targets.AddTarget (target, unlinkAfterOne);
186 outgoing.ProcessForTarget (target, this, false, ref headers);
// Explicit ISourceBlock.ReleaseReservation implementation: forwards to the
// message vault.
190 void ISourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>.ReleaseReservation (
191 DataflowMessageHeader messageHeader,
192 ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target)
194 vault.ReleaseReservation (messageHeader, target);
// Explicit ISourceBlock.ReserveMessage implementation: forwards to the
// message vault and returns its answer.
197 bool ISourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>.ReserveMessage (
198 DataflowMessageHeader messageHeader,
199 ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target)
201 return vault.ReserveMessage (messageHeader, target);
// Tries to receive one batch: passes the filter and the out parameter to the
// outgoing queue and returns its result.
204 public bool TryReceive (
205 Predicate<Tuple<IList<T1>, IList<T2>, IList<T3>>> filter,
206 out Tuple<IList<T1>, IList<T2>, IList<T3>> item)
208 return outgoing.TryReceive (filter, out item);
// Tries to receive batches as a list: delegates to the outgoing queue's
// TryReceiveAll and returns its result.
211 public bool TryReceiveAll (
212 out IList<Tuple<IList<T1>, IList<T2>, IList<T3>>> items)
214 return outgoing.TryReceiveAll (out items);
// Returns the name that NameHelper.GetName produces for this block and its options.
217 public override string ToString ()
219 return NameHelper.GetName (this, options);