// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
-#if NET_4_0 || MOBILE
+#if NET_4_0
using System.Collections.Generic;
+using System.Collections.Concurrent;
namespace System.Threading
{
public class CancellationTokenSource : IDisposable
{
bool canceled;
- bool processed;
bool disposed;
int currId = int.MinValue;
+ ConcurrentDictionary<CancellationTokenRegistration, Action> callbacks;
+ CancellationTokenRegistration[] linkedTokens;
- Dictionary<CancellationTokenRegistration, Action> callbacks;
-
ManualResetEvent handle;
- readonly object syncRoot = new object ();
internal static readonly CancellationTokenSource NoneSource = new CancellationTokenSource ();
internal static readonly CancellationTokenSource CanceledSource = new CancellationTokenSource ();
static CancellationTokenSource ()
{
- CanceledSource.processed = true;
CanceledSource.canceled = true;
#if NET_4_5
public CancellationTokenSource ()
{
- callbacks = new Dictionary<CancellationTokenRegistration, Action> ();
+ callbacks = new ConcurrentDictionary<CancellationTokenRegistration, Action> ();
handle = new ManualResetEvent (false);
}
{
CheckDisposed ();
+ if (canceled)
+ return;
+
+ Thread.MemoryBarrier ();
canceled = true;
+
handle.Set ();
+ if (linkedTokens != null)
+ UnregisterLinkedTokens ();
List<Exception> exceptions = null;
- lock (syncRoot) {
- try {
- foreach (var item in callbacks) {
- if (throwOnFirstException) {
- item.Value ();
- } else {
- try {
- item.Value ();
- } catch (Exception e) {
- if (exceptions == null)
- exceptions = new List<Exception> ();
-
- exceptions.Add (e);
- }
+ try {
+ Action cb;
+ for (int id = currId; id != int.MinValue; id--) {
+ if (!callbacks.TryRemove (new CancellationTokenRegistration (id, this), out cb))
+ continue;
+ if (cb == null)
+ continue;
+
+ if (throwOnFirstException) {
+ cb ();
+ } else {
+ try {
+ cb ();
+ } catch (Exception e) {
+ if (exceptions == null)
+ exceptions = new List<Exception> ();
+
+ exceptions.Add (e);
}
}
- } finally {
- callbacks.Clear ();
}
+ } finally {
+ callbacks.Clear ();
}
-
- Thread.MemoryBarrier ();
- processed = true;
-
+
if (exceptions != null)
throw new AggregateException (exceptions);
}
+ /* This is the callback registered on linked tokens
+ * so that they don't throw an ODE if the callback
+ * is called concurrently with a Dispose
+ */
+ void SafeLinkedCancel ()
+ {
+ try {
+ Cancel ();
+ } catch (ObjectDisposedException) {}
+ }
+
#if NET_4_5
public void CancelAfter (TimeSpan delay)
{
throw new ArgumentException ("Empty tokens array");
CancellationTokenSource src = new CancellationTokenSource ();
- Action action = src.Cancel;
+ Action action = src.SafeLinkedCancel;
+ var registrations = new List<CancellationTokenRegistration> (tokens.Length);
foreach (CancellationToken token in tokens) {
if (token.CanBeCanceled)
- token.Register (action);
+ registrations.Add (token.Register (action));
}
+ src.linkedTokens = registrations.ToArray ();
return src;
}
void Dispose (bool disposing)
{
if (disposing && !disposed) {
+ Thread.MemoryBarrier ();
disposed = true;
- callbacks = null;
+ if (!canceled) {
+ Thread.MemoryBarrier ();
+ UnregisterLinkedTokens ();
+ callbacks = null;
+ }
#if NET_4_5
if (timer != null)
timer.Dispose ();
handle.Dispose ();
}
}
+
+ void UnregisterLinkedTokens ()
+ {
+ var registrations = Interlocked.Exchange (ref linkedTokens, null);
+ if (registrations == null)
+ return;
+ foreach (var linked in registrations)
+ linked.Dispose ();
+ }
internal CancellationTokenRegistration Register (Action callback, bool useSynchronizationContext)
{
var tokenReg = new CancellationTokenRegistration (Interlocked.Increment (ref currId), this);
- if (canceled) {
+ /* If the source is already canceled we execute the callback immediately
+ * if not, we try to add it to the queue and if it is currently being processed
+ * we try to execute it back ourselves to be sure the callback is ran
+ */
+ if (canceled)
callback ();
- } else {
- bool temp = false;
- lock (syncRoot) {
- if (!(temp = canceled))
- callbacks.Add (tokenReg, callback);
- }
- if (temp)
+ else {
+ callbacks.TryAdd (tokenReg, callback);
+ if (canceled && callbacks.TryRemove (tokenReg, out callback))
callback ();
}
return tokenReg;
}
-
- internal void RemoveCallback (CancellationTokenRegistration tokenReg)
+
+ internal void RemoveCallback (CancellationTokenRegistration reg)
{
- if (!canceled) {
- lock (syncRoot) {
- if (!canceled) {
- callbacks.Remove (tokenReg);
- return;
- }
- }
- }
-
- SpinWait sw = new SpinWait ();
- while (!processed)
- sw.SpinOnce ();
-
+ // Ignore call if the source has been disposed
+ if (disposed)
+ return;
+ Action dummy;
+ var cbs = callbacks;
+ if (cbs != null)
+ cbs.TryRemove (reg, out dummy);
}
}
}