3 // Copyright (c) 2011 Jérémie "garuma" Laval
4 // Copyright (c) 2012 Petr Onderka
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
7 // of this software and associated documentation files (the "Software"), to deal
8 // in the Software without restriction, including without limitation the rights
9 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 // copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
13 // The above copyright notice and this permission notice shall be included in
14 // all copies or substantial portions of the Software.
16 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
24 using System.Collections.Generic;
26 namespace System.Threading.Tasks.Dataflow
28 public sealed class JoinBlock<T1, T2, T3> : IReceivableSourceBlock<Tuple<T1, T2, T3>>
// Completion state of the block; created in the constructor through
// CompletionHelper.GetNew and surfaced by the Completion property and Fault.
30 readonly CompletionHelper compHelper;
// Options supplied at construction; Greedy, MaxNumberOfGroups, BoundedCapacity,
// CancellationToken and TaskScheduler are read from it elsewhere in the class.
31 readonly GroupingDataflowBlockOptions dataflowBlockOptions;
// Queue of finished tuples; LinkTo, TryReceive, TryReceiveAll, OutputCount and the
// explicit ISourceBlock members all forward to it.
32 readonly OutgoingQueue<Tuple<T1, T2, T3>> outgoing;
// One input target per tuple element, exposed as Target1, Target2 and Target3.
34 readonly JoinTarget<T1> target1;
35 readonly JoinTarget<T2> target2;
36 readonly JoinTarget<T3> target3;
// Entered in SignalArrivalTarget around the buffer checks and Take calls.
// Constructed with 'false', i.e. without thread-owner tracking. SpinLock is a
// mutable struct, so this field must not be made readonly.
38 SpinLock targetLock = new SpinLock (false);
// Flag claimed with TrySet in EnsureNonGreedyProcessing before NonGreedyProcess is
// scheduled, and reset to false at the end of NonGreedyProcess.
39 readonly AtomicBoolean nonGreedyProcessing = new AtomicBoolean ();
/// <summary>
/// Creates a join block configured with
/// <see cref="GroupingDataflowBlockOptions.Default"/>.
/// </summary>
public JoinBlock ()
    : this (GroupingDataflowBlockOptions.Default)
{
}
// Creates a join block that uses the given options.
// Throws ArgumentNullException when dataflowBlockOptions is null.
// NOTE(review): the braces of this constructor and, apparently, part of the
// OutgoingQueue argument list (including the start of the last lambda) are not
// visible in this listing — confirm against the complete file before editing.
50 public JoinBlock (GroupingDataflowBlockOptions dataflowBlockOptions)
52 if (dataflowBlockOptions == null)
53 throw new ArgumentNullException ("dataflowBlockOptions");
55 this.dataflowBlockOptions = dataflowBlockOptions;
56 this.compHelper = CompletionHelper.GetNew (dataflowBlockOptions);
// Each target receives this block, the shared arrival callback, the shared
// completion helper, a delegate reporting whether the outgoing queue is
// completed, the options, the Greedy flag and its own TryAddN callback.
// The lambdas read the 'outgoing' field, which is only assigned further down;
// presumably they are not invoked while the targets are being constructed.
58 target1 = new JoinTarget<T1> (this, SignalArrivalTarget, compHelper,
59 () => outgoing.IsCompleted, dataflowBlockOptions,
60 dataflowBlockOptions.Greedy, TryAdd1);
61 target2 = new JoinTarget<T2> (this, SignalArrivalTarget, compHelper,
62 () => outgoing.IsCompleted, dataflowBlockOptions,
63 dataflowBlockOptions.Greedy, TryAdd2);
64 target3 = new JoinTarget<T3> (this, SignalArrivalTarget, compHelper,
65 () => outgoing.IsCompleted, dataflowBlockOptions,
66 dataflowBlockOptions.Greedy, TryAdd3);
67 outgoing = new OutgoingQueue<Tuple<T1, T2, T3>> (
// Delegate that is true as soon as any one of the three target buffers is completed.
69 () => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted
70 || target3.Buffer.IsCompleted,
// Body of a callback that decreases the count of every target.
// NOTE(review): the callback's header is not visible here — presumably it runs
// when a tuple leaves the outgoing queue; confirm.
73 target1.DecreaseCount ();
74 target2.DecreaseCount ();
75 target3.DecreaseCount ();
76 }, dataflowBlockOptions);
/// <summary>
/// Links the output of this block to <paramref name="target"/> by registering
/// the target with the outgoing queue.
/// </summary>
/// <param name="target">Block that should receive the produced tuples.</param>
/// <param name="linkOptions">Link options, forwarded unchanged to the outgoing queue.</param>
/// <returns>The <see cref="IDisposable"/> returned by the outgoing queue for this link.</returns>
public IDisposable LinkTo (ITargetBlock<Tuple<T1, T2, T3>> target, DataflowLinkOptions linkOptions)
{
    IDisposable link = outgoing.AddTarget (target, linkOptions);
    return link;
}
/// <summary>
/// Tries to take a single tuple from the outgoing queue.
/// </summary>
/// <param name="filter">Predicate forwarded unchanged to the outgoing queue.</param>
/// <param name="item">Receives the tuple produced by the outgoing queue.</param>
/// <returns>The result reported by the outgoing queue.</returns>
public bool TryReceive (Predicate<Tuple<T1, T2, T3>> filter, out Tuple<T1, T2, T3> item)
{
    bool received = outgoing.TryReceive (filter, out item);
    return received;
}
/// <summary>
/// Tries to take all tuples from the outgoing queue in one call.
/// </summary>
/// <param name="items">Receives the list produced by the outgoing queue.</param>
/// <returns>The result reported by the outgoing queue.</returns>
public bool TryReceiveAll (out IList<Tuple<T1, T2, T3>> items)
{
    bool received = outgoing.TryReceiveAll (out items);
    return received;
}
/// <summary>
/// Explicit <c>ISourceBlock</c> implementation; delegates consumption of a
/// message to the outgoing queue.
/// </summary>
/// <param name="messageHeader">Header of the message to consume.</param>
/// <param name="target">Target that consumes the message.</param>
/// <param name="messageConsumed">Set by the outgoing queue.</param>
/// <returns>The tuple returned by the outgoing queue.</returns>
Tuple<T1, T2, T3> ISourceBlock<Tuple<T1, T2, T3>>.ConsumeMessage (
    DataflowMessageHeader messageHeader,
    ITargetBlock<Tuple<T1, T2, T3>> target,
    out bool messageConsumed)
{
    Tuple<T1, T2, T3> consumed =
        outgoing.ConsumeMessage (messageHeader, target, out messageConsumed);
    return consumed;
}
/// <summary>
/// Explicit <c>ISourceBlock</c> implementation; delegates the release of a
/// reservation to the outgoing queue.
/// </summary>
/// <param name="messageHeader">Header of the reserved message.</param>
/// <param name="target">Target that gives up the reservation.</param>
void ISourceBlock<Tuple<T1, T2, T3>>.ReleaseReservation (
    DataflowMessageHeader messageHeader,
    ITargetBlock<Tuple<T1, T2, T3>> target)
{
    outgoing.ReleaseReservation (messageHeader, target);
}
/// <summary>
/// Explicit <c>ISourceBlock</c> implementation; delegates the reservation of a
/// message to the outgoing queue.
/// </summary>
/// <param name="messageHeader">Header of the message to reserve.</param>
/// <param name="target">Target that requests the reservation.</param>
/// <returns>The result reported by the outgoing queue.</returns>
bool ISourceBlock<Tuple<T1, T2, T3>>.ReserveMessage (
    DataflowMessageHeader messageHeader,
    ITargetBlock<Tuple<T1, T2, T3>> target)
{
    bool reserved = outgoing.ReserveMessage (messageHeader, target);
    return reserved;
}
// Completes the block; the visible part completes the outgoing queue.
// NOTE(review): the lines between the signature and the call below are not
// visible in this listing — confirm what else runs before the outgoing queue
// is completed (presumably the three targets are completed first).
113 public void Complete ()
118 outgoing.Complete ();
/// <summary>
/// Explicit <c>IDataflowBlock</c> implementation; hands the exception to the
/// completion helper as a fault request.
/// </summary>
/// <param name="exception">Exception passed on to the completion helper.</param>
void IDataflowBlock.Fault (Exception exception)
{
    compHelper.RequestFault (exception);
}
/// <summary>
/// Gets the completion task exposed by the completion helper.
/// </summary>
public Task Completion {
    get {
        return compHelper.Completion;
    }
}
// NOTE(review): the signature line and the <summary> tags are not visible in
// this listing; the constructor passes TryAdd1 to target1 as a callback.
131 /// Returns whether a new item can be accepted by the first target,
132 /// and increments a counter if it can.
// Always accepts when MaxNumberOfGroups is -1, without touching the counter.
136 return dataflowBlockOptions.MaxNumberOfGroups == -1
// Otherwise the counter is incremented atomically on every call, including the
// calls that end up returning false, so it can grow past MaxNumberOfGroups.
137 || Interlocked.Increment (ref target1Count)
138 <= dataflowBlockOptions.MaxNumberOfGroups;
// NOTE(review): the signature line and the <summary> tags are not visible in
// this listing; the constructor passes TryAdd2 to target2 as a callback.
142 /// Returns whether a new item can be accepted by the second target,
143 /// and increments a counter if it can.
// Always accepts when MaxNumberOfGroups is -1, without touching the counter.
147 return dataflowBlockOptions.MaxNumberOfGroups == -1
// Otherwise the counter is incremented atomically on every call, including the
// calls that end up returning false, so it can grow past MaxNumberOfGroups.
148 || Interlocked.Increment (ref target2Count)
149 <= dataflowBlockOptions.MaxNumberOfGroups;
// NOTE(review): the signature line and the <summary> tags are not visible in
// this listing; the constructor passes TryAdd3 to target3 as a callback.
153 /// Returns whether a new item can be accepted by the third target,
154 /// and increments a counter if it can.
// Always accepts when MaxNumberOfGroups is -1, without touching the counter.
158 return dataflowBlockOptions.MaxNumberOfGroups == -1
// Otherwise the counter is incremented atomically on every call, including the
// calls that end up returning false, so it can grow past MaxNumberOfGroups.
159 || Interlocked.Increment (ref target3Count)
160 <= dataflowBlockOptions.MaxNumberOfGroups;
164 /// Decides whether to create a new tuple or not.
// Handed to each of the three JoinTarget instances by the constructor.
// NOTE(review): several lines of this method are not visible in this listing
// (declarations of 'taken' and 'value1'..'value3', the try/finally around the
// lock, the statement guarded by the emptiness check, and the 'else') —
// confirm against the complete file before changing anything here.
166 void SignalArrivalTarget ()
// Greedy mode: build the tuple directly from the targets' buffers.
168 if (dataflowBlockOptions.Greedy) {
// 'taken' reports whether the spin lock was actually acquired.
175 targetLock.Enter (ref taken);
// A tuple needs at least one buffered item in every target.
177 if (target1.Buffer.Count == 0 || target2.Buffer.Count == 0
178 || target3.Buffer.Count == 0)
// Take exactly one item from each buffer.
181 value1 = target1.Buffer.Take ();
182 value2 = target2.Buffer.Take ();
183 value3 = target3.Buffer.Take ();
// Publish the tuple assembled from the three taken values.
189 TriggerMessage (value1, value2, value3);
// Presumably the non-greedy branch: start reservation-based processing when
// every target has a postponed message and there is room in the output.
191 if (ShouldProcesNonGreedy ())
192 EnsureNonGreedyProcessing ();
/// <summary>
/// Tells whether non-greedy creation of a tuple should be started: every
/// target must hold at least one postponed message and the outgoing queue
/// must be below the bounded capacity (a capacity of -1 means unbounded).
/// </summary>
bool ShouldProcesNonGreedy ()
{
    // Without a postponed message on each target no tuple can be formed.
    if (target1.PostponedMessagesCount < 1)
        return false;
    if (target2.PostponedMessagesCount < 1)
        return false;
    if (target3.PostponedMessagesCount < 1)
        return false;

    if (dataflowBlockOptions.BoundedCapacity == -1)
        return true;

    return outgoing.Count < dataflowBlockOptions.BoundedCapacity;
}
/// <summary>
/// Schedules <c>NonGreedyProcess</c> unless the processing flag is already set.
/// </summary>
void EnsureNonGreedyProcessing ()
{
    // Only the caller that manages to set the flag schedules the task.
    if (!nonGreedyProcessing.TrySet ())
        return;

    CancellationToken token = dataflowBlockOptions.CancellationToken;
    TaskScheduler scheduler = dataflowBlockOptions.TaskScheduler;

    Task.Factory.StartNew (
        NonGreedyProcess, token, TaskCreationOptions.PreferFairness, scheduler);
}
221 /// Creates tuples in non-greedy mode,
222 /// making sure the whole tuple is available by using reservations.
// Scheduled as a task by EnsureNonGreedyProcessing.
// NOTE(review): the statements that end each failed-reservation branch and the
// closing braces are not visible in this listing — confirm whether those
// branches leave the loop or retry before changing the control flow.
224 void NonGreedyProcess ()
226 while (ShouldProcesNonGreedy ()) {
// Reserve one message per target, in order; a null result means the
// reservation failed.
227 var reservation1 = target1.ReserveMessage ();
229 if (reservation1 == null)
232 var reservation2 = target2.ReserveMessage ();
233 if (reservation2 == null) {
// Give back what was already reserved ('RelaseReservation' is the method
// name as spelled in this call).
234 target1.RelaseReservation (reservation1);
238 var reservation3 = target3.ReserveMessage ();
239 if (reservation3 == null) {
240 target1.RelaseReservation (reservation1);
241 target2.RelaseReservation (reservation2);
// All three reservations are held: consume them and publish the tuple.
245 var value1 = target1.ConsumeReserved (reservation1);
246 var value2 = target2.ConsumeReserved (reservation2);
247 var value3 = target3.ConsumeReserved (reservation3);
249 TriggerMessage (value1, value2, value3);
// Clear the flag so that EnsureNonGreedyProcessing can schedule a new run.
252 nonGreedyProcessing.Value = false;
// Re-check after clearing the flag; presumably this covers messages that were
// postponed between the last loop check and the reset above.
254 if (ShouldProcesNonGreedy ())
255 EnsureNonGreedyProcessing ();
259 /// Creates a tuple from the given values and adds the result to the output queue.
// Called from SignalArrivalTarget (greedy path) and NonGreedyProcess.
261 void TriggerMessage (T1 val1, T2 val2, T3 val3)
263 outgoing.AddData (Tuple.Create (val1, val2, val3));
// When a group limit is configured (MaxNumberOfGroups != -1), count this tuple
// atomically and test whether the limit has been reached.
// NOTE(review): the statement guarded by this condition is not visible in this
// listing — presumably it completes the block; confirm.
265 if (dataflowBlockOptions.MaxNumberOfGroups != -1
266 && Interlocked.Increment (ref numberOfGroups)
267 >= dataflowBlockOptions.MaxNumberOfGroups)
/// <summary>
/// Gets the target block that accepts values of type <typeparamref name="T1"/>.
/// </summary>
public ITargetBlock<T1> Target1 {
    get {
        return target1;
    }
}
/// <summary>
/// Gets the target block that accepts values of type <typeparamref name="T2"/>.
/// </summary>
public ITargetBlock<T2> Target2 {
    get {
        return target2;
    }
}
/// <summary>
/// Gets the target block that accepts values of type <typeparamref name="T3"/>.
/// </summary>
public ITargetBlock<T3> Target3 {
    get {
        return target3;
    }
}
/// <summary>
/// Gets the number of tuples currently held by the outgoing queue.
/// </summary>
public int OutputCount {
    get {
        return outgoing.Count;
    }
}
/// <summary>
/// Returns the name that <c>NameHelper</c> produces for this block and its options.
/// </summary>
public override string ToString ()
{
    string name = NameHelper.GetName (this, dataflowBlockOptions);
    return name;
}