// ==++==
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// TaskContinuation.cs
//
// [....]
//
// Implementation of task continuations, TaskContinuation, and its descendants.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System.Security;
using System.Diagnostics.Contracts;
using System.Runtime.ExceptionServices;
using System.Runtime.CompilerServices;
using System.Threading;
#if FEATURE_COMINTEROP
using System.Runtime.InteropServices.WindowsRuntime;
#endif // FEATURE_COMINTEROP
namespace System.Threading.Tasks
{
// Task type used to implement: Task ContinueWith(Action<Task,...>)
internal sealed class ContinuationTaskFromTask : Task
{
    // The task this continuation follows. InnerInvoke clears it before running the delegate
    // so that long chains of continuations do not keep every antecedent reachable.
    private Task m_antecedent;

    /// <summary>Initializes a continuation task that runs an action over a non-generic antecedent.</summary>
    /// <param name="antecedent">The task handed to the delegate when the continuation is invoked.</param>
    /// <param name="action">
    /// The delegate to run; must be an Action&lt;Task&gt; or an Action&lt;Task, object&gt;.
    /// </param>
    /// <param name="state">The state object forwarded to the base task (used by the two-argument delegate form).</param>
    /// <param name="creationOptions">Creation options forwarded to the base task and used to pick the parent.</param>
    /// <param name="internalOptions">Internal options forwarded to the base task.</param>
    /// <param name="stackMark">Stack mark passed to PossiblyCaptureContext.</param>
    public ContinuationTaskFromTask(
        Task antecedent, Delegate action, object state, TaskCreationOptions creationOptions, InternalTaskOptions internalOptions, ref StackCrawlMark stackMark) :
        base(action, state, Task.InternalCurrentIfAttached(creationOptions), default(CancellationToken), creationOptions, internalOptions, null)
    {
        Contract.Requires(action is Action<Task> || action is Action<Task, object>,
            "Invalid delegate type in ContinuationTaskFromTask");
        m_antecedent = antecedent;
        PossiblyCaptureContext(ref stackMark);
    }

    /// <summary>
    /// Invokes the continuation delegate, passing it the antecedent task and, for the
    /// two-argument delegate form, the state object.
    /// </summary>
    internal override void InnerInvoke()
    {
        // Get and null out the antecedent. This is crucial to avoid a memory
        // leak with long chains of continuations.
        var antecedent = m_antecedent;
        Contract.Assert(antecedent != null,
            "No antecedent was set for the ContinuationTaskFromTask.");
        m_antecedent = null;

        // Notify the debugger we're completing an asynchronous wait on a task
        antecedent.NotifyDebuggerOfWaitCompletionIfNecessary();

        // Invoke the delegate, trying the stateless form first.
        Contract.Assert(m_action != null);
        var action = m_action as Action<Task>;
        if (action != null)
        {
            action(antecedent);
            return;
        }

        var actionWithState = m_action as Action<Task, object>;
        if (actionWithState != null)
        {
            actionWithState(antecedent, m_stateObject);
            return;
        }

        // The constructor contract only admits the two delegate shapes handled above.
        Contract.Assert(false, "Invalid m_action in ContinuationTaskFromTask");
    }
}
// Task type used to implement: Task<TResult> ContinueWith(Func<Task,...>)
internal sealed class ContinuationResultTaskFromTask<TResult> : Task<TResult>
{
    // The task this continuation follows. InnerInvoke clears it before running the delegate
    // so that long chains of continuations do not keep every antecedent reachable.
    private Task m_antecedent;

    /// <summary>Initializes a continuation task that computes a result from a non-generic antecedent.</summary>
    /// <param name="antecedent">The task handed to the delegate when the continuation is invoked.</param>
    /// <param name="function">
    /// The delegate to run; must be a Func&lt;Task, TResult&gt; or a Func&lt;Task, object, TResult&gt;.
    /// </param>
    /// <param name="state">The state object forwarded to the base task (used by the two-argument delegate form).</param>
    /// <param name="creationOptions">Creation options forwarded to the base task and used to pick the parent.</param>
    /// <param name="internalOptions">Internal options forwarded to the base task.</param>
    /// <param name="stackMark">Stack mark passed to PossiblyCaptureContext.</param>
    public ContinuationResultTaskFromTask(
        Task antecedent, Delegate function, object state, TaskCreationOptions creationOptions, InternalTaskOptions internalOptions, ref StackCrawlMark stackMark) :
        base(function, state, Task.InternalCurrentIfAttached(creationOptions), default(CancellationToken), creationOptions, internalOptions, null)
    {
        Contract.Requires(function is Func<Task, TResult> || function is Func<Task, object, TResult>,
            "Invalid delegate type in ContinuationResultTaskFromTask");
        m_antecedent = antecedent;
        PossiblyCaptureContext(ref stackMark);
    }

