3 // Copyright (c) 2011 Jérémie "garuma" Laval
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
12 // The above copyright notice and this permission notice shall be included in
13 // all copies or substantial portions of the Software.
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
27 using System.Threading.Tasks;
28 using System.Collections.Generic;
29 using System.Collections.Concurrent;
31 namespace System.Threading.Tasks.Dataflow
33 public sealed class JoinBlock<T1, T2> : IReceivableSourceBlock<Tuple<T1, T2>>, ISourceBlock<Tuple<T1, T2>>, IDataflowBlock
35 static readonly GroupingDataflowBlockOptions defaultOptions = new GroupingDataflowBlockOptions ();
37 CompletionHelper compHelper = CompletionHelper.GetNew ();
38 GroupingDataflowBlockOptions dataflowBlockOptions;
39 TargetBuffer<Tuple<T1, T2>> targets = new TargetBuffer<Tuple<T1, T2>> ();
40 MessageVault<Tuple<T1, T2>> vault = new MessageVault<Tuple<T1, T2>> ();
41 MessageOutgoingQueue<Tuple<T1, T2>> outgoing;
43 JoinTarget<T1> target1;
44 JoinTarget<T2> target2;
46 DataflowMessageHeader headers;
48 public JoinBlock () : this (defaultOptions)
53 public JoinBlock (GroupingDataflowBlockOptions dataflowBlockOptions)
55 if (dataflowBlockOptions == null)
56 throw new ArgumentNullException ("dataflowBlockOptions");
58 this.dataflowBlockOptions = dataflowBlockOptions;
59 this.target1 = new JoinTarget<T1> (this, SignalArrivalTarget1, new BlockingCollection<T1> (), compHelper);
60 this.target2 = new JoinTarget<T2> (this, SignalArrivalTarget2, new BlockingCollection<T2> (), compHelper);
61 this.outgoing = new MessageOutgoingQueue<Tuple<T1, T2>> (compHelper, () => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted);
64 public IDisposable LinkTo (ITargetBlock<Tuple<T1, T2>> target, bool unlinkAfterOne)
66 var result = targets.AddTarget (target, unlinkAfterOne);
67 outgoing.ProcessForTarget (target, this, false, ref headers);
71 public bool TryReceive (Predicate<Tuple<T1, T2>> filter, out Tuple<T1, T2> item)
73 return outgoing.TryReceive (filter, out item);
76 public bool TryReceiveAll (out IList<Tuple<T1, T2>> items)
78 return outgoing.TryReceiveAll (out items);
81 public Tuple<T1, T2> ConsumeMessage (DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target, out bool messageConsumed)
83 return vault.ConsumeMessage (messageHeader, target, out messageConsumed);
86 public void ReleaseReservation (DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
88 vault.ReleaseReservation (messageHeader, target);
91 public bool ReserveMessage (DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
93 return vault.ReserveMessage (messageHeader, target);
96 public void Complete ()
101 public void Fault (Exception ex)
103 compHelper.Fault (ex);
106 public Task Completion {
108 return compHelper.Completion;
112 void SignalArrivalTarget1 ()
115 if (target2.Buffer.TryTake (out val2)) {
116 T1 val1 = target1.Buffer.Take ();
117 TriggerMessage (val1, val2);
121 void SignalArrivalTarget2 ()
124 if (target1.Buffer.TryTake (out val1)) {
125 T2 val2 = target2.Buffer.Take ();
126 TriggerMessage (val1, val2);
130 void TriggerMessage (T1 val1, T2 val2)
132 Tuple<T1, T2> tuple = Tuple.Create (val1, val2);
133 ITargetBlock<Tuple<T1, T2>> target = targets.Current;
135 if (target == null) {
136 outgoing.AddData (tuple);
138 target.OfferMessage (headers.Increment (),
144 if (!outgoing.IsEmpty && (target = targets.Current) != null)
145 outgoing.ProcessForTarget (target, this, false, ref headers);
148 class JoinTarget<TTarget> : MessageBox<TTarget>, ITargetBlock<TTarget>
150 JoinBlock<T1, T2> joinBlock;
151 BlockingCollection<TTarget> buffer;
154 public JoinTarget (JoinBlock<T1, T2> joinBlock, Action signal, BlockingCollection<TTarget> buffer, CompletionHelper helper)
155 : base (buffer, helper, () => joinBlock.outgoing.IsCompleted)
157 this.joinBlock = joinBlock;
158 this.buffer = buffer;
159 this.signal = signal;
162 protected override void EnsureProcessing ()
167 public BlockingCollection<TTarget> Buffer {
173 DataflowMessageStatus ITargetBlock<TTarget>.OfferMessage (DataflowMessageHeader messageHeader,
174 TTarget messageValue,
175 ISourceBlock<TTarget> source,
176 bool consumeToAccept)
178 return OfferMessage (this, messageHeader, messageValue, source, consumeToAccept);
181 void IDataflowBlock.Complete ()
186 Task IDataflowBlock.Completion {
188 return joinBlock.Completion;
192 void IDataflowBlock.Fault (Exception e)
198 public ITargetBlock<T1> Target1 {
204 public ITargetBlock<T2> Target2 {