3 // Copyright (c) 2011 Jérémie "garuma" Laval
4 // Copyright (c) 2012 Petr Onderka
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
7 // of this software and associated documentation files (the "Software"), to deal
8 // in the Software without restriction, including without limitation the rights
9 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 // copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
13 // The above copyright notice and this permission notice shall be included in
14 // all copies or substantial portions of the Software.
16 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
24 using System.Collections.Generic;
26 namespace System.Threading.Tasks.Dataflow
28 public sealed class JoinBlock<T1, T2> : IReceivableSourceBlock<Tuple<T1, T2>>
30 static readonly GroupingDataflowBlockOptions defaultOptions = new GroupingDataflowBlockOptions ();
32 readonly CompletionHelper compHelper;
33 readonly GroupingDataflowBlockOptions dataflowBlockOptions;
34 readonly MessageOutgoingQueue<Tuple<T1, T2>> outgoing;
36 readonly JoinTarget<T1> target1;
37 readonly JoinTarget<T2> target2;
39 SpinLock targetLock = new SpinLock(false);
40 readonly AtomicBoolean nonGreedyProcessing = new AtomicBoolean ();
42 public JoinBlock () : this (defaultOptions)
46 public JoinBlock (GroupingDataflowBlockOptions dataflowBlockOptions)
48 if (dataflowBlockOptions == null)
49 throw new ArgumentNullException ("dataflowBlockOptions");
51 this.dataflowBlockOptions = dataflowBlockOptions;
52 compHelper = CompletionHelper.GetNew (dataflowBlockOptions);
53 target1 = new JoinTarget<T1> (this, SignalArrivalTargetImpl, compHelper,
54 () => outgoing.IsCompleted, dataflowBlockOptions,
55 dataflowBlockOptions.Greedy);
56 target2 = new JoinTarget<T2> (this, SignalArrivalTargetImpl, compHelper,
57 () => outgoing.IsCompleted, dataflowBlockOptions,
58 dataflowBlockOptions.Greedy);
59 outgoing = new MessageOutgoingQueue<Tuple<T1, T2>> (this, compHelper,
60 () => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted,
63 target1.DecreaseCount ();
64 target2.DecreaseCount ();
65 }, dataflowBlockOptions);
68 public IDisposable LinkTo (ITargetBlock<Tuple<T1, T2>> target, DataflowLinkOptions linkOptions)
70 return outgoing.AddTarget (target, linkOptions);
73 public bool TryReceive (Predicate<Tuple<T1, T2>> filter, out Tuple<T1, T2> item)
75 return outgoing.TryReceive (filter, out item);
78 public bool TryReceiveAll (out IList<Tuple<T1, T2>> items)
80 return outgoing.TryReceiveAll (out items);
83 public Tuple<T1, T2> ConsumeMessage (DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target, out bool messageConsumed)
85 return outgoing.ConsumeMessage (messageHeader, target, out messageConsumed);
88 public void ReleaseReservation (DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
90 outgoing.ReleaseReservation (messageHeader, target);
93 public bool ReserveMessage (DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
95 return outgoing.ReserveMessage (messageHeader, target);
98 public void Complete ()
102 outgoing.Complete ();
105 public void Fault (Exception ex)
107 compHelper.RequestFault (ex);
110 public Task Completion {
112 return compHelper.Completion;
116 void SignalArrivalTargetImpl()
118 if (dataflowBlockOptions.Greedy) {
124 targetLock.Enter (ref taken);
126 if (target1.Buffer.Count == 0 || target2.Buffer.Count == 0)
129 value1 = target1.Buffer.Take ();
130 value2 = target2.Buffer.Take ();
136 TriggerMessage (value1, value2);
138 if (ShouldProcessNonGreedy ())
139 EnsureNonGreedyProcessing ();
143 bool ShouldProcessNonGreedy ()
145 return target1.PostponedMessagesCount >= 1
146 && target2.PostponedMessagesCount >= 1
147 && (dataflowBlockOptions.BoundedCapacity == -1
148 || outgoing.Count < dataflowBlockOptions.BoundedCapacity);
151 void EnsureNonGreedyProcessing ()
153 if (nonGreedyProcessing.TrySet ())
154 Task.Factory.StartNew (NonGreedyProcess,
155 dataflowBlockOptions.CancellationToken,
156 TaskCreationOptions.PreferFairness,
157 dataflowBlockOptions.TaskScheduler);
160 void NonGreedyProcess()
162 while (ShouldProcessNonGreedy ()) {
163 var reservation1 = target1.ReserveMessage ();
165 if (reservation1 == null)
168 var reservation2 = target2.ReserveMessage ();
169 if (reservation2 == null) {
170 target1.RelaseReservation (reservation1);
174 var value1 = target1.ConsumeReserved (reservation1);
175 var value2 = target2.ConsumeReserved (reservation2);
177 TriggerMessage (value1, value2);
180 nonGreedyProcessing.Value = false;
182 if (ShouldProcessNonGreedy ())
183 EnsureNonGreedyProcessing ();
187 void TriggerMessage (T1 val1, T2 val2)
189 outgoing.AddData (Tuple.Create (val1, val2));
192 public ITargetBlock<T1> Target1 {
198 public ITargetBlock<T2> Target2 {
204 public override string ToString ()
206 return NameHelper.GetName (this, dataflowBlockOptions);