    /// <summary>
    /// Evaluates the continuation function over the antecedent task (and the state object for
    /// the two-argument delegate form) and stores the value it returns in m_result.
    /// </summary>
    internal override void InnerInvoke()
    {
        // Get and null out the antecedent. This is crucial to avoid a memory
        // leak with long chains of continuations.
        var antecedent = m_antecedent;
        Contract.Assert(antecedent != null,
            "No antecedent was set for the ContinuationResultTaskFromTask.");
        m_antecedent = null;

        // Notify the debugger we're completing an asynchronous wait on a task
        antecedent.NotifyDebuggerOfWaitCompletionIfNecessary();

        // Invoke the delegate, trying the stateless form first.
        Contract.Assert(m_action != null);
        var func = m_action as Func<Task, TResult>;
        if (func != null)
        {
            m_result = func(antecedent);
            return;
        }

        var funcWithState = m_action as Func<Task, object, TResult>;
        if (funcWithState != null)
        {
            m_result = funcWithState(antecedent, m_stateObject);
            return;
        }

        // The constructor contract only admits the two delegate shapes handled above.
        Contract.Assert(false, "Invalid m_action in ContinuationResultTaskFromTask");
    }
}
// Task type used to implement: Task ContinueWith(Action<Task<TAntecedentResult>,...>)
internal sealed class ContinuationTaskFromResultTask<TAntecedentResult> : Task
{
    // The task this continuation follows. InnerInvoke clears it before running the delegate
    // so that long chains of continuations do not keep every antecedent reachable.
    private Task<TAntecedentResult> m_antecedent;

    /// <summary>Initializes a continuation task that runs an action over a result-bearing antecedent.</summary>
    /// <param name="antecedent">The task handed to the delegate when the continuation is invoked.</param>
    /// <param name="action">
    /// The delegate to run; must be an Action&lt;Task&lt;TAntecedentResult&gt;&gt; or an
    /// Action&lt;Task&lt;TAntecedentResult&gt;, object&gt;.
    /// </param>
    /// <param name="state">The state object forwarded to the base task (used by the two-argument delegate form).</param>
    /// <param name="creationOptions">Creation options forwarded to the base task and used to pick the parent.</param>
    /// <param name="internalOptions">Internal options forwarded to the base task.</param>
    /// <param name="stackMark">Stack mark passed to PossiblyCaptureContext.</param>
    public ContinuationTaskFromResultTask(
        Task<TAntecedentResult> antecedent, Delegate action, object state, TaskCreationOptions creationOptions, InternalTaskOptions internalOptions, ref StackCrawlMark stackMark) :
        base(action, state, Task.InternalCurrentIfAttached(creationOptions), default(CancellationToken), creationOptions, internalOptions, null)
    {
        Contract.Requires(action is Action<Task<TAntecedentResult>> || action is Action<Task<TAntecedentResult>, object>,
            "Invalid delegate type in ContinuationTaskFromResultTask");
        m_antecedent = antecedent;
        PossiblyCaptureContext(ref stackMark);
    }

