{
public sealed class JoinBlock<T1, T2> : IReceivableSourceBlock<Tuple<T1, T2>>
{
- static readonly GroupingDataflowBlockOptions defaultOptions = new GroupingDataflowBlockOptions ();
-
readonly CompletionHelper compHelper;
readonly GroupingDataflowBlockOptions dataflowBlockOptions;
- readonly MessageOutgoingQueue<Tuple<T1, T2>> outgoing;
+ readonly OutgoingQueue<Tuple<T1, T2>> outgoing;
readonly JoinTarget<T1> target1;
readonly JoinTarget<T2> target2;
long target2Count;
long numberOfGroups;
- public JoinBlock () : this (defaultOptions)
+ public JoinBlock () : this (GroupingDataflowBlockOptions.Default)
{
}
this.dataflowBlockOptions = dataflowBlockOptions;
compHelper = CompletionHelper.GetNew (dataflowBlockOptions);
- target1 = new JoinTarget<T1> (this, SignalArrivalTargetImpl, compHelper,
+ target1 = new JoinTarget<T1> (this, SignalArrivalTarget, compHelper,
() => outgoing.IsCompleted, dataflowBlockOptions,
dataflowBlockOptions.Greedy, TryAdd1);
- target2 = new JoinTarget<T2> (this, SignalArrivalTargetImpl, compHelper,
+ target2 = new JoinTarget<T2> (this, SignalArrivalTarget, compHelper,
() => outgoing.IsCompleted, dataflowBlockOptions,
dataflowBlockOptions.Greedy, TryAdd2);
- outgoing = new MessageOutgoingQueue<Tuple<T1, T2>> (this, compHelper,
+ outgoing = new OutgoingQueue<Tuple<T1, T2>> (this, compHelper,
() => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted,
_ =>
{
return outgoing.TryReceiveAll (out items);
}
- public Tuple<T1, T2> ConsumeMessage (DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target, out bool messageConsumed)
+ Tuple<T1, T2> ISourceBlock<Tuple<T1, T2>>.ConsumeMessage (
+ DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target,
+ out bool messageConsumed)
{
return outgoing.ConsumeMessage (messageHeader, target, out messageConsumed);
}
- public void ReleaseReservation (DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
+ void ISourceBlock<Tuple<T1, T2>>.ReleaseReservation (
+ DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
{
outgoing.ReleaseReservation (messageHeader, target);
}
- public bool ReserveMessage (DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
+ bool ISourceBlock<Tuple<T1, T2>>.ReserveMessage (
+ DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
{
return outgoing.ReserveMessage (messageHeader, target);
}
outgoing.Complete ();
}
- public void Fault (Exception exception)
+ void IDataflowBlock.Fault (Exception exception)
{
compHelper.RequestFault (exception);
}
public Task Completion {
- get {
- return compHelper.Completion;
- }
+ get { return compHelper.Completion; }
}
+ /// <summary>
+ /// Returns whether a new item can be accepted by the first target,
+ /// and increments a counter if it can.
+ /// </summary>
bool TryAdd1 ()
{
return dataflowBlockOptions.MaxNumberOfGroups == -1
<= dataflowBlockOptions.MaxNumberOfGroups;
}
+ /// <summary>
+ /// Returns whether a new item can be accepted by the second target,
+ /// and increments a counter if it can.
+ /// </summary>
bool TryAdd2 ()
{
return dataflowBlockOptions.MaxNumberOfGroups == -1
<= dataflowBlockOptions.MaxNumberOfGroups;
}
- void SignalArrivalTargetImpl()
+ /// <summary>
+ /// Decides whether to create a new tuple or not.
+ /// </summary>
+ void SignalArrivalTarget ()
{
if (dataflowBlockOptions.Greedy) {
bool taken = false;
}
}
+ /// <summary>
+ /// Returns whether non-greedy creation of a tuple should be started.
+ /// </summary>
bool ShouldProcessNonGreedy ()
{
return target1.PostponedMessagesCount >= 1
|| outgoing.Count < dataflowBlockOptions.BoundedCapacity);
}
+ /// <summary>
+ /// Starts non-greedy creation of tuples, if one doesn't already run.
+ /// </summary>
void EnsureNonGreedyProcessing ()
{
if (nonGreedyProcessing.TrySet ())
dataflowBlockOptions.TaskScheduler);
}
+ /// <summary>
+ /// Creates tuples in non-greedy mode,
+ /// making sure the whole tuple is available by using reservations.
+ /// </summary>
void NonGreedyProcess()
{
while (ShouldProcessNonGreedy ()) {
}
+ /// <summary>
+ /// Creates a tuple from the given values and adds the result to the output queue.
+ /// </summary>
void TriggerMessage (T1 val1, T2 val2)
{
outgoing.AddData (Tuple.Create (val1, val2));
}
public ITargetBlock<T1> Target1 {
- get {
- return target1;
- }
+ get { return target1; }
}
public ITargetBlock<T2> Target2 {
- get {
- return target2;
- }
+ get { return target2; }
+ }
+
+ public int OutputCount {
+ get { return outgoing.Count; }
}
public override string ToString ()