readonly JoinTarget<T1> target1;
readonly JoinTarget<T2> target2;
+ SpinLock targetLock = new SpinLock(false);
+
DataflowMessageHeader headers;
public JoinBlock () : this (defaultOptions)
throw new ArgumentNullException ("dataflowBlockOptions");
this.dataflowBlockOptions = dataflowBlockOptions;
- target1 = new JoinTarget<T1> (this, SignalArrivalTarget1, compHelper, () => outgoing.IsCompleted);
- target2 = new JoinTarget<T2> (this, SignalArrivalTarget2, compHelper, () => outgoing.IsCompleted);
+ target1 = new JoinTarget<T1> (this, SignalArrivalTargetImpl, compHelper, () => outgoing.IsCompleted);
+ target2 = new JoinTarget<T2> (this, SignalArrivalTargetImpl, compHelper, () => outgoing.IsCompleted);
outgoing = new MessageOutgoingQueue<Tuple<T1, T2>> (compHelper, () => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted);
}
}
}
- void SignalArrivalTarget1 ()
+ // TODO: see if we can find a lockless implementation
+ void SignalArrivalTargetImpl()
{
- T2 val2;
- if (target2.Buffer.TryTake (out val2)) {
- T1 val1 = target1.Buffer.Take ();
- TriggerMessage (val1, val2);
- }
- }
+ bool taken = false;
+ T1 value1;
+ T2 value2;
- void SignalArrivalTarget2 ()
- {
- T1 val1;
- if (target1.Buffer.TryTake (out val1)) {
- T2 val2 = target2.Buffer.Take ();
- TriggerMessage (val1, val2);
+ try {
+ targetLock.Enter (ref taken);
+
+ if (target1.Buffer.Count == 0 || target2.Buffer.Count == 0)
+ return;
+
+ value1 = target1.Buffer.Take ();
+ value2 = target2.Buffer.Take ();
+ } finally {
+ if (taken)
+ targetLock.Exit ();
}
+
+ TriggerMessage (value1, value2);
}
+
void TriggerMessage (T1 val1, T2 val2)
{
Tuple<T1, T2> tuple = Tuple.Create (val1, val2);