    /// <summary>
    /// Invokes the continuation delegate, passing it the antecedent task and, for the
    /// two-argument delegate form, the state object.
    /// </summary>
    internal override void InnerInvoke()
    {
        // Get and null out the antecedent. This is crucial to avoid a memory
        // leak with long chains of continuations.
        var antecedent = m_antecedent;
        Contract.Assert(antecedent != null,
            "No antecedent was set for the ContinuationTaskFromResultTask.");
        m_antecedent = null;

        // Notify the debugger we're completing an asynchronous wait on a task
        antecedent.NotifyDebuggerOfWaitCompletionIfNecessary();

        // Invoke the delegate, trying the stateless form first.
        Contract.Assert(m_action != null);
        var action = m_action as Action<Task<TAntecedentResult>>;
        if (action != null)
        {
            action(antecedent);
            return;
        }

        var actionWithState = m_action as Action<Task<TAntecedentResult>, object>;
        if (actionWithState != null)
        {
            actionWithState(antecedent, m_stateObject);
            return;
        }

        // The constructor contract only admits the two delegate shapes handled above.
        Contract.Assert(false, "Invalid m_action in ContinuationTaskFromResultTask");
    }
}
// Task type used to implement: Task<TResult> ContinueWith(Func<Task<TAntecedentResult>,...>)
internal sealed class ContinuationResultTaskFromResultTask<TAntecedentResult, TResult> : Task<TResult>
{
    // The task this continuation follows. InnerInvoke clears it before running the delegate
    // so that long chains of continuations do not keep every antecedent reachable.
    private Task<TAntecedentResult> m_antecedent;

    /// <summary>Initializes a continuation task that computes a result from a result-bearing antecedent.</summary>
    /// <param name="antecedent">The task handed to the delegate when the continuation is invoked.</param>
    /// <param name="function">
    /// The delegate to run; must be a Func&lt;Task&lt;TAntecedentResult&gt;, TResult&gt; or a
    /// Func&lt;Task&lt;TAntecedentResult&gt;, object, TResult&gt;.
    /// </param>
    /// <param name="state">The state object forwarded to the base task (used by the two-argument delegate form).</param>
    /// <param name="creationOptions">Creation options forwarded to the base task and used to pick the parent.</param>
    /// <param name="internalOptions">Internal options forwarded to the base task.</param>
    /// <param name="stackMark">Stack mark passed to PossiblyCaptureContext.</param>
    public ContinuationResultTaskFromResultTask(
        Task<TAntecedentResult> antecedent, Delegate function, object state, TaskCreationOptions creationOptions, InternalTaskOptions internalOptions, ref StackCrawlMark stackMark) :
        base(function, state, Task.InternalCurrentIfAttached(creationOptions), default(CancellationToken), creationOptions, internalOptions, null)
    {
        Contract.Requires(function is Func<Task<TAntecedentResult>, TResult> || function is Func<Task<TAntecedentResult>, object, TResult>,
            "Invalid delegate type in ContinuationResultTaskFromResultTask");
        m_antecedent = antecedent;
        PossiblyCaptureContext(ref stackMark);
    }

    /// <summary>
    /// Evaluates the continuation function over the antecedent task (and the state object for
    /// the two-argument delegate form) and stores the value it returns in m_result.
    /// </summary>
    internal override void InnerInvoke()
    {
        // Get and null out the antecedent. This is crucial to avoid a memory
        // leak with long chains of continuations.
        var antecedent = m_antecedent;
        Contract.Assert(antecedent != null,
            "No antecedent was set for the ContinuationResultTaskFromResultTask.");
        m_antecedent = null;

        // Notify the debugger we're completing an asynchronous wait on a task
        antecedent.NotifyDebuggerOfWaitCompletionIfNecessary();

        // Invoke the delegate, trying the stateless form first.
        Contract.Assert(m_action != null);
        var func = m_action as Func<Task<TAntecedentResult>, TResult>;
        if (func != null)
        {
            m_result = func(antecedent);
            return;
        }

        var funcWithState = m_action as Func<Task<TAntecedentResult>, object, TResult>;
        if (funcWithState != null)
        {
            m_result = funcWithState(antecedent, m_stateObject);
            return;
        }

        // The constructor contract only admits the two delegate shapes handled above.
        Contract.Assert(false, "Invalid m_action in ContinuationResultTaskFromResultTask");
    }
}
// For performance reasons, we don't just have a single way of representing
// a continuation object. Rather, we have a hierarchy of types:
// - TaskContinuation: abstract base that provides a virtual Run method
// - StandardTaskContinuation: wraps a task,options,and scheduler, and overrides Run to process the task with that configuration
// - AwaitTaskContinuation: base for continuations created through TaskAwaiter; targets default scheduler by default
// - TaskSchedulerAwaitTaskContinuation: awaiting with a non-default TaskScheduler
// - SynchronizationContextAwaitTaskContinuation: awaiting with a "current" sync ctx
/// <summary>Represents a continuation.</summary>
internal abstract class TaskContinuation
{
    /// <summary>Inlines or schedules the continuation.</summary>
    /// <param name="completedTask">The antecedent task that has completed.</param>
    /// <param name="bCanInlineContinuationTask">true if inlining is permitted; otherwise, false.</param>
    internal abstract void Run(Task completedTask, bool bCanInlineContinuationTask);

