3 // Copyright (c) 2011 Jérémie "garuma" Laval
4 // Copyright (c) 2012 Petr Onderka
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
7 // of this software and associated documentation files (the "Software"), to deal
8 // in the Software without restriction, including without limitation the rights
9 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 // copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
13 // The above copyright notice and this permission notice shall be included in
14 // all copies or substantial portions of the Software.
16 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
24 using System.Collections.Generic;
26 namespace System.Threading.Tasks.Dataflow
// NOTE(review): parts of this excerpt appear to be missing; for example the counters
// target1Count, target2Count and numberOfGroups used by later members are not declared
// in the lines visible here.
/// <summary>
/// Source block producing Tuple&lt;T1, T2&gt; items; its two input sides are exposed
/// through the Target1 and Target2 properties.
/// </summary>
28 public sealed class JoinBlock<T1, T2> : IReceivableSourceBlock<Tuple<T1, T2>>
// Shared with both targets and the outgoing queue; backs the Completion property
// and receives fault requests.
30 readonly CompletionHelper compHelper;
// Options given at construction; the constructor rejects null.
31 readonly GroupingDataflowBlockOptions dataflowBlockOptions;
// Holds the created tuples; linking, receiving and reservation calls are forwarded to it.
32 readonly OutgoingQueue<Tuple<T1, T2>> outgoing;
// Input sides for the first and second tuple component.
34 readonly JoinTarget<T1> target1;
35 readonly JoinTarget<T2> target2;
// Entered in SignalArrivalTarget before the target buffers are inspected and taken from.
// Created with thread-owner tracking disabled. Not readonly: SpinLock is a mutable struct.
37 SpinLock targetLock = new SpinLock(false);
// Set (via TrySet) before a NonGreedyProcess task is started and reset to false by that
// task once its loop ends.
38 readonly AtomicBoolean nonGreedyProcessing = new AtomicBoolean ();
/// <summary>
/// Creates the block using <c>GroupingDataflowBlockOptions.Default</c>.
/// </summary>
public JoinBlock ()
    : this (GroupingDataflowBlockOptions.Default)
{
}
/// <summary>
/// Creates the block with the given options and wires up the completion helper,
/// both targets and the outgoing queue.
/// </summary>
/// <param name="dataflowBlockOptions">Options for this block; must not be null.</param>
/// <exception cref="ArgumentNullException">dataflowBlockOptions is null.</exception>
48 public JoinBlock (GroupingDataflowBlockOptions dataflowBlockOptions)
50 if (dataflowBlockOptions == null)
51 throw new ArgumentNullException ("dataflowBlockOptions");
53 this.dataflowBlockOptions = dataflowBlockOptions;
54 compHelper = CompletionHelper.GetNew (dataflowBlockOptions);
// NOTE(review): the lambdas passed to the targets read `outgoing`, which is only assigned
// further down. Presumably JoinTarget does not invoke them during construction — confirm.
// Both targets share the same arrival callback and completion helper; they differ only in
// the admission check passed as the last argument (TryAdd1 / TryAdd2).
55 target1 = new JoinTarget<T1> (this, SignalArrivalTarget, compHelper,
56 () => outgoing.IsCompleted, dataflowBlockOptions,
57 dataflowBlockOptions.Greedy, TryAdd1);
58 target2 = new JoinTarget<T2> (this, SignalArrivalTarget, compHelper,
59 () => outgoing.IsCompleted, dataflowBlockOptions,
60 dataflowBlockOptions.Greedy, TryAdd2);
// The predicate given to the queue is true as soon as either target's buffer is completed.
61 outgoing = new OutgoingQueue<Tuple<T1, T2>> (this, compHelper,
62 () => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted,
// NOTE(review): the header of the callback whose body follows is missing from this excerpt,
// so its parameter list and the moment the queue invokes it cannot be seen here.
// The callback decreases the count on both targets.
65 target1.DecreaseCount ();
66 target2.DecreaseCount ();
67 }, dataflowBlockOptions);
/// <summary>
/// Links this block to <paramref name="target"/> by registering it with the outgoing queue.
/// </summary>
/// <param name="target">Block that should receive the created tuples.</param>
/// <param name="linkOptions">Link options, forwarded unchanged to the outgoing queue.</param>
/// <returns>The <see cref="IDisposable"/> returned by the outgoing queue for this link.</returns>
public IDisposable LinkTo (
    ITargetBlock<Tuple<T1, T2>> target,
    DataflowLinkOptions linkOptions)
{
    IDisposable link = outgoing.AddTarget (target, linkOptions);
    return link;
}
/// <summary>
/// Tries to receive a single tuple from the outgoing queue.
/// </summary>
/// <param name="filter">Predicate forwarded unchanged to the outgoing queue.</param>
/// <param name="item">Set by the outgoing queue's TryReceive.</param>
/// <returns>The result reported by the outgoing queue.</returns>
public bool TryReceive (Predicate<Tuple<T1, T2>> filter, out Tuple<T1, T2> item)
{
    bool received = outgoing.TryReceive (filter, out item);
    return received;
}
/// <summary>
/// Tries to receive all available tuples from the outgoing queue.
/// </summary>
/// <param name="items">Set by the outgoing queue's TryReceiveAll.</param>
/// <returns>The result reported by the outgoing queue.</returns>
public bool TryReceiveAll (out IList<Tuple<T1, T2>> items)
{
    bool receivedAny = outgoing.TryReceiveAll (out items);
    return receivedAny;
}
/// <summary>
/// Explicit <c>ISourceBlock</c> implementation; the call is forwarded to the outgoing queue.
/// </summary>
/// <param name="messageHeader">Header identifying the message to consume.</param>
/// <param name="target">Target block requesting the message.</param>
/// <param name="messageConsumed">Set by the outgoing queue's ConsumeMessage.</param>
/// <returns>The tuple returned by the outgoing queue.</returns>
Tuple<T1, T2> ISourceBlock<Tuple<T1, T2>>.ConsumeMessage (
    DataflowMessageHeader messageHeader,
    ITargetBlock<Tuple<T1, T2>> target,
    out bool messageConsumed)
{
    Tuple<T1, T2> message = outgoing.ConsumeMessage (messageHeader, target, out messageConsumed);
    return message;
}
/// <summary>
/// Explicit <c>ISourceBlock</c> implementation; the call is forwarded to the outgoing queue.
/// </summary>
/// <param name="messageHeader">Header of the message whose reservation is released.</param>
/// <param name="target">Target block releasing the reservation.</param>
void ISourceBlock<Tuple<T1, T2>>.ReleaseReservation (
    DataflowMessageHeader messageHeader,
    ITargetBlock<Tuple<T1, T2>> target)
{
    outgoing.ReleaseReservation (messageHeader, target);
}
/// <summary>
/// Explicit <c>ISourceBlock</c> implementation; the call is forwarded to the outgoing queue.
/// </summary>
/// <param name="messageHeader">Header of the message to reserve.</param>
/// <param name="target">Target block requesting the reservation.</param>
/// <returns>The result reported by the outgoing queue.</returns>
bool ISourceBlock<Tuple<T1, T2>>.ReserveMessage (
    DataflowMessageHeader messageHeader,
    ITargetBlock<Tuple<T1, T2>> target)
{
    bool reserved = outgoing.ReserveMessage (messageHeader, target);
    return reserved;
}
/// <summary>
/// Completes the outgoing queue.
/// </summary>
104 public void Complete ()
// NOTE(review): lines between the signature and the call below are missing from this
// excerpt, so this method may perform additional steps before completing the outgoing
// queue — confirm against the full file.
108 outgoing.Complete ();
/// <summary>
/// Explicit <c>IDataflowBlock</c> implementation; passes the exception to the
/// completion helper as a fault request.
/// </summary>
/// <param name="exception">Exception forwarded unchanged to the completion helper.</param>
void IDataflowBlock.Fault (Exception exception)
{
    compHelper.RequestFault (exception);
}
/// <summary>
/// Gets the completion task exposed by the completion helper.
/// </summary>
public Task Completion {
    get {
        Task completion = compHelper.Completion;
        return completion;
    }
}
// NOTE(review): the line declaring this method is missing from this excerpt; presumably
// this is the TryAdd1 method handed to target1 in the constructor — confirm.
121 /// Returns whether a new item can be accepted by the first target.
122 /// When a group limit is set, the counter is incremented on every call.
/// If MaxNumberOfGroups is -1 the result is true and target1Count is left untouched.
/// Otherwise target1Count is atomically incremented and the result is whether the
/// incremented value is still less than or equal to MaxNumberOfGroups; the increment
/// happens even when the result is false.
126 return dataflowBlockOptions.MaxNumberOfGroups == -1
127 || Interlocked.Increment (ref target1Count)
128 <= dataflowBlockOptions.MaxNumberOfGroups;
// NOTE(review): the line declaring this method is missing from this excerpt; presumably
// this is the TryAdd2 method handed to target2 in the constructor — confirm.
132 /// Returns whether a new item can be accepted by the second target.
133 /// When a group limit is set, the counter is incremented on every call.
/// If MaxNumberOfGroups is -1 the result is true and target2Count is left untouched.
/// Otherwise target2Count is atomically incremented and the result is whether the
/// incremented value is still less than or equal to MaxNumberOfGroups; the increment
/// happens even when the result is false.
137 return dataflowBlockOptions.MaxNumberOfGroups == -1
138 || Interlocked.Increment (ref target2Count)
139 <= dataflowBlockOptions.MaxNumberOfGroups;
143 /// Decides whether to create a new tuple or not.
/// Passed to both targets in the constructor as their arrival callback.
145 void SignalArrivalTarget ()
// NOTE(review): several lines of this method are missing from this excerpt: the
// declarations of `taken`, `value1` and `value2`, whatever follows the empty-buffer check,
// the release of targetLock, and the boundary between the greedy branch and the code after
// it. The comments below describe only what is visible.
147 if (dataflowBlockOptions.Greedy) {
// The check and the two Take calls below are performed after entering targetLock.
153 targetLock.Enter (ref taken);
// A tuple needs one item from each target, so both buffers must be non-empty.
155 if (target1.Buffer.Count == 0 || target2.Buffer.Count == 0)
158 value1 = target1.Buffer.Take ();
159 value2 = target2.Buffer.Take ();
165 TriggerMessage (value1, value2);
// Presumably the non-greedy branch (TODO confirm): start background processing when both
// targets have postponed messages and the output has room.
167 if (ShouldProcessNonGreedy ())
168 EnsureNonGreedyProcessing ();
/// <summary>
/// Tells whether a tuple can currently be created in non-greedy mode: each target must
/// have at least one postponed message, and the output must either be unbounded
/// (BoundedCapacity of -1) or hold fewer items than BoundedCapacity.
/// </summary>
/// <returns>true when all of the conditions above hold; false otherwise.</returns>
bool ShouldProcessNonGreedy ()
{
    // Both sides of the tuple have to be on offer.
    if (target1.PostponedMessagesCount < 1)
        return false;

    if (target2.PostponedMessagesCount < 1)
        return false;

    // -1 means no capacity limit, so there is always room.
    if (dataflowBlockOptions.BoundedCapacity == -1)
        return true;

    return outgoing.Count < dataflowBlockOptions.BoundedCapacity;
}
/// <summary>
/// Starts a task running <c>NonGreedyProcess</c> unless the nonGreedyProcessing flag
/// was already set. The task uses the cancellation token and scheduler from the
/// block options and is created with <c>TaskCreationOptions.PreferFairness</c>.
/// </summary>
void EnsureNonGreedyProcessing ()
{
    // TrySet failing means the flag was already set, so no new task is started.
    if (!nonGreedyProcessing.TrySet ())
        return;

    Task.Factory.StartNew (
        NonGreedyProcess,
        dataflowBlockOptions.CancellationToken,
        TaskCreationOptions.PreferFairness,
        dataflowBlockOptions.TaskScheduler);
}
196 /// Creates tuples in non-greedy mode,
197 /// making sure the whole tuple is available by using reservations.
/// This is the method EnsureNonGreedyProcessing starts as a task.
199 void NonGreedyProcess()
// NOTE(review): a few short lines of this method are missing from this excerpt, including
// the statements that follow each failed reservation check; presumably they leave the
// loop — confirm against the full file.
// Keep pairing messages while both targets have postponed messages and the output has room.
201 while (ShouldProcessNonGreedy ()) {
202 var reservation1 = target1.ReserveMessage ();
// A null reservation means nothing could be reserved from the first target.
204 if (reservation1 == null)
207 var reservation2 = target2.ReserveMessage ();
// The second half is unavailable: hand the first reservation back instead of keeping it.
// (RelaseReservation is the method's actual spelling.)
208 if (reservation2 == null) {
209 target1.RelaseReservation (reservation1);
// Both halves are reserved, so consume them and emit the tuple.
213 var value1 = target1.ConsumeReserved (reservation1);
214 var value2 = target2.ConsumeReserved (reservation2);
216 TriggerMessage (value1, value2);
// Clear the flag first, then re-check: while the flag was set, EnsureNonGreedyProcessing
// would not start another task, so messages postponed in the meantime are picked up here.
219 nonGreedyProcessing.Value = false;
221 if (ShouldProcessNonGreedy ())
222 EnsureNonGreedyProcessing ();
227 /// Creates a tuple from the given values and adds the result to the output queue.
/// When MaxNumberOfGroups is not -1, numberOfGroups is also atomically incremented and
/// compared against that limit.
229 void TriggerMessage (T1 val1, T2 val2)
231 outgoing.AddData (Tuple.Create (val1, val2));
// NOTE(review): the statement guarded by the condition below is missing from this excerpt,
// so what happens once the number of groups reaches the limit cannot be seen here.
233 if (dataflowBlockOptions.MaxNumberOfGroups != -1
234 && Interlocked.Increment (ref numberOfGroups)
235 >= dataflowBlockOptions.MaxNumberOfGroups)
/// <summary>
/// Gets the target that accepts items of type T1.
/// </summary>
public ITargetBlock<T1> Target1 {
    get {
        ITargetBlock<T1> firstTarget = target1;
        return firstTarget;
    }
}
/// <summary>
/// Gets the target that accepts items of type T2.
/// </summary>
public ITargetBlock<T2> Target2 {
    get {
        ITargetBlock<T2> secondTarget = target2;
        return secondTarget;
    }
}
/// <summary>
/// Gets the number of items reported by the outgoing queue's Count.
/// </summary>
public int OutputCount {
    get {
        int pending = outgoing.Count;
        return pending;
    }
}
/// <summary>
/// Returns the name produced by <c>NameHelper.GetName</c> for this block and its options.
/// </summary>
public override string ToString ()
{
    string name = NameHelper.GetName (this, dataflowBlockOptions);
    return name;
}