3 // Copyright (c) 2011 Jérémie "garuma" Laval
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
12 // The above copyright notice and this permission notice shall be included in
13 // all copies or substantial portions of the Software.
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
27 using System.Threading.Tasks;
28 using System.Collections.Generic;
30 namespace System.Threading.Tasks.Dataflow
// Source block that pairs one T1 item with one T2 item (taken from target1
// and target2 below) into a Tuple<T1, T2>, which is then offered to a linked
// target or queued for TryReceive / TryReceiveAll callers.
32 public sealed class JoinBlock<T1, T2> : IReceivableSourceBlock<Tuple<T1, T2>>, ISourceBlock<Tuple<T1, T2>>, IDataflowBlock
// Shared options instance used by the parameterless constructor.
34 static readonly GroupingDataflowBlockOptions defaultOptions = new GroupingDataflowBlockOptions ();
// Completion state for the block; backs Completion and Fault and is also
// handed to both join targets and to the outgoing queue in the constructor.
36 CompletionHelper compHelper = CompletionHelper.GetNew ();
// Options supplied at construction time; read by ToString via NameHelper.GetName.
37 GroupingDataflowBlockOptions dataflowBlockOptions;
// Targets registered through LinkTo; TriggerMessage reads targets.Current.
38 TargetBuffer<Tuple<T1, T2>> targets = new TargetBuffer<Tuple<T1, T2>> ();
// ConsumeMessage / ReserveMessage / ReleaseReservation all delegate to this vault.
39 MessageVault<Tuple<T1, T2>> vault = new MessageVault<Tuple<T1, T2>> ();
// Queue of joined tuples; assigned in the constructor. Receives tuples when
// no target is current and serves TryReceive / TryReceiveAll.
40 MessageOutgoingQueue<Tuple<T1, T2>> outgoing;
// The two typed input sides of the join; their Buffer members are drained in
// SignalArrivalTargetImpl.
42 readonly JoinTarget<T1> target1;
43 readonly JoinTarget<T2> target2;
// Guards the paired Take from both input buffers. The `false` argument is
// SpinLock's enableThreadOwnerTracking flag (owner tracking disabled).
45 SpinLock targetLock = new SpinLock(false);
// Header state passed by ref to outgoing.ProcessForTarget and incremented
// (headers.Increment ()) when a tuple is offered directly to a target.
47 DataflowMessageHeader headers;
// Creates a join block configured with the shared defaultOptions instance by
// chaining to the options-taking constructor.
49 public JoinBlock () : this (defaultOptions)
// Creates a join block with the given options.
// Throws ArgumentNullException when dataflowBlockOptions is null.
54 public JoinBlock (GroupingDataflowBlockOptions dataflowBlockOptions)
56 if (dataflowBlockOptions == null)
57 throw new ArgumentNullException ("dataflowBlockOptions");
59 this.dataflowBlockOptions = dataflowBlockOptions;
// Both input targets share the same arrival callback and completion helper.
// The lambdas read the `outgoing` field only when invoked; `outgoing` itself
// is assigned two lines below.
// NOTE(review): assumes JoinTarget does not invoke the lambda from its
// constructor, otherwise `outgoing` would still be null — confirm.
60 target1 = new JoinTarget<T1> (this, SignalArrivalTargetImpl, compHelper, () => outgoing.IsCompleted);
61 target2 = new JoinTarget<T2> (this, SignalArrivalTargetImpl, compHelper, () => outgoing.IsCompleted);
// The predicate given to the outgoing queue is true as soon as either input
// buffer reports IsCompleted.
62 outgoing = new MessageOutgoingQueue<Tuple<T1, T2>> (compHelper, () => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted);
// Registers `target` (with the given unlinkAfterOne flag) in the target
// buffer, then asks the outgoing queue to process for the newly linked target.
// NOTE(review): the return statement is not visible in this excerpt;
// presumably `result` from AddTarget is returned — confirm.
65 public IDisposable LinkTo (ITargetBlock<Tuple<T1, T2>> target, bool unlinkAfterOne)
67 var result = targets.AddTarget (target, unlinkAfterOne);
68 outgoing.ProcessForTarget (target, this, false, ref headers);
// Attempts to receive a single joined tuple, optionally restricted by
// `filter`; the call is forwarded unchanged to the outgoing queue and its
// result is returned.
72 public bool TryReceive (Predicate<Tuple<T1, T2>> filter, out Tuple<T1, T2> item)
74 return outgoing.TryReceive (filter, out item);
// Attempts to receive joined tuples in bulk into `items`; the call is
// forwarded unchanged to the outgoing queue and its result is returned.
77 public bool TryReceiveAll (out IList<Tuple<T1, T2>> items)
79 return outgoing.TryReceiveAll (out items);
// Source-side consume operation: forwards the header, the requesting target
// and the messageConsumed out-flag to the message vault and returns its result.
82 public Tuple<T1, T2> ConsumeMessage (DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target, out bool messageConsumed)
84 return vault.ConsumeMessage (messageHeader, target, out messageConsumed);
// Source-side release operation: forwards the header and the target to the
// message vault.
87 public void ReleaseReservation (DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
89 vault.ReleaseReservation (messageHeader, target);
// Source-side reserve operation: forwards the header and the target to the
// message vault and returns the vault's answer.
92 public bool ReserveMessage (DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
94 return vault.ReserveMessage (messageHeader, target);
// IDataflowBlock completion entry point.
// NOTE(review): the method body is not visible in this excerpt; which
// component is signalled (outgoing queue vs. completion helper) must be
// confirmed against the full file.
97 public void Complete ()
// Faults the block by forwarding `ex` to the shared completion helper.
102 public void Fault (Exception ex)
104 compHelper.Fault (ex);
// Task representing this block's completion; it is the Completion task of
// the shared completion helper.
107 public Task Completion {
109 return compHelper.Completion;
// Arrival callback handed to both JoinTarget instances in the constructor.
// While holding targetLock it checks whether either input buffer is empty,
// takes one value from each buffer, and then passes the pair to
// TriggerMessage.
// NOTE(review): the declaration of `taken`, the handling of the "a buffer is
// empty" case and the lock release are not visible in this excerpt;
// presumably the method returns early and exits the lock in a finally
// block — confirm.
113 // TODO: see if we can find a lockless implementation
114 void SignalArrivalTargetImpl()
121 targetLock.Enter (ref taken);
// A pair can only be formed when both sides have at least one buffered item.
123 if (target1.Buffer.Count == 0 || target2.Buffer.Count == 0)
126 value1 = target1.Buffer.Take ();
127 value2 = target2.Buffer.Take ();
133 TriggerMessage (value1, value2);
// Builds the joined tuple from val1 and val2 and dispatches it: when no
// target is current the tuple is queued in `outgoing`; a current target is
// offered the message with an incremented header. Afterwards, if the outgoing
// queue still holds data and a target is current, the queue is processed for
// that target.
// NOTE(review): the `else` separating the two branches and the remaining
// OfferMessage arguments are not visible in this excerpt — confirm.
137 void TriggerMessage (T1 val1, T2 val2)
139 Tuple<T1, T2> tuple = Tuple.Create (val1, val2);
140 ITargetBlock<Tuple<T1, T2>> target = targets.Current;
142 if (target == null) {
143 outgoing.AddData (tuple);
145 target.OfferMessage (headers.Increment (),
// targets.Current is read again here rather than reusing the earlier value.
151 if (!outgoing.IsEmpty && (target = targets.Current) != null)
152 outgoing.ProcessForTarget (target, this, false, ref headers);
// Input side accepting T1 items.
// NOTE(review): the getter body is not visible in this excerpt; presumably
// it returns the target1 field — confirm.
155 public ITargetBlock<T1> Target1 {
// Input side accepting T2 items.
// NOTE(review): the getter body is not visible in this excerpt; presumably
// it returns the target2 field — confirm.
161 public ITargetBlock<T2> Target2 {
// Returns the display name computed by NameHelper.GetName from this block
// and the options it was constructed with.
167 public override string ToString ()
169 return NameHelper.GetName (this, dataflowBlockOptions);