#if NET_4_0 || MOBILE
using System.Collections.Generic;
+using System.Collections.Concurrent;
namespace System.Threading
{
public class CancellationTokenSource : IDisposable
{
bool canceled;
- bool processed;
bool disposed;
int currId = int.MinValue;
+ ConcurrentDictionary<CancellationTokenRegistration, Action> callbacks;
- Dictionary<CancellationTokenRegistration, Action> callbacks;
-
ManualResetEvent handle;
- readonly object syncRoot = new object ();
internal static readonly CancellationTokenSource NoneSource = new CancellationTokenSource ();
internal static readonly CancellationTokenSource CanceledSource = new CancellationTokenSource ();
static CancellationTokenSource ()
{
- CanceledSource.processed = true;
CanceledSource.canceled = true;
#if NET_4_5
public CancellationTokenSource ()
{
- callbacks = new Dictionary<CancellationTokenRegistration, Action> ();
+ callbacks = new ConcurrentDictionary<CancellationTokenRegistration, Action> ();
handle = new ManualResetEvent (false);
}
List<Exception> exceptions = null;
- lock (syncRoot) {
- try {
- foreach (var item in callbacks) {
- if (throwOnFirstException) {
- item.Value ();
- } else {
- try {
- item.Value ();
- } catch (Exception e) {
- if (exceptions == null)
- exceptions = new List<Exception> ();
-
- exceptions.Add (e);
- }
+ try {
+ Action cb;
+ for (int id = int.MinValue + 1; id <= currId; id++) {
+ if (!callbacks.TryRemove (new CancellationTokenRegistration (id, this), out cb))
+ continue;
+ if (cb == null)
+ continue;
+
+ if (throwOnFirstException) {
+ cb ();
+ } else {
+ try {
+ cb ();
+ } catch (Exception e) {
+ if (exceptions == null)
+ exceptions = new List<Exception> ();
+
+ exceptions.Add (e);
}
}
- } finally {
- callbacks.Clear ();
}
+ } finally {
+ callbacks.Clear ();
}
-
- Thread.MemoryBarrier ();
- processed = true;
-
+
if (exceptions != null)
throw new AggregateException (exceptions);
}
var tokenReg = new CancellationTokenRegistration (Interlocked.Increment (ref currId), this);
- if (canceled) {
+ /* If the source is already canceled we execute the callback immediately
+ * if not, we try to add it to the queue and if it is currently being processed
+ * we try to execute it back ourselves to be sure the callback is ran
+ */
+ if (canceled)
callback ();
- } else {
- bool temp = false;
- lock (syncRoot) {
- if (!(temp = canceled))
- callbacks.Add (tokenReg, callback);
- }
- if (temp)
+ else {
+ callbacks.TryAdd (tokenReg, callback);
+ if (canceled && callbacks.TryRemove (tokenReg, out callback))
callback ();
}
return tokenReg;
}
-
- internal void RemoveCallback (CancellationTokenRegistration tokenReg)
+
+ internal void RemoveCallback (CancellationTokenRegistration reg)
{
- if (!canceled) {
- lock (syncRoot) {
- if (!canceled) {
- callbacks.Remove (tokenReg);
- return;
- }
- }
- }
-
- SpinWait sw = new SpinWait ();
- while (!processed)
- sw.SpinOnce ();
-
+ Action dummy;
+ callbacks.TryRemove (reg, out dummy);
}
}
}
Assert.IsTrue (t.Wait (1000), "#3");
Assert.AreEqual (12, called, "#4");
}
+
+ [Test]
+ public void ReEntrantRegistrationTest ()
+ {
+ bool unregister = false;
+ bool register = false;
+ var source = new CancellationTokenSource ();
+ var token = source.Token;
+
+ var reg = new CancellationTokenRegistration ();
+ Console.WriteLine ("Test1");
+ token.Register (() => reg.Dispose ());
+ reg = token.Register (() => unregister = true);
+ token.Register (() => { Console.WriteLine ("Gnyah"); token.Register (() => register = true); });
+ source.Cancel ();
+
+ Assert.IsFalse (unregister);
+ Assert.IsTrue (register);
+ }
}
}