Rework CancellationTokenSource to remove locking. Fix #4173.
authorJeremie Laval <jeremie.laval@gmail.com>
Tue, 10 Apr 2012 11:35:18 +0000 (12:35 +0100)
committerJeremie Laval <jeremie.laval@gmail.com>
Tue, 10 Apr 2012 11:39:03 +0000 (12:39 +0100)
Issue reported by MartinG, unit test code based on his repro.

mcs/class/corlib/System.Threading/CancellationTokenRegistration.cs
mcs/class/corlib/System.Threading/CancellationTokenSource.cs
mcs/class/corlib/Test/System.Threading/CancellationTokenSourceTest.cs

index 96f298de0d0c5f6c19b6cf050565dd9b4b449e52..f89329067c36cb69f94660d204d84e831e9ad02a 100644 (file)
@@ -40,14 +40,12 @@ namespace System.Threading
                        this.id = id;
                        this.source = source;
                }
-               
+
                #region IDisposable implementation
                public void Dispose ()
                {
-                       // Remove the corresponding callback from source
                        source.RemoveCallback (this);
                }
-
                #endregion
 
                #region IEquatable<CancellationTokenRegistration> implementation
@@ -76,7 +74,6 @@ namespace System.Threading
                {
                        return (obj is CancellationTokenRegistration) ? Equals ((CancellationTokenRegistration)obj) : false;
                }
-
        }
 }
 #endif
index 76503e7dc5f3abd780145e24c13c6dff7cc0ecb0..4c5b2b2196ca47ae937828aa4ca1193b5766d63b 100644 (file)
@@ -28,6 +28,7 @@
 
 #if NET_4_0 || MOBILE
 using System.Collections.Generic;
+using System.Collections.Concurrent;
 
 namespace System.Threading
 {
@@ -37,15 +38,12 @@ namespace System.Threading
        public class CancellationTokenSource : IDisposable
        {
                bool canceled;
-               bool processed;
                bool disposed;
                
                int currId = int.MinValue;
+               ConcurrentDictionary<CancellationTokenRegistration, Action> callbacks;
 
-               Dictionary<CancellationTokenRegistration, Action> callbacks;
-               
                ManualResetEvent handle;
-               readonly object syncRoot = new object ();
                
                internal static readonly CancellationTokenSource NoneSource = new CancellationTokenSource ();
                internal static readonly CancellationTokenSource CanceledSource = new CancellationTokenSource ();
@@ -57,7 +55,6 @@ namespace System.Threading
 
                static CancellationTokenSource ()
                {
-                       CanceledSource.processed = true;
                        CanceledSource.canceled = true;
 
 #if NET_4_5
@@ -70,7 +67,7 @@ namespace System.Threading
 
                public CancellationTokenSource ()
                {
-                       callbacks = new Dictionary<CancellationTokenRegistration, Action> ();
+                       callbacks = new ConcurrentDictionary<CancellationTokenRegistration, Action> ();
                        handle = new ManualResetEvent (false);
                }
 
@@ -126,30 +123,31 @@ namespace System.Threading
                        
                        List<Exception> exceptions = null;
                        
-                       lock (syncRoot) {
-                               try {
-                                       foreach (var item in callbacks) {
-                                               if (throwOnFirstException) {
-                                                       item.Value ();
-                                               } else {
-                                                       try {
-                                                               item.Value ();
-                                                       } catch (Exception e) {
-                                                               if (exceptions == null)
-                                                                       exceptions = new List<Exception> ();
-
-                                                               exceptions.Add (e);
-                                                       }
+                       try {
+                               Action cb;
+                               for (int id = int.MinValue + 1; id <= currId; id++) {
+                                       if (!callbacks.TryRemove (new CancellationTokenRegistration (id, this), out cb))
+                                               continue;
+                                       if (cb == null)
+                                               continue;
+
+                                       if (throwOnFirstException) {
+                                               cb ();
+                                       } else {
+                                               try {
+                                                       cb ();
+                                               } catch (Exception e) {
+                                                       if (exceptions == null)
+                                                               exceptions = new List<Exception> ();
+
+                                                       exceptions.Add (e);
                                                }
                                        }
-                               } finally {
-                                       callbacks.Clear ();
                                }
+                       } finally {
+                               callbacks.Clear ();
                        }
-                       
-                       Thread.MemoryBarrier ();
-                       processed = true;
-                       
+
                        if (exceptions != null)
                                throw new AggregateException (exceptions);
                }
@@ -248,36 +246,25 @@ namespace System.Threading
 
                        var tokenReg = new CancellationTokenRegistration (Interlocked.Increment (ref currId), this);
 
-                       if (canceled) {
+                       /* If the source is already canceled we execute the callback immediately
+                        * if not, we try to add it to the queue and if it is currently being processed
+                        * we try to execute it back ourselves to be sure the callback is ran
+                        */
+                       if (canceled)
                                callback ();
-                       } else {
-                               bool temp = false;
-                               lock (syncRoot) {
-                                       if (!(temp = canceled))
-                                               callbacks.Add (tokenReg, callback);
-                               }
-                               if (temp)
+                       else {
+                               callbacks.TryAdd (tokenReg, callback);
+                               if (canceled && callbacks.TryRemove (tokenReg, out callback))
                                        callback ();
                        }
                        
                        return tokenReg;
                }
-               
-               internal void RemoveCallback (CancellationTokenRegistration tokenReg)
+
+               internal void RemoveCallback (CancellationTokenRegistration reg)
                {
-                       if (!canceled) {
-                               lock (syncRoot) {
-                                       if (!canceled) {
-                                               callbacks.Remove (tokenReg);
-                                               return;
-                                       }
-                               }
-                       }
-                       
-                       SpinWait sw = new SpinWait ();
-                       while (!processed)
-                               sw.SpinOnce ();
-                       
+                       Action dummy;
+                       callbacks.TryRemove (reg, out dummy);
                }
        }
 }
index ea2b86e25888c52ce93d25740f235a4f9cc6b1c2..e683d33bf1c781d4401c64d0f430aa5e2f641dbb 100644 (file)
@@ -321,6 +321,25 @@ namespace MonoTests.System.Threading
                        Assert.IsTrue (t.Wait (1000), "#3");
                        Assert.AreEqual (12, called, "#4");
                }
+
+               [Test]
+               public void ReEntrantRegistrationTest ()
+               {
+                       bool unregister = false;
+                       bool register = false;
+                       var source = new CancellationTokenSource ();
+                       var token = source.Token;
+
+                       var reg = new CancellationTokenRegistration ();
+                       Console.WriteLine ("Test1");
+                       token.Register (() => reg.Dispose ());
+                       reg = token.Register (() => unregister = true);
+                       token.Register (() => { Console.WriteLine ("Gnyah"); token.Register (() => register = true); });
+                       source.Cancel ();
+
+                       Assert.IsFalse (unregister);
+                       Assert.IsTrue (register);
+               }
        }
 }