    /// <summary>Tries to run the task on the current thread, if possible; otherwise, schedules it.</summary>
    /// <param name="task">The task to run</param>
    /// <param name="needsProtection">
    /// true if we need to protect against multiple threads racing to start/cancel the task; otherwise, false.
    /// </param>
    [SecuritySafeCritical]
    protected static void InlineIfPossibleOrElseQueue(Task task, bool needsProtection)
    {
        Contract.Requires(task != null);
        Contract.Assert(task.m_taskScheduler != null);

        // Flag the task as started. The MarkStarted() route is only required when the task
        // may be canceled, or when another holder of the reference may try to execute it;
        // otherwise the flag can simply be OR'ed in.
        if (!needsProtection)
        {
            task.m_stateFlags |= Task.TASK_STATE_STARTED;
        }
        else if (!task.MarkStarted())
        {
            // Somebody already started or canceled the task; there is nothing left to do.
            return;
        }

        // Prefer running inline; fall back to queueing when the scheduler declines.
        try
        {
            bool ranInline = task.m_taskScheduler.TryRunInline(task, taskWasPreviouslyQueued: false);
            if (!ranInline)
            {
                task.m_taskScheduler.InternalQueueTask(task);
            }
        }
        catch (Exception schedulingError)
        {
            // TryRunInline() or QueueTask() threw. Normally the failure is recorded and the task
            // is marked Faulted. The one exception is a ThreadAbortException raised out of
            // TryRunInline, which Task.Execute() has already dealt with; the state-flag test
            // makes sure TAEs coming from QueueTask still get wrapped in a TSE.
            bool abortAlreadyHandled =
                schedulingError is ThreadAbortException &&
                (task.m_stateFlags & Task.TASK_STATE_THREAD_WAS_ABORTED) != 0;

            if (!abortAlreadyHandled)
            {
                task.AddException(new TaskSchedulerException(schedulingError));
                task.Finish(false);
            }
            // Deliberately not re-thrown.
        }
    }

    /// <summary>Returns the continuation delegates for use by the debugger.</summary>
    internal abstract Delegate[] GetDelegateContinuationsForDebugger();
}
/// <summary>Provides the standard implementation of a task continuation.</summary>
internal class StandardTaskContinuation : TaskContinuation
{
    /// <summary>The unstarted continuation task.</summary>
    internal readonly Task m_task;
    /// <summary>The options to use with the continuation task.</summary>
    internal readonly TaskContinuationOptions m_options;
    /// <summary>The task scheduler with which to run the continuation task.</summary>
    private readonly TaskScheduler m_taskScheduler;

    /// <summary>Initializes a new continuation.</summary>
    /// <param name="task">The task to be activated.</param>
    /// <param name="options">The continuation options.</param>
    /// <param name="scheduler">The scheduler to use for the continuation.</param>
    internal StandardTaskContinuation(Task task, TaskContinuationOptions options, TaskScheduler scheduler)
    {
        Contract.Requires(task != null, "TaskContinuation ctor: task is null");
        Contract.Requires(scheduler != null, "TaskContinuation ctor: scheduler is null");

        m_task = task;
        m_options = options;
        m_taskScheduler = scheduler;

        // Report creation of the continuation to the causality tracer when tracing is on.
        if (AsyncCausalityTracer.LoggingOn)
        {
            AsyncCausalityTracer.TraceOperationCreation(CausalityTraceLevel.Required, m_task.Id, "Task.ContinueWith: " + ((Delegate)task.m_action).Method.Name, 0);
        }

        // Register the task with the active-task table when async debugging is enabled.
        if (Task.s_asyncDebuggingEnabled)
        {
            Task.AddToActiveTasks(m_task);
        }
    }

