//------------------------------------------------------------------------------ // // Copyright (c) Microsoft Corporation. All rights reserved. // //------------------------------------------------------------------------------ namespace System.Web.Util { using System; using System.Runtime.ExceptionServices; using System.Threading; using System.Threading.Tasks; // This class is used by the AspNetSynchronizationContext to assist with scheduling tasks in a non-blocking fashion. // Asynchronous work will be queued and will execute sequentially, never consuming more than a single thread at a time. // Synchronous work will block and will execute on the current thread. internal sealed class SynchronizationHelper { private Task _completionTask; // the Task that will run when all in-flight operations have completed private Thread _currentThread; // the Thread that's running the current Task; all threads must see the same value for this field private Task _lastScheduledTask = CreateInitialTask(); // the last Task that was queued to this helper, used to hook future Tasks (not volatile since always accessed under lock) private Task _lastScheduledTaskAsync = CreateInitialTask(); // the last async Task that was queued to this helper private readonly object _lockObj = new object(); // synchronizes access to _lastScheduledTask private int _operationsInFlight; // operation counter private readonly ISyncContext _syncContext; // a context that wraps an operation with pre- and post-execution phases private readonly Action _appVerifierCallback; // for making sure that developers don't try calling us after the request has completed public SynchronizationHelper(ISyncContext syncContext) { _syncContext = syncContext; _appVerifierCallback = AppVerifier.GetSyncContextCheckDelegate(syncContext); } // If an operation results in an exception, this property will provide access to it. public ExceptionDispatchInfo Error { get; set; } // Helper to access the _currentThread field in a thread-safe fashion. // It is not enough to mark the _currentThread field volatile, since that only guarantees // read / write ordering and doesn't ensure that each thread sees the same value. private Thread CurrentThread { get { return Interlocked.CompareExchange(ref _currentThread, null, null); } set { Interlocked.Exchange(ref _currentThread, value); } } // Returns the number of pending operations public int PendingCount { get { return ChangeOperationCount(0); } } public int ChangeOperationCount(int addend) { int newOperationCount = Interlocked.Add(ref _operationsInFlight, addend); if (newOperationCount == 0) { // if an asynchronous completion operation is queued, run it Task completionTask = Interlocked.Exchange(ref _completionTask, null); if (completionTask != null) { completionTask.Start(); } } return newOperationCount; } private void CheckForRequestStateIfRequired(bool checkForReEntry) { if (_appVerifierCallback != null) { _appVerifierCallback(checkForReEntry); } } // Creates the initial hook that future operations can ride off of private static Task CreateInitialTask() { return Task.FromResult(null); } // Takes control of this SynchronizationHelper instance synchronously. Asynchronous operations // will be queued but will not be dispatched until control is released (by disposing of the // returned object). This operation might block if a different thread is currently in // control of the context. public IDisposable EnterSynchronousControl() { if (CurrentThread == Thread.CurrentThread) { // If the current thread already has control of this context, there's nothing extra to do. return DisposableAction.Empty; } // used to mark the end of the synchronous task TaskCompletionSource tcs = new TaskCompletionSource(); Task lastTask; lock (_lockObj) { lastTask = _lastScheduledTask; _lastScheduledTask = tcs.Task; // future work can be scheduled off this Task } // The original task may end up Faulted, which would make its Wait() method throw an exception. // To avoid this, we instead wait on a continuation which is always guaranteed to complete successfully. if (!lastTask.IsCompleted) { lastTask.ContinueWith(_ => { }, TaskContinuationOptions.ExecuteSynchronously).Wait(); } CurrentThread = Thread.CurrentThread; // synchronous control is released by marking the Task as complete return new DisposableAction(() => { CurrentThread = null; tcs.TrySetResult(null); }); } public void QueueAsynchronous(Action action) { CheckForRequestStateIfRequired(checkForReEntry: true); ChangeOperationCount(+1); // This method only schedules work; it doesn't itself do any work. The lock is held for a very // short period of time. lock (_lockObj) { Task newTask = _lastScheduledTask.ContinueWith(_ => SafeWrapCallback(action), TaskScheduler.Default); _lastScheduledTask = newTask; // the newly-created task is now the last one } } // QueueAsynchronousAsync and SafeWrapCallbackAsync guarantee: // 1. For funcs posted here, it's would first come, first complete. // 2. There is no overlapping execution. public void QueueAsynchronousAsync(Func func, object state) { CheckForRequestStateIfRequired(checkForReEntry: true); ChangeOperationCount(+1); // This method only schedules work; it doesn't itself do any work. The lock is held for a very // short period of time. lock (_lockObj) { // 1. Note that we are chaining newTask with _lastScheduledTaskAsync, not _lastScheduledTask. // Chaining newTask with _lastScheduledTask would cause deadlock. // 2. Unwrap() is necessary to be called here. When chaining multiple tasks using the ContinueWith // method, your return type will be Task whereas T is the return type of the delegate/method // passed to ContinueWith. As the return type of an async delegate is a Task, you will end up with // a Task and end up waiting for the async delegate to return you the Task which is done after // the first await. Task newTask = _lastScheduledTaskAsync.ContinueWith( async _ => { await SafeWrapCallbackAsync(func, state); }).Unwrap(); _lastScheduledTaskAsync = newTask; // the newly-created task is now the last one } } public void QueueSynchronous(Action action) { CheckForRequestStateIfRequired(checkForReEntry: false); if (CurrentThread == Thread.CurrentThread) { // current thread already owns the context, so just execute inline to prevent deadlocks action(); return; } ChangeOperationCount(+1); using (EnterSynchronousControl()) { SafeWrapCallback(action); } } private void SafeWrapCallback(Action action) { // This method will try to catch exceptions so that they don't bubble up to our // callers. However, ThreadAbortExceptions will continue to bubble up. try { CurrentThread = Thread.CurrentThread; ISyncContextLock syncContextLock = null; try { syncContextLock = (_syncContext != null) ? _syncContext.Enter() : null; try { action(); } catch (Exception ex) { Error = ExceptionDispatchInfo.Capture(ex); } } finally { if (syncContextLock != null) { syncContextLock.Leave(); } } } finally { CurrentThread = null; ChangeOperationCount(-1); } } // This method does not run the func by itself. It simply queues the func into the existing // syncContext queue. private async Task SafeWrapCallbackAsync(Func func, object state) { try { TaskCompletionSource tcs = new TaskCompletionSource(); QueueAsynchronous(() => { var t = func(state); t.ContinueWith((_) => { if (t.IsFaulted) { tcs.TrySetException(t.Exception.InnerExceptions); } else if (t.IsCanceled) { tcs.TrySetCanceled(); } else { tcs.TrySetResult(t); } }, TaskContinuationOptions.ExecuteSynchronously); }); await tcs.Task; } catch (Exception ex) { Error = ExceptionDispatchInfo.Capture(ex); } finally { ChangeOperationCount(-1); } } // Sets the continuation that will asynchronously execute when the pending operation counter // hits zero. Returns true if asynchronous execution is expected, false if the operation // counter is already at zero and the caller should run the continuation inline. public bool TrySetCompletionContinuation(Action continuation) { int newOperationCount = ChangeOperationCount(+1); // prevent the operation counter from hitting zero while we're setting the field bool scheduledAsynchronously = (newOperationCount > 1); if (scheduledAsynchronously) { Interlocked.Exchange(ref _completionTask, new Task(continuation)); } ChangeOperationCount(-1); return scheduledAsynchronously; } } }