Fixed deadlock in JoinBlock by using a lock
authorPetr Onderka <gsvick@gmail.com>
Mon, 25 Jun 2012 18:09:14 +0000 (20:09 +0200)
committerPetr Onderka <gsvick@gmail.com>
Sun, 19 Aug 2012 21:36:19 +0000 (23:36 +0200)
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinBlock.cs

index 67963e9760919e4967908bf0a192ae3f6de0f0ba..be740acdac80c5951558b168ee405096e0b3817a 100644 (file)
@@ -42,6 +42,8 @@ namespace System.Threading.Tasks.Dataflow
                readonly JoinTarget<T1> target1;
                readonly JoinTarget<T2> target2;
 
+               SpinLock targetLock = new SpinLock(false);
+
                DataflowMessageHeader headers;
 
                public JoinBlock () : this (defaultOptions)
@@ -55,8 +57,8 @@ namespace System.Threading.Tasks.Dataflow
                                throw new ArgumentNullException ("dataflowBlockOptions");
 
                        this.dataflowBlockOptions = dataflowBlockOptions;
-                       target1 = new JoinTarget<T1> (this, SignalArrivalTarget1, compHelper, () => outgoing.IsCompleted);
-                       target2 = new JoinTarget<T2> (this, SignalArrivalTarget2, compHelper, () => outgoing.IsCompleted);
+                       target1 = new JoinTarget<T1> (this, SignalArrivalTargetImpl, compHelper, () => outgoing.IsCompleted);
+                       target2 = new JoinTarget<T2> (this, SignalArrivalTargetImpl, compHelper, () => outgoing.IsCompleted);
                        outgoing = new MessageOutgoingQueue<Tuple<T1, T2>> (compHelper, () => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted);
                }
 
@@ -108,24 +110,30 @@ namespace System.Threading.Tasks.Dataflow
                        }
                }
 
-               void SignalArrivalTarget1 ()
+               // TODO: see if we can find a lockless implementation
+               void SignalArrivalTargetImpl()
                {
-                       T2 val2;
-                       if (target2.Buffer.TryTake (out val2)) {
-                               T1 val1 = target1.Buffer.Take ();
-                               TriggerMessage (val1, val2);
-                       }
-               }
+                       bool taken = false;
+                       T1 value1;
+                       T2 value2;
 
-               void SignalArrivalTarget2 ()
-               {
-                       T1 val1;
-                       if (target1.Buffer.TryTake (out val1)) {
-                               T2 val2 = target2.Buffer.Take ();
-                               TriggerMessage (val1, val2);
+                       try {
+                               targetLock.Enter (ref taken);
+
+                               if (target1.Buffer.Count == 0 || target2.Buffer.Count == 0)
+                                       return;
+
+                               value1 = target1.Buffer.Take ();
+                               value2 = target2.Buffer.Take ();
+                       } finally {
+                               if (taken)
+                                       targetLock.Exit ();
                        }
+
+                       TriggerMessage (value1, value2);
                }
 
+
                void TriggerMessage (T1 val1, T2 val2)
                {
                        Tuple<T1, T2> tuple = Tuple.Create (val1, val2);