    /// <summary>Invokes the continuation for the target completion task.</summary>
    /// <param name="completedTask">The completed task.</param>
    /// <param name="bCanInlineContinuationTask">Whether the continuation can be inlined.</param>
    internal override void Run(Task completedTask, bool bCanInlineContinuationTask)
    {
        Contract.Assert(completedTask != null);
        Contract.Assert(completedTask.IsCompleted, "ContinuationTask.Run(): completedTask not completed");

        // Work out which "NotOn*" flag would veto this continuation for the way the
        // antecedent actually finished.
        TaskContinuationOptions options = m_options;
        TaskContinuationOptions vetoFlag;
        if (completedTask.IsRanToCompletion)
        {
            vetoFlag = TaskContinuationOptions.NotOnRanToCompletion;
        }
        else if (completedTask.IsCanceled)
        {
            vetoFlag = TaskContinuationOptions.NotOnCanceled;
        }
        else
        {
            vetoFlag = TaskContinuationOptions.NotOnFaulted;
        }

        Task continuationTask = m_task;

        // The antecedent's final state does not satisfy the activation criteria:
        // cancel the continuation to denote this.
        if ((options & vetoFlag) != 0)
        {
            continuationTask.InternalCancel(false);
            return;
        }

        // A continuation that was canceled before running (e.g. a ContinueWhenAll with a
        // canceled cancellation token) still flows to ScheduleAndStart(), where its status is
        // checked before running. It is excluded here so the log never contains a join event
        // for an operation that was already reported as completed.
        if (!continuationTask.IsCanceled && AsyncCausalityTracer.LoggingOn)
        {
            // At this point we know the continuation is going to be run, so log it.
            AsyncCausalityTracer.TraceOperationRelation(CausalityTraceLevel.Important, continuationTask.Id, CausalityRelation.AssignDelegate);
        }

        continuationTask.m_taskScheduler = m_taskScheduler;

        // Synchronous execution needs both the caller's permission to inline and an explicit
        // ExecuteSynchronously request from whoever created the continuation.
        bool runSynchronously =
            bCanInlineContinuationTask &&
            (options & TaskContinuationOptions.ExecuteSynchronously) != 0;

        if (!runSynchronously)
        {
            try
            {
                continuationTask.ScheduleAndStart(needsProtection: true);
            }
            catch (TaskSchedulerException)
            {
                // ScheduleAndStart() has already moved the task to faulted, so nothing more is
                // needed -- we only make sure the exception does not escape from here.
            }
            return;
        }

        InlineIfPossibleOrElseQueue(continuationTask, needsProtection: true);
    }

    /// <summary>
    /// Returns the continuation task's own delegate when it has one; otherwise defers to the
    /// continuation task's delegate continuations.
    /// </summary>
    internal override Delegate[] GetDelegateContinuationsForDebugger()
    {
        if (m_task.m_action != null)
        {
            return new Delegate[] { m_task.m_action as Delegate };
        }

        return m_task.GetDelegateContinuationsForDebugger();
    }
}
/// <summary>Task continuation for awaiting with a current synchronization context.</summary>
internal sealed class SynchronizationContextAwaitTaskContinuation : AwaitTaskContinuation
{
    /// <summary>SendOrPostCallback delegate to invoke the action.</summary>
    private readonly static SendOrPostCallback s_postCallback = postedState => ((Action)postedState)(); // can't use InvokeAction as it's SecurityCritical
    /// <summary>Cached delegate for PostAction</summary>
    [SecurityCritical]
    private static ContextCallback s_postActionCallback;
    /// <summary>The context with which to run the action.</summary>
    private readonly SynchronizationContext m_syncContext;

    /// <summary>Initializes the SynchronizationContextAwaitTaskContinuation.</summary>
    /// <param name="context">The synchronization context with which to invoke the action.  Must not be null.</param>
    /// <param name="action">The action to invoke. Must not be null.</param>
    /// <param name="flowExecutionContext">Whether to capture and restore ExecutionContext.</param>
    /// <param name="stackMark">The captured stack mark.</param>
    [SecurityCritical]
    internal SynchronizationContextAwaitTaskContinuation(
        SynchronizationContext context, Action action, bool flowExecutionContext, ref StackCrawlMark stackMark) :
        base(action, flowExecutionContext, ref stackMark)
    {
        Contract.Assert(context != null);
        m_syncContext = context;
    }

