3 // Copyright (c) 2008 Jérémie "Garuma" Laval
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
12 // The above copyright notice and this permission notice shall be included in
13 // all copies or substantial portions of the Software.
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
28 using System.Threading;
29 using System.Collections.Concurrent;
31 namespace System.Threading.Tasks
33 [System.Diagnostics.DebuggerDisplay ("Id = {Id}, Status = {Status}, Method = {DisplayActionMethod}")]
34 // [System.Diagnostics.DebuggerTypeProxy ("System.Threading.Tasks.SystemThreadingTasks_TaskDebugView")]
35 public class Task : IDisposable, IAsyncResult
37 // With this attribute each thread has its own value so that it's correct for our Schedule code
38 // and for Parent property.
42 static Action<Task> childWorkAdder;
47 static TaskFactory defaultFactory = new TaskFactory ();
49 CountdownEvent childTasks = new CountdownEvent (1);
52 TaskCreationOptions taskCreationOptions;
54 TaskScheduler scheduler;
56 ManualResetEventSlim schedWait = new ManualResetEventSlim (false);
58 volatile AggregateException exception;
59 volatile bool exceptionObserved;
60 ConcurrentQueue<AggregateException> childExceptions;
64 Action<object> action;
67 AtomicBooleanValue executing;
69 ConcurrentQueue<EventHandler> completed;
71 CancellationToken token;
73 const TaskCreationOptions MaxTaskCreationOptions =
74 TaskCreationOptions.PreferFairness | TaskCreationOptions.LongRunning | TaskCreationOptions.AttachedToParent;
// Creates a task around a parameterless delegate with TaskCreationOptions.None;
// forwards to the (Action, TaskCreationOptions) overload.
76 public Task (Action action) : this (action, TaskCreationOptions.None)
// Same, with caller-supplied creation options and CancellationToken.None.
81 public Task (Action action, TaskCreationOptions creationOptions) : this (action, CancellationToken.None, creationOptions)
// Same, with a caller-supplied cancellation token and TaskCreationOptions.None.
86 public Task (Action action, CancellationToken cancellationToken) : this (action, cancellationToken, TaskCreationOptions.None)
// Creates a task that runs a parameterless delegate with the given token and creation options.
// The chained constructor is given null for both its delegate and its state argument; the
// parameterless delegate is stored in simpleAction instead.
// Throws ArgumentNullException ("action") and, for options outside the supported range,
// ArgumentOutOfRangeException ("creationOptions").
91 public Task (Action action, CancellationToken cancellationToken, TaskCreationOptions creationOptions)
92 : this (null, null, cancellationToken, creationOptions, current)
95 throw new ArgumentNullException ("action");
// Only values between None and the union of PreferFairness, LongRunning and AttachedToParent pass.
96 if (creationOptions > MaxTaskCreationOptions || creationOptions < TaskCreationOptions.None)
97 throw new ArgumentOutOfRangeException ("creationOptions");
98 this.simpleAction = action;
// Creates a task around a delegate that receives a state object, with TaskCreationOptions.None.
101 public Task (Action<object> action, object state) : this (action, state, TaskCreationOptions.None)
// Same, with caller-supplied creation options and CancellationToken.None.
105 public Task (Action<object> action, object state, TaskCreationOptions creationOptions)
106 : this (action, state, CancellationToken.None, creationOptions)
// Same, with a caller-supplied cancellation token and TaskCreationOptions.None.
110 public Task (Action<object> action, object state, CancellationToken cancellationToken)
111 : this (action, state, cancellationToken, TaskCreationOptions.None)
// Most general public constructor for a delegate that receives a state object.
// Chains to the internal constructor, passing 'current' as the last argument.
// Throws ArgumentNullException ("action") and, for options outside the supported range,
// ArgumentOutOfRangeException ("creationOptions").
115 public Task (Action<object> action, object state, CancellationToken cancellationToken, TaskCreationOptions creationOptions)
116 : this (action, state, cancellationToken, creationOptions, current)
119 throw new ArgumentNullException ("action");
// Only values between None and the union of PreferFairness, LongRunning and AttachedToParent pass.
120 if (creationOptions > MaxTaskCreationOptions || creationOptions < TaskCreationOptions.None)
121 throw new ArgumentOutOfRangeException ("creationOptions");
// Core constructor that the public constructors chain to. Copies the creation options,
// delegate, token and parent into fields, assigns a task id and picks the initial status.
124 internal Task (Action<object> action,
126 CancellationToken cancellationToken,
127 TaskCreationOptions creationOptions,
130 this.taskCreationOptions = creationOptions;
131 this.action = action;
// The id counter is bumped with Interlocked.Increment, so concurrent constructions get distinct ids.
133 this.taskId = Interlocked.Increment (ref id);
// A token that is already cancelled makes the task start out as Canceled instead of Created.
134 this.status = cancellationToken.IsCancellationRequested ? TaskStatus.Canceled : TaskStatus.Created;
135 this.token = cancellationToken;
136 this.parent = parent;
138 // Process taskCreationOptions
// The guarded step only applies when AttachedToParent was requested and a parent exists.
139 if (CheckTaskOptions (taskCreationOptions, TaskCreationOptions.AttachedToParent) && parent != null)
145 if (exception != null && !exceptionObserved)
// Tells whether every flag bit of 'member' is present in 'opt'.
bool CheckTaskOptions (TaskCreationOptions opt, TaskCreationOptions member)
{
	// Keep only the bits the two values share; all of member's bits must survive.
	TaskCreationOptions shared = opt & member;
	return shared == member;
}
157 Start (TaskScheduler.Current);
// Starts the task on the given scheduler.
// Throws InvalidOperationException when the status is already WaitingToRun or later.
// NOTE(review): no null check on 'scheduler' is visible here -- confirm whether one is needed.
160 public void Start (TaskScheduler scheduler)
162 if (status >= TaskStatus.WaitingToRun)
163 throw new InvalidOperationException ("The Task is not in a valid state to be started.");
// Records the scheduler on the task before it is handed over for execution.
164 SetupScheduler (scheduler);
// Binds the task to 'scheduler' and moves it to the WaitingForActivation status.
168 internal void SetupScheduler (TaskScheduler scheduler)
170 this.scheduler = scheduler;
171 status = TaskStatus.WaitingForActivation;
// Runs the task synchronously using TaskScheduler.Current.
175 public void RunSynchronously ()
177 RunSynchronously (TaskScheduler.Current);
// Runs the task synchronously through the given scheduler.
// Throws InvalidOperationException when Status is past WaitingForActivation.
180 public void RunSynchronously (TaskScheduler scheduler)
182 if (Status > TaskStatus.WaitingForActivation)
183 throw new InvalidOperationException ("The task is not in a valid state to be started");
185 SetupScheduler (scheduler);
// SetupScheduler leaves the status at WaitingForActivation; advance it before running.
186 status = TaskStatus.WaitingToRun;
// Ask the scheduler to execute the task inline.
188 if (scheduler.RunInline (this))
// Convenience overloads: each fills in the missing arguments with TaskContinuationOptions.None,
// CancellationToken.None and/or TaskScheduler.Current and forwards to a fuller overload.
197 public Task ContinueWith (Action<Task> continuationAction)
199 return ContinueWith (continuationAction, TaskContinuationOptions.None);
// Caller picks the continuation options; no token, current scheduler.
202 public Task ContinueWith (Action<Task> continuationAction, TaskContinuationOptions continuationOptions)
204 return ContinueWith (continuationAction, CancellationToken.None, continuationOptions, TaskScheduler.Current);
// Caller picks the cancellation token; default options, current scheduler.
207 public Task ContinueWith (Action<Task> continuationAction, CancellationToken cancellationToken)
209 return ContinueWith (continuationAction, cancellationToken, TaskContinuationOptions.None, TaskScheduler.Current);
// Caller picks the scheduler; no token, default options.
212 public Task ContinueWith (Action<Task> continuationAction, TaskScheduler scheduler)
214 return ContinueWith (continuationAction, CancellationToken.None, TaskContinuationOptions.None, scheduler);
// Most general ContinueWith for an Action<Task>: builds the continuation task and registers it
// so that it is launched once this task completes.
//   continuationAction  - delegate invoked with the antecedent task
//   cancellationToken   - token handed to the continuation task
//   continuationOptions - creation flags plus the conditions under which the continuation runs
//   scheduler           - scheduler the continuation is bound to
// Returns the continuation task.
// Throws ArgumentNullException when continuationAction or scheduler is null.
public Task ContinueWith (Action<Task> continuationAction, CancellationToken cancellationToken, TaskContinuationOptions continuationOptions, TaskScheduler scheduler)
{
	// Validate eagerly, as the Func<Task, TResult> overload does, so a bad argument is
	// reported at the call site instead of surfacing later while the continuation runs.
	if (continuationAction == null)
		throw new ArgumentNullException ("continuationAction");
	if (scheduler == null)
		throw new ArgumentNullException ("scheduler");

	// The antecedent is handed over as the state object, hence the cast inside the lambda.
	Task continuation = new Task ((o) => continuationAction ((Task)o),
	                              this,
	                              cancellationToken,
	                              GetCreationOptions (continuationOptions),
	                              this);
	ContinueWithCore (continuation, continuationOptions, scheduler);

	return continuation;
}
// Convenience overloads of ContinueWith<TResult>: each fills in the missing arguments with
// TaskContinuationOptions.None, CancellationToken.None and/or TaskScheduler.Current and
// forwards to a fuller overload.
229 public Task<TResult> ContinueWith<TResult> (Func<Task, TResult> continuationFunction)
231 return ContinueWith<TResult> (continuationFunction, TaskContinuationOptions.None);
// Caller picks the continuation options; no token, current scheduler.
234 public Task<TResult> ContinueWith<TResult> (Func<Task, TResult> continuationFunction, TaskContinuationOptions continuationOptions)
236 return ContinueWith<TResult> (continuationFunction, CancellationToken.None, continuationOptions, TaskScheduler.Current);
// Caller picks the cancellation token; default options, current scheduler.
239 public Task<TResult> ContinueWith<TResult> (Func<Task, TResult> continuationFunction, CancellationToken cancellationToken)
241 return ContinueWith<TResult> (continuationFunction, cancellationToken, TaskContinuationOptions.None, TaskScheduler.Current);
// Caller picks the scheduler; no token, default options.
244 public Task<TResult> ContinueWith<TResult> (Func<Task, TResult> continuationFunction, TaskScheduler scheduler)
246 return ContinueWith<TResult> (continuationFunction, CancellationToken.None, TaskContinuationOptions.None, scheduler);
// Most general ContinueWith<TResult>: builds a Task<TResult> whose delegate casts its state
// argument to Task and passes it to continuationFunction, then registers it via ContinueWithCore.
// Throws ArgumentNullException when continuationFunction or scheduler is null.
249 public Task<TResult> ContinueWith<TResult> (Func<Task, TResult> continuationFunction, CancellationToken cancellationToken,
250 TaskContinuationOptions continuationOptions, TaskScheduler scheduler)
252 if (continuationFunction == null)
253 throw new ArgumentNullException ("continuationFunction");
254 if (scheduler == null)
255 throw new ArgumentNullException ("scheduler");
257 Task<TResult> t = new Task<TResult> ((o) => continuationFunction ((Task)o),
// Only the creation-related flags of continuationOptions are passed to the new task.
260 GetCreationOptions (continuationOptions),
263 ContinueWithCore (t, continuationOptions, scheduler);
// Registers 'continuation' without an extra predicate (null is passed for it).
268 internal void ContinueWithCore (Task continuation, TaskContinuationOptions continuationOptions, TaskScheduler scheduler)
270 ContinueWithCore (continuation, continuationOptions, scheduler, null);
// Wires 'continuation' to this task. A handler is built that, when invoked, checks the optional
// predicate and the continuation options against this task's status and then either cancels
// the continuation or schedules it. The handler is also stored in the 'completed' queue,
// which ProcessCompleteDelegates drains.
273 internal void ContinueWithCore (Task continuation, TaskContinuationOptions kind,
274 TaskScheduler scheduler, Func<bool> predicate)
276 // Already set the scheduler so that user can call Wait and that sort of stuff
277 continuation.scheduler = scheduler;
// Signal schedWait, the event that Wait blocks on while a task has no scheduler.
278 continuation.schedWait.Set ();
279 continuation.status = TaskStatus.WaitingForActivation;
// 'launched' guards the handler body; presumably TryRelaxedSet succeeds only for the first caller,
// so the continuation is launched at most once even when the handler is invoked several times.
281 AtomicBoolean launched = new AtomicBoolean ();
282 EventHandler action = delegate (object sender, EventArgs e) {
283 if (launched.TryRelaxedSet ()) {
// An optional predicate gets a say before anything else is checked.
284 if (predicate != null && !predicate ())
// Status does not satisfy 'kind': cancel and dispose the continuation.
287 if (!ContinuationStatusCheck (kind)) {
288 continuation.CancelReal ();
289 continuation.Dispose ();
// sender is null when the handler is invoked directly from this method (see the calls below);
// ProcessCompleteDelegates passes the task itself.
294 CheckAndSchedule (continuation, kind, scheduler, sender == null);
299 action (null, EventArgs.Empty);
// Create the handler queue lazily; CompareExchange keeps a single queue if two threads race.
303 if (completed == null)
304 Interlocked.CompareExchange (ref completed, new ConcurrentQueue<EventHandler> (), null);
305 completed.Enqueue (action);
307 // Retry in case completion was achieved but event adding was too late
309 action (null, EventArgs.Empty);
// Compares the continuation options in 'kind' with this task's current status
// (Canceled, Faulted or RanToCompletion) to decide whether a continuation may run.
// NOTE(review): presumably each matching test below rejects the continuation and everything
// else accepts it -- confirm against the return statements.
313 bool ContinuationStatusCheck (TaskContinuationOptions kind)
315 if (kind == TaskContinuationOptions.None)
318 int kindCode = (int)kind;
// Only values at or above NotOnRanToCompletion can carry a status condition.
320 if (kindCode >= ((int)TaskContinuationOptions.NotOnRanToCompletion)) {
321 // Remove other options
322 kind &= ~(TaskContinuationOptions.PreferFairness
323 | TaskContinuationOptions.LongRunning
324 | TaskContinuationOptions.AttachedToParent
325 | TaskContinuationOptions.ExecuteSynchronously);
// After masking, 'kind' is compared for exact equality with each condition value.
327 if (status == TaskStatus.Canceled) {
328 if (kind == TaskContinuationOptions.NotOnCanceled)
330 if (kind == TaskContinuationOptions.OnlyOnFaulted)
332 if (kind == TaskContinuationOptions.OnlyOnRanToCompletion)
334 } else if (status == TaskStatus.Faulted) {
335 if (kind == TaskContinuationOptions.NotOnFaulted)
337 if (kind == TaskContinuationOptions.OnlyOnCanceled)
339 if (kind == TaskContinuationOptions.OnlyOnRanToCompletion)
341 } else if (status == TaskStatus.RanToCompletion) {
342 if (kind == TaskContinuationOptions.NotOnRanToCompletion)
344 if (kind == TaskContinuationOptions.OnlyOnFaulted)
346 if (kind == TaskContinuationOptions.OnlyOnCanceled)
// Launches 'continuation': RunSynchronously is used when ExecuteSynchronously is among the
// options, Start otherwise.
// NOTE(review): how 'fromCaller' affects the choice is not clear from these lines -- confirm.
354 void CheckAndSchedule (Task continuation, TaskContinuationOptions options, TaskScheduler scheduler, bool fromCaller)
356 if ((options & TaskContinuationOptions.ExecuteSynchronously) > 0)
357 continuation.RunSynchronously (scheduler);
359 continuation.Start (scheduler);
// Translates the creation-related flags of a TaskContinuationOptions value
// (AttachedToParent, PreferFairness, LongRunning) into the matching TaskCreationOptions.
// All other continuation flags are ignored.
internal TaskCreationOptions GetCreationOptions (TaskContinuationOptions kind)
{
	// Each flag is mapped on its own; an absent flag contributes None.
	TaskCreationOptions attached = (kind & TaskContinuationOptions.AttachedToParent) > 0
		? TaskCreationOptions.AttachedToParent
		: TaskCreationOptions.None;
	TaskCreationOptions fairness = (kind & TaskContinuationOptions.PreferFairness) > 0
		? TaskCreationOptions.PreferFairness
		: TaskCreationOptions.None;
	TaskCreationOptions longRunning = (kind & TaskContinuationOptions.LongRunning) > 0
		? TaskCreationOptions.LongRunning
		: TaskCreationOptions.None;

	return attached | fairness | longRunning;
}
376 #region Internal and protected thingies
// Marks the task WaitingToRun and hands it over for execution. The scheduler's QueueTask is
// used when the task's scheduler is not TaskScheduler.Current, when no childWorkAdder
// callback is installed, or when PreferFairness was requested.
377 internal void Schedule ()
379 status = TaskStatus.WaitingToRun;
381 // If worker is null it means it is a local one, revert to the old behavior
382 // If TaskScheduler.Current is not being used, the scheduler was explicitly provided, so we must use that
383 if (scheduler != TaskScheduler.Current || childWorkAdder == null || CheckTaskOptions (taskCreationOptions, TaskCreationOptions.PreferFairness)) {
384 scheduler.QueueTask (this);
386 /* Like the semantic of the ABP paper describe it, we add ourselves to the bottom
387 * of our Parent Task's ThreadWorker deque. It's ok to do that since we are in
388 * the correct Thread during the creation
390 childWorkAdder (this);
396 /* Allow scheduler to break fairness of deque ordering without
397 * breaking its semantic (the task can be executed twice but the
398 * second time it will return immediately
400 if (!executing.TryRelaxedSet ())
404 TaskScheduler.Current = scheduler;
406 if (!token.IsCancellationRequested) {
408 status = TaskStatus.Running;
412 } catch (OperationCanceledException oce) {
413 if (oce.CancellationToken == token)
416 HandleGenericException (oce);
417 } catch (Exception e) {
418 HandleGenericException (e);
// Entry point that takes a child-adder callback: the callback is stored in childWorkAdder,
// which Schedule later uses to enqueue tasks.
427 internal void Execute (Action<Task> childAdder)
429 childWorkAdder = childAdder;
// Registers one more child on this task by adding a count to the childTasks countdown.
433 internal void AddChild ()
435 childTasks.AddCount ();
// Called when a child finishes (see Finish, which calls parent.ChildCompleted).
// Collects the child's exception, if any, and finalises this task once the last child has
// signalled while this task is in WaitingForChildrenToComplete.
438 internal void ChildCompleted (AggregateException childEx)
440 if (childEx != null) {
// Create the exception queue lazily; CompareExchange keeps a single queue if two children race.
441 if (childExceptions == null)
442 Interlocked.CompareExchange (ref childExceptions, new ConcurrentQueue<AggregateException> (), null);
443 childExceptions.Enqueue (childEx);
// Signal () returns true when the countdown reaches zero, i.e. this was the last outstanding count.
// NOTE(review): the status becomes RanToCompletion even when child exceptions were collected -- confirm intended.
446 if (childTasks.Signal () && status == TaskStatus.WaitingForChildrenToComplete) {
447 status = TaskStatus.RanToCompletion;
448 ProcessChildExceptions ();
449 ProcessCompleteDelegates ();
// Runs the delegate this task wraps. The first branch handles a task that has only the
// parameterless simpleAction, the second a task with the Action<object> delegate.
// Virtual, so derived task types can supply their own invocation.
453 internal virtual void InnerInvoke ()
455 if (action == null && simpleAction != null)
457 else if (action != null)
459 // Set action to null so that the GC can collect the delegate and thus
460 // any big object references that the user might have captured in an anonymous method
// Completes the task after its delegate has run: settles the final status, runs the
// completion handlers unless children are still pending, clears TaskScheduler.Current and
// notifies an attached parent.
466 internal void Finish ()
468 // If there wasn't any child created in the task we set the CountdownEvent
// childTasks starts with a count of 1, so this releases the task's own count.
469 childTasks.Signal ();
471 // Don't override Canceled or Faulted
472 if (status == TaskStatus.Running) {
473 if (childTasks.IsSet)
474 status = TaskStatus.RanToCompletion;
476 status = TaskStatus.WaitingForChildrenToComplete;
// While children are pending, ChildCompleted runs the completion handlers later instead.
479 if (status != TaskStatus.WaitingForChildrenToComplete)
480 ProcessCompleteDelegates ();
482 // Reset the current thingies
484 TaskScheduler.Current = null;
486 // Tell parent that we are finished
487 if (CheckTaskOptions (taskCreationOptions, TaskCreationOptions.AttachedToParent) && parent != null) {
// Reading the Exception property marks the exception as observed on this task.
488 parent.ChildCompleted (this.Exception);
// Drains the 'completed' queue, invoking each registered handler with this task as sender.
// Does nothing useful when no handler was ever registered (the queue is created lazily).
492 void ProcessCompleteDelegates ()
494 if (completed == null)
497 EventHandler handler;
498 while (completed.TryDequeue (out handler))
499 handler (this, EventArgs.Empty);
// Moves every exception collected from children into this task's own AggregateException,
// creating that exception first when the task has none yet.
502 void ProcessChildExceptions ()
504 if (childExceptions == null)
507 if (exception == null)
508 exception = new AggregateException ();
510 AggregateException childEx;
511 while (childExceptions.TryDequeue (out childEx))
512 exception.AddChildException (childEx);
516 #region Cancel and Wait related method
// Puts the task into the Canceled status.
518 internal void CancelReal ()
520 status = TaskStatus.Canceled;
// Wraps a single exception in an AggregateException and forwards to the overload below.
523 internal void HandleGenericException (Exception e)
525 HandleGenericException (new AggregateException (e));
// Marks the task Faulted. When a scheduler is attached, its unobserved-exception event is
// fired with the stored exception, and the exception is flagged as observed if the event
// result says so.
528 internal void HandleGenericException (AggregateException e)
// Full fence issued before the status change is published.
531 Thread.MemoryBarrier ();
532 status = TaskStatus.Faulted;
533 if (scheduler != null && scheduler.FireUnobservedEvent (exception).Observed)
534 exceptionObserved = true;
539 if (scheduler == null)
543 scheduler.ParticipateUntil (this);
544 if (exception != null)
547 throw new AggregateException (new TaskCanceledException (this));
// Waits without a time limit (-1) until the task completes or the token is cancelled.
550 public void Wait (CancellationToken cancellationToken)
552 Wait (-1, cancellationToken);
// Waits with a TimeSpan timeout; CheckTimeout converts it to milliseconds and throws
// ArgumentOutOfRangeException when it does not fit in an Int32.
555 public bool Wait (TimeSpan timeout)
557 return Wait (CheckTimeout (timeout), CancellationToken.None);
// Waits with a millisecond timeout and no cancellation token.
560 public bool Wait (int millisecondsTimeout)
562 return Wait (millisecondsTimeout, CancellationToken.None);
// Waits for the task, giving up after millisecondsTimeout ms (-1 = no limit) or when
// cancellationToken is cancelled. After the wait, a stored exception or a cancelled task is
// reported by throwing (the latter as an AggregateException wrapping a TaskCanceledException).
// Throws ArgumentOutOfRangeException for a timeout below -1.
565 public bool Wait (int millisecondsTimeout, CancellationToken cancellationToken)
567 if (millisecondsTimeout < -1)
568 throw new ArgumentOutOfRangeException ("millisecondsTimeout");
// NOTE(review): this tests the task's own 'token' field, not the 'cancellationToken'
// argument, so a caller-supplied token can be skipped on this fast path -- looks unintended, confirm.
570 if (millisecondsTimeout == -1 && token == CancellationToken.None) {
// Measures time already spent so it can be deducted from the timeout.
575 Watch watch = Watch.StartNew ();
// No scheduler bound yet: wait on schedWait first, then recompute the remaining timeout.
577 if (scheduler == null) {
578 schedWait.Wait (millisecondsTimeout, cancellationToken);
579 millisecondsTimeout = ComputeTimeout (millisecondsTimeout, watch);
// predicateEvt gets set when the caller's token is cancelled and is handed to the scheduler.
// NOTE(review): the registration returned by Register is not kept or disposed.
582 ManualResetEventSlim predicateEvt = new ManualResetEventSlim (false);
583 if (cancellationToken != CancellationToken.None) {
584 cancellationToken.Register (predicateEvt.Set);
585 cancellationToken.ThrowIfCancellationRequested ();
588 bool result = scheduler.ParticipateUntil (this, predicateEvt, millisecondsTimeout);
590 if (exception != null)
593 throw new AggregateException (new TaskCanceledException (this));
// Waits for every task in 'tasks', one after the other, without a time limit.
// Throws ArgumentNullException ("tasks") for a null array or a null element.
598 public static void WaitAll (params Task[] tasks)
601 throw new ArgumentNullException ("tasks");
603 foreach (var t in tasks) {
605 throw new ArgumentNullException ("tasks", "the tasks argument contains a null element");
// Same, but each wait can be interrupted through cancellationToken.
610 public static void WaitAll (Task[] tasks, CancellationToken cancellationToken)
613 throw new ArgumentNullException ("tasks");
615 foreach (var t in tasks) {
617 throw new ArgumentNullException ("tasks", "the tasks argument contains a null element");
619 t.Wait (cancellationToken);
// Waits for every task in 'tasks' using a TimeSpan timeout.
// Returns the combined (AND-ed) result of the individual Task.Wait calls.
// Throws ArgumentNullException ("tasks") for a null array or a null element, and
// ArgumentOutOfRangeException ("timeout") when the timeout does not fit in an Int32 of milliseconds.
public static bool WaitAll (Task[] tasks, TimeSpan timeout)
{
	if (tasks == null)
		throw new ArgumentNullException ("tasks");

	// Convert and validate the timeout once up front instead of on every loop iteration
	// (so a bad timeout is rejected even for an empty array), then share the waiting loop
	// with the most general overload rather than duplicating it here.
	return WaitAll (tasks, CheckTimeout (timeout), CancellationToken.None);
}
// Waits for every task in 'tasks' using a millisecond timeout and no cancellation token.
// Returns the combined (AND-ed) result of the individual Task.Wait calls.
// Throws ArgumentNullException ("tasks") for a null array or a null element.
public static bool WaitAll (Task[] tasks, int millisecondsTimeout)
{
	// Task.Wait (int) is Wait (int, CancellationToken.None), so the general overload
	// performs exactly the same checks and waits.
	return WaitAll (tasks, millisecondsTimeout, CancellationToken.None);
}
// Waits for every task in 'tasks', giving up once millisecondsTimeout ms have elapsed in
// total (-1 = no limit) or when cancellationToken is cancelled.
// Returns the combined (AND-ed) result of the individual Task.Wait calls.
// Throws ArgumentNullException ("tasks") for a null array or a null element, and
// ArgumentOutOfRangeException ("millisecondsTimeout") for a timeout below -1.
public static bool WaitAll (Task[] tasks, int millisecondsTimeout, CancellationToken cancellationToken)
{
	if (tasks == null)
		throw new ArgumentNullException ("tasks");
	// Reject a bad timeout even when the array is empty and no Task.Wait call would catch it.
	if (millisecondsTimeout < -1)
		throw new ArgumentOutOfRangeException ("millisecondsTimeout");

	bool result = true;
	Watch watch = Watch.StartNew ();

	foreach (var t in tasks) {
		if (t == null)
			throw new ArgumentNullException ("tasks", "the tasks argument contains a null element");

		// The timeout is a budget for the whole call: give each task only what is left of it.
		// Handing every task the full timeout let the call block for up to N times the limit.
		int remaining = millisecondsTimeout == -1
			? -1
			: (int)Math.Max (millisecondsTimeout - watch.ElapsedMilliseconds, 0);

		result &= t.Wait (remaining, cancellationToken);
	}

	return result;
}
// Waits without a time limit or cancellation token until one of the tasks finishes.
668 public static int WaitAny (params Task[] tasks)
670 return WaitAny (tasks, -1, CancellationToken.None);
// TimeSpan variant; CheckTimeout converts the timeout to milliseconds and throws
// ArgumentOutOfRangeException when it does not fit in an Int32.
673 public static int WaitAny (Task[] tasks, TimeSpan timeout)
675 return WaitAny (tasks, CheckTimeout (timeout));
// Millisecond variant. Throws ArgumentOutOfRangeException for a timeout below -1.
678 public static int WaitAny (Task[] tasks, int millisecondsTimeout)
680 if (millisecondsTimeout < -1)
681 throw new ArgumentOutOfRangeException ("millisecondsTimeout");
// -1 takes the params overload, which forwards to the same general overload as the line below.
683 if (millisecondsTimeout == -1)
684 return WaitAny (tasks);
686 return WaitAny (tasks, millisecondsTimeout, CancellationToken.None);
// Cancellable variant without a time limit.
689 public static int WaitAny (Task[] tasks, CancellationToken cancellationToken)
691 return WaitAny (tasks, -1, cancellationToken);
// Most general WaitAny: returns the index (into 'tasks') of the first task seen to finish.
// Throws ArgumentNullException ("tasks") and, for an empty array, ArgumentException.
// NOTE(review): null elements are not checked in the visible lines -- confirm.
694 public static int WaitAny (Task[] tasks, int millisecondsTimeout, CancellationToken cancellationToken)
697 throw new ArgumentNullException ("tasks");
698 if (tasks.Length == 0)
699 throw new ArgumentException ("tasks is empty", "tasks");
// Single task: a plain Wait is enough.
// NOTE(review): the bool returned by Wait (timed out or not) is not used on this line -- confirm.
700 if (tasks.Length == 1) {
701 tasks[0].Wait (millisecondsTimeout, cancellationToken);
// -1 means no task has reported completion yet.
706 int indexFirstFinished = -1;
708 TaskScheduler sched = null;
710 Watch watch = Watch.StartNew ();
711 ManualResetEventSlim predicateEvt = new ManualResetEventSlim (false);
// Attach a synchronous continuation to every task; the first one to run records its index.
713 foreach (Task t in tasks) {
714 int indexResult = index++;
715 t.ContinueWith (delegate {
// Cheap early exit once some task has already been counted.
716 if (numFinished >= 1)
// The atomic increment decides which continuation is really first when several race.
718 int result = Interlocked.Increment (ref numFinished);
720 // Check if we are the first to have finished
722 indexFirstFinished = indexResult;
726 }, TaskContinuationOptions.ExecuteSynchronously);
// Remember a task that already has a scheduler bound.
728 if (sched == null && t.scheduler != null) {
734 // If none of task have a scheduler we are forced to wait for at least one to start
736 var handles = Array.ConvertAll (tasks, t => t.schedWait.WaitHandle);
738 if ((shandle = WaitHandle.WaitAny (handles, millisecondsTimeout)) == WaitHandle.WaitTimeout)
740 sched = tasks[shandle].scheduler;
741 task = tasks[shandle];
// Deduct the time spent waiting for a scheduler from the remaining timeout.
742 millisecondsTimeout = ComputeTimeout (millisecondsTimeout, watch);
745 // One task already finished
746 if (indexFirstFinished != -1)
747 return indexFirstFinished;
// predicateEvt gets set when the caller's token is cancelled and is handed to the scheduler.
749 if (cancellationToken != CancellationToken.None) {
750 cancellationToken.Register (predicateEvt.Set);
751 cancellationToken.ThrowIfCancellationRequested ();
754 sched.ParticipateUntil (task, predicateEvt, millisecondsTimeout);
756 // Index update is still not done
// NOTE(review): this loop spins until some continuation sets the index; if ParticipateUntil
// returned because of the timeout or cancellation, the timeout looks to be ignored here -- confirm.
757 if (indexFirstFinished == -1) {
758 SpinWait wait = new SpinWait ();
759 while (indexFirstFinished == -1)
763 return indexFirstFinished;
// Converts a TimeSpan timeout into whole milliseconds.
// Throws ArgumentOutOfRangeException ("timeout") when the value does not fit in an Int32.
static int CheckTimeout (TimeSpan timeout)
{
	double totalMilliseconds = timeout.TotalMilliseconds;
	int converted;

	try {
		// The checked conversion turns an out-of-range value into an OverflowException.
		converted = checked ((int)totalMilliseconds);
	} catch (System.OverflowException) {
		throw new ArgumentOutOfRangeException ("timeout");
	}

	return converted;
}
// Returns how much of 'millisecondsTimeout' is left after the time measured by 'watch'.
// -1 (no limit) is passed through unchanged. Otherwise the result is at least 1, so a
// used-up budget is never turned into 0 or into the -1 "no limit" value.
static int ComputeTimeout (int millisecondsTimeout, Watch watch)
{
	if (millisecondsTimeout == -1)
		return -1;

	// Remaining time is budget minus elapsed. The operands used to be the other way round,
	// which gave a negative number (clamped to 1 ms) for any wait still inside its budget.
	long remaining = millisecondsTimeout - watch.ElapsedMilliseconds;
	return (int)Math.Max (remaining, 1);
}
// Public disposal entry point (IDisposable).
783 public void Dispose ()
// Overridable disposal logic.
// Throws InvalidOperationException when the task is not in a completion state.
788 protected virtual void Dispose (bool disposing)
791 throw new InvalidOperationException ("A task may only be disposed if it is in a completion state");
793 // Set action to null so that the GC can collect the delegate and thus
794 // any big object references that the user might have captured in an anonymous method
// Shared default TaskFactory instance.
803 public static TaskFactory Factory {
805 return defaultFactory;
// Id of a task 't', or null when there is none.
809 public static int? CurrentId {
812 return t == null ? (int?)null : t.Id;
// Reading this property marks the task's exception as observed.
816 public AggregateException Exception {
818 exceptionObserved = true;
// True only in the Canceled status.
827 public bool IsCanceled {
829 return status == TaskStatus.Canceled;
// True in any of the three final statuses: RanToCompletion, Canceled or Faulted.
833 public bool IsCompleted {
835 return status == TaskStatus.RanToCompletion ||
836 status == TaskStatus.Canceled || status == TaskStatus.Faulted;
// True only in the Faulted status.
840 public bool IsFaulted {
842 return status == TaskStatus.Faulted;
// Options the task was created with.
846 public TaskCreationOptions CreationOptions {
848 return taskCreationOptions;
852 public TaskStatus Status {
861 public object AsyncState {
// Explicit IAsyncResult members.
867 bool IAsyncResult.CompletedSynchronously {
873 WaitHandle IAsyncResult.AsyncWaitHandle {
885 internal Task Parent {
// Text used by the DebuggerDisplay attribute on the class: the wrapped delegate's method,
// or "<none>" when the task holds no delegate.
891 string DisplayActionMethod {
893 Delegate d = simpleAction ?? (Delegate) action;
894 return d == null ? "<none>" : d.Method.ToString ();