    /// <summary>Inlines or schedules the continuation.</summary>
    /// <param name="task">The antecedent task; only consulted for ETW logging.</param>
    /// <param name="canInlineContinuationTask">true if inlining is permitted; otherwise, false.</param>
    [SecuritySafeCritical]
    internal sealed override void Run(Task task, bool canInlineContinuationTask)
    {
        // Inline only when the caller allows it and this thread's current
        // synchronization context is the target one.
        bool runOnThisThread =
            canInlineContinuationTask &&
            m_syncContext == SynchronizationContext.CurrentNoFlow;

        if (runOnThisThread)
        {
            RunCallback(GetInvokeActionCallback(), m_action, ref Task.t_currentTask);
            return;
        }

        // Otherwise the action is posted back to the SynchronizationContext.
#if !MONO
        TplEtwProvider etwLog = TplEtwProvider.Log;
        if (etwLog.IsEnabled())
        {
            m_continuationId = Task.NewId();
            etwLog.AwaitTaskContinuationScheduled((task.ExecutingTaskScheduler ?? TaskScheduler.Default).Id, task.Id, m_continuationId);
        }
#endif
        RunCallback(GetPostActionCallback(), this, ref Task.t_currentTask);
        // Any exceptions will be handled by RunCallback.
    }

    /// <summary>Posts the continuation's action to its synchronization context.</summary>
    /// <param name="state">The SynchronizationContextAwaitTaskContinuation.</param>
    [SecurityCritical]
    private static void PostAction(object state)
    {
        var continuation = (SynchronizationContextAwaitTaskContinuation)state;
        Action actionToPost = continuation.m_action;
#if !MONO
        // When activity IDs are being tracked and this continuation was given an ID,
        // post a wrapper that sets the activity ID around the action.
        if (TplEtwProvider.Log.TasksSetActivityIds && continuation.m_continuationId != 0)
        {
            actionToPost = GetActionLogDelegate(continuation.m_continuationId, continuation.m_action);
        }
#endif
        // s_postCallback is manually cached, as the compiler won't in a SecurityCritical method
        continuation.m_syncContext.Post(s_postCallback, actionToPost);
    }

#if !MONO
    /// <summary>
    /// Wraps an action so that the thread's activity ID is set to the GUID derived from the
    /// continuation ID while the action runs, and put back to the saved value afterwards.
    /// </summary>
    /// <param name="continuationId">The ID from which the activity GUID is created.</param>
    /// <param name="action">The action to wrap.</param>
    /// <returns>The wrapping action.</returns>
    private static Action GetActionLogDelegate(int continuationId, Action action)
    {
        return () =>
        {
            Guid previousActivityId;
            Guid continuationActivityId = TplEtwProvider.CreateGuidForTaskID(continuationId);
            System.Diagnostics.Tracing.EventSource.SetCurrentThreadActivityId(continuationActivityId, out previousActivityId);
            try
            {
                action();
            }
            finally
            {
                System.Diagnostics.Tracing.EventSource.SetCurrentThreadActivityId(previousActivityId);
            }
        };
    }
#endif

    /// <summary>Gets a cached delegate for the PostAction method.</summary>
    /// <returns>
    /// A delegate for PostAction, which expects a SynchronizationContextAwaitTaskContinuation
    /// to be passed as state.
    /// </returns>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    [SecurityCritical]
    private static ContextCallback GetPostActionCallback()
    {
        // lazily initialize SecurityCritical delegate
        return s_postActionCallback ?? (s_postActionCallback = PostAction);
    }
}
/// <summary>Task continuation for awaiting with a task scheduler.</summary>
internal sealed class TaskSchedulerAwaitTaskContinuation : AwaitTaskContinuation
{
    /// <summary>The scheduler on which to run the action.</summary>
    private readonly TaskScheduler m_scheduler;

    /// <summary>Initializes the TaskSchedulerAwaitTaskContinuation.</summary>
    /// <param name="scheduler">The task scheduler with which to invoke the action.  Must not be null.</param>
    /// <param name="action">The action to invoke. Must not be null.</param>
    /// <param name="flowExecutionContext">Whether to capture and restore ExecutionContext.</param>
    /// <param name="stackMark">The captured stack mark.</param>
    [SecurityCritical]
    internal TaskSchedulerAwaitTaskContinuation(
        TaskScheduler scheduler, Action action, bool flowExecutionContext, ref StackCrawlMark stackMark) :
        base(action, flowExecutionContext, ref stackMark)
    {
        Contract.Assert(scheduler != null);
        m_scheduler = scheduler;
    }

    /// <summary>Inlines or schedules the continuation.</summary>
    /// <param name="ignored">The antecedent task, which is ignored.</param>
    /// <param name="canInlineContinuationTask">true if inlining is permitted; otherwise, false.</param>
    internal sealed override void Run(Task ignored, bool canInlineContinuationTask)
    {
        // Targeting the default scheduler lets us take the faster path in the base class.
        if (m_scheduler == TaskScheduler.Default)
        {
            base.Run(ignored, canInlineContinuationTask);
            return;
        }

        // Inlining requires the caller's permission plus one of:
        //  - we are already on the target scheduler (which will then be asked whether
        //    running here is acceptable), or
        //  - we are on a thread pool thread (where running arbitrary code is fine).
        // The IsThreadPoolThread check appears here but not in AwaitTaskContinuation.Run
        // because here it widens what is allowed, whereas there it would narrow it.
        bool inlineIfPossible = false;
        if (canInlineContinuationTask)
        {
            inlineIfPossible =
                TaskScheduler.InternalCurrent == m_scheduler ||
                Thread.CurrentThread.IsThreadPoolThread;
        }

        // Build the continuation task. Even when we try to inline it, the target scheduler
        // may refuse to execute on this thread, in which case the task is queued instead.
        var continuationTask = CreateTask(boxedAction =>
        {
            try
            {
                ((Action)boxedAction)();
            }
            catch (Exception error)
            {
                ThrowAsyncIfNecessary(error);
            }
        }, m_action, m_scheduler);

        if (!inlineIfPossible)
        {
            // Asynchronous execution is required, so just schedule the task.
            try
            {
                continuationTask.ScheduleAndStart(needsProtection: false);
            }
            catch (TaskSchedulerException)
            {
                // No further action is necessary, as ScheduleAndStart already transitioned task to faulted
            }
            return;
        }

        InlineIfPossibleOrElseQueue(continuationTask, needsProtection: false);
    }
}
/// Base task continuation class used for await continuations.
internal class AwaitTaskContinuation : TaskContinuation, IThreadPoolWorkItem
{
/// The ExecutionContext with which to run the continuation.
private readonly ExecutionContext m_capturedContext;
/// The action to invoke.
protected readonly Action m_action;
#if !MONO
protected int m_continuationId;
#endif
/// Initializes the continuation.
/// The action to invoke. Must not be null.
/// Whether to capture and restore ExecutionContext.
/// The captured stack mark with which to construct an ExecutionContext.
[SecurityCritical]
internal AwaitTaskContinuation(Action action, bool flowExecutionContext, ref StackCrawlMark stackMark)
{
Contract.Requires(action != null);
m_action = action;
if (flowExecutionContext)
{
m_capturedContext = ExecutionContext.Capture(
ref stackMark,
ExecutionContext.CaptureOptions.IgnoreSyncCtx | ExecutionContext.CaptureOptions.OptimizeDefaultCase);
}
}
/// Initializes the continuation.
/// The action to invoke. Must not be null.
/// Whether to capture and restore ExecutionContext.
[SecurityCritical]
internal AwaitTaskContinuation(Action action, bool flowExecutionContext)
{
Contract.Requires(action != null);
m_action = action;
if (flowExecutionContext)
{
m_capturedContext = ExecutionContext.FastCapture();
}
}
/// Creates a task to run the action with the specified state on the specified scheduler.
/// The action to run. Must not be null.
/// The state to pass to the action. Must not be null.
/// The scheduler to target.
/// The created task.
protected Task CreateTask(Action