Merge pull request #2223 from lobrien/master
[mono.git] / mcs / class / System.Threading.Tasks.Dataflow / CoreFxSources / Internal / Common.cs
1 // Copyright (c) Microsoft. All rights reserved.
2 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
3
4 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
5 //
6 // Common.cs
7 //
8 //
9 // Helper routines for the rest of the TPL Dataflow implementation.
10 //
11 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
12
13 using System.Collections.Generic;
14 using System.Diagnostics;
15 using System.Diagnostics.CodeAnalysis;
16 using System.Diagnostics.Contracts;
17 using System.Security;
18 using System.Collections;
19 using System.Runtime.ExceptionServices;
20 using System.Threading.Tasks.Dataflow.Internal.Threading;
21
22 namespace System.Threading.Tasks.Dataflow.Internal
23 {
24     /// <summary>Internal helper utilities.</summary>
25     internal static class Common
26     {
27         /// <summary>
28         /// An invalid ID to assign for reordering purposes.  This value is chosen to be the last of the 64-bit integers that
29         /// could ever be assigned as a reordering ID.
30         /// </summary>
31         internal const long INVALID_REORDERING_ID = -1;
32         /// <summary>A well-known message ID for code that will send exactly one message or 
33         /// where the exact message ID is not important.</summary>
34         internal const int SINGLE_MESSAGE_ID = 1;
35         /// <summary>A perf optimization for caching a well-known message header instead of
36         /// constructing one every time it is needed.</summary>
37         internal static readonly DataflowMessageHeader SingleMessageHeader = new DataflowMessageHeader(SINGLE_MESSAGE_ID);
38         /// <summary>The cached completed Task{bool} with a result of true.</summary>
39         internal static readonly Task<bool> CompletedTaskWithTrueResult = CreateCachedBooleanTask(true);
40         /// <summary>The cached completed Task{bool} with a result of false.</summary>
41         internal static readonly Task<bool> CompletedTaskWithFalseResult = CreateCachedBooleanTask(false);
42         /// <summary>The cached completed TaskCompletionSource{VoidResult}.</summary>
43         internal static readonly TaskCompletionSource<VoidResult> CompletedVoidResultTaskCompletionSource = CreateCachedTaskCompletionSource<VoidResult>();
44
45         /// <summary>Asserts that a given synchronization object is either held or not held.</summary>
46         /// <param name="syncObj">The monitor to check.</param>
47         /// <param name="held">Whether we want to assert that it's currently held or not held.</param>
48         [Conditional("DEBUG")]
49         internal static void ContractAssertMonitorStatus(object syncObj, bool held)
50         {
51             Contract.Requires(syncObj != null, "The monitor object to check must be provided.");
52             Debug.Assert(Monitor.IsEntered(syncObj) == held, "The locking scheme was not correctly followed.");
53         }
54
55         /// <summary>Keeping alive processing tasks: maximum number of processed messages.</summary>
56         internal const int KEEP_ALIVE_NUMBER_OF_MESSAGES_THRESHOLD = 1;
57         /// <summary>Keeping alive processing tasks: do not attempt this many times.</summary>
58         internal const int KEEP_ALIVE_BAN_COUNT = 1000;
59
60         /// <summary>A predicate type for TryKeepAliveUntil.</summary>
61         /// <param name="stateIn">Input state for the predicate in order to avoid closure allocations.</param>
62         /// <param name="stateOut">Output state for the predicate in order to avoid closure allocations.</param>
63         /// <returns>The state of the predicate.</returns>
64         internal delegate bool KeepAlivePredicate<TStateIn, TStateOut>(TStateIn stateIn, out TStateOut stateOut);
65
66         /// <summary>Actively waits for a predicate to become true.</summary>
67         /// <param name="predicate">The predicate to become true.</param>
68         /// <param name="stateIn">Input state for the predicate in order to avoid closure allocations.</param>
69         /// <param name="stateOut">Output state for the predicate in order to avoid closure allocations.</param>
70         /// <returns>True if the predicate was evaluated and it returned true. False otherwise.</returns>
71         internal static bool TryKeepAliveUntil<TStateIn, TStateOut>(KeepAlivePredicate<TStateIn, TStateOut> predicate,
72                                                                     TStateIn stateIn, out TStateOut stateOut)
73         {
74             Contract.Requires(predicate != null, "Non-null predicate to execute is required.");
75             const int ITERATION_LIMIT = 16;
76
77             for (int c = ITERATION_LIMIT; c > 0; c--)
78             {
79                 if (!Thread.Yield())
80                 {
81                     // There was no other thread waiting. 
82                     // We may spend some more cycles to evaluate the predicate. 
83                     if (predicate(stateIn, out stateOut)) return true;
84                 }
85             }
86
87             stateOut = default(TStateOut);
88             return false;
89         }
90
91         /// <summary>Unwraps an instance T from object state that is a WeakReference to that instance.</summary>
92         /// <typeparam name="T">The type of the data to be unwrapped.</typeparam>
93         /// <param name="state">The weak reference.</param>
94         /// <returns>The T instance.</returns>
95         internal static T UnwrapWeakReference<T>(object state) where T : class
96         {
97             var wr = state as WeakReference<T>;
98             Debug.Assert(wr != null, "Expected a WeakReference<T> as the state argument");
99             T item;
100             return wr.TryGetTarget(out item) ? item : null;
101         }
102
103         /// <summary>Gets an ID for the dataflow block.</summary>
104         /// <param name="block">The dataflow block.</param>
105         /// <returns>An ID for the dataflow block.</returns>
106         internal static int GetBlockId(IDataflowBlock block)
107         {
108             Contract.Requires(block != null, "Block required to extract an Id.");
109             const int NOTASKID = 0; // tasks don't have 0 as ids
110             Task t = Common.GetPotentiallyNotSupportedCompletionTask(block);
111             return t != null ? t.Id : NOTASKID;
112         }
113
114         /// <summary>Gets the name for the specified block, suitable to be rendered in a debugger window.</summary>
115         /// <param name="block">The block for which a name is needed.</param>
116         /// <param name="options">
117         /// The options to use when rendering the name. If no options are provided, the block's name is used directly.
118         /// </param>
119         /// <returns>The name of the object.</returns>
120         /// <remarks>This is used from DebuggerDisplay attributes.</remarks>
121         [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
122         [SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
123         internal static string GetNameForDebugger(
124             IDataflowBlock block, DataflowBlockOptions options = null)
125         {
126             Contract.Requires(block != null, "Should only be used with valid objects being displayed in the debugger.");
127             Contract.Requires(options == null || options.NameFormat != null, "If options are provided, NameFormat must be valid.");
128
129             if (block == null) return string.Empty;
130
131             string blockName = block.GetType().Name;
132             if (options == null) return blockName;
133
134             // {0} == block name
135             // {1} == block id
136             int blockId = GetBlockId(block);
137
138             // Since NameFormat is public, formatting may throw if the user has set
139             // a string that contains a reference to an argument higher than {1}.
140             // In the case of an exception, show the exception message.
141             try
142             {
143                 return string.Format(options.NameFormat, blockName, blockId);
144             }
145             catch (Exception exception)
146             {
147                 return exception.Message;
148             }
149         }
150
151         /// <summary>
152         /// Gets whether the exception represents a cooperative cancellation acknowledgment.
153         /// </summary>
154         /// <param name="exception">The exception to check.</param>
155         /// <returns>true if this exception represents a cooperative cancellation acknowledgment; otherwise, false.</returns>
156         internal static bool IsCooperativeCancellation(Exception exception)
157         {
158             Contract.Requires(exception != null, "An exception to check for cancellation must be provided.");
159             return exception is OperationCanceledException;
160             // Note that the behavior of this method does not exactly match that of Parallel.*, PLINQ, and Task.Factory.StartNew,
161             // in that it's more liberal and treats any OCE as acknowledgment of cancellation; in contrast, the other
162             // libraries only treat OCEs as such if they contain the same token that was provided by the user
163             // and if that token has cancellation requested.  Such logic could be achieved here with:
164             //   var oce = exception as OperationCanceledException;
165             //   return oce != null && 
166             //          oce.CancellationToken == dataflowBlockOptions.CancellationToken && 
167             //          oce.CancellationToken.IsCancellationRequested;
168             // However, that leads to a discrepancy with the async processing case of dataflow blocks,
169             // where tasks are returned to represent the message processing, potentially in the Canceled state, 
170             // and we simply ignore such tasks.  Further, for blocks like TransformBlock, it's useful to be able 
171             // to cancel an individual operation which must return a TOutput value, simply by throwing an OperationCanceledException.
172             // In such cases, you wouldn't want cancellation tied to the token, because you would only be able to
173             // cancel an individual message processing if the whole block was canceled.
174         }
175
176         /// <summary>Registers a block for cancellation by completing when cancellation is requested.</summary>
177         /// <param name="cancellationToken">The block's cancellation token.</param>
178         /// <param name="completionTask">The task that will complete when the block is completely done processing.</param>
179         /// <param name="completeAction">An action that will decline permanently on the state passed to it.</param>
180         /// <param name="completeState">The block on which to decline permanently.</param>
181         internal static void WireCancellationToComplete(
182             CancellationToken cancellationToken, Task completionTask, Action<object> completeAction, object completeState)
183         {
184             Contract.Requires(completionTask != null, "A task to wire up for completion is needed.");
185             Contract.Requires(completeAction != null, "An action to invoke upon cancellation is required.");
186
187             // If a cancellation request has already occurred, just invoke the declining action synchronously.
188             // CancellationToken would do this anyway but we can short-circuit it further and avoid a bunch of unnecessary checks.
189             if (cancellationToken.IsCancellationRequested)
190             {
191                 completeAction(completeState);
192             }
193             // Otherwise, if a cancellation request occurs, we want to prevent the block from accepting additional
194             // data, and we also want to dispose of that registration when we complete so that we don't
195             // leak into a long-living cancellation token.
196             else if (cancellationToken.CanBeCanceled)
197             {
198                 CancellationTokenRegistration reg = cancellationToken.Register(completeAction, completeState);
199                 completionTask.ContinueWith((completed, state) => ((CancellationTokenRegistration)state).Dispose(),
200                     reg, cancellationToken, Common.GetContinuationOptions(), TaskScheduler.Default);
201             }
202         }
203
204         /// <summary>Initializes the stack trace and watson bucket of an inactive exception.</summary>
205         /// <param name="exception">The exception to initialize.</param>
206         /// <returns>The initialized exception.</returns>
207         [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
208         internal static Exception InitializeStackTrace(Exception exception)
209         {
210             Contract.Requires(exception != null && exception.StackTrace == null,
211                 "A valid but uninitialized exception should be provided.");
212             try { throw exception; }
213             catch { return exception; }
214         }
215
216         /// <summary>The name of the key in an Exception's Data collection used to store information on a dataflow message.</summary>
217         internal const string EXCEPTIONDATAKEY_DATAFLOWMESSAGEVALUE = "DataflowMessageValue"; // should not be localized
218
219         /// <summary>Stores details on a dataflow message into an Exception's Data collection.</summary>
220         /// <typeparam name="T">Specifies the type of data stored in the message.</typeparam>
221         /// <param name="exc">The Exception whose Data collection should store message information.</param>
222         /// <param name="messageValue">The message information to be stored.</param>
223         /// <param name="targetInnerExceptions">Whether to store the data into the exception's inner exception(s) in addition to the exception itself.</param>
224         [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
225         internal static void StoreDataflowMessageValueIntoExceptionData<T>(Exception exc, T messageValue, bool targetInnerExceptions = false)
226         {
227             Contract.Requires(exc != null, "The exception into which data should be stored must be provided.");
228
229             // Get the string value to store
230             string strValue = messageValue as string;
231             if (strValue == null && messageValue != null)
232             {
233                 try
234                 {
235                     strValue = messageValue.ToString();
236                 }
237                 catch { /* It's ok to eat all exceptions here.  If ToString throws, we'll just ignore it. */ }
238             }
239             if (strValue == null) return;
240
241             // Store the data into the exception itself
242             StoreStringIntoExceptionData(exc, Common.EXCEPTIONDATAKEY_DATAFLOWMESSAGEVALUE, strValue);
243
244             // If we also want to target inner exceptions...
245             if (targetInnerExceptions)
246             {
247                 // If this is an aggregate, store into all inner exceptions.
248                 var aggregate = exc as AggregateException;
249                 if (aggregate != null)
250                 {
251                     foreach (Exception innerException in aggregate.InnerExceptions)
252                     {
253                         StoreStringIntoExceptionData(innerException, Common.EXCEPTIONDATAKEY_DATAFLOWMESSAGEVALUE, strValue);
254                     }
255                 }
256                 // Otherwise, if there's an Exception.InnerException, store into that.
257                 else if (exc.InnerException != null)
258                 {
259                     StoreStringIntoExceptionData(exc.InnerException, Common.EXCEPTIONDATAKEY_DATAFLOWMESSAGEVALUE, strValue);
260                 }
261             }
262         }
263
264         /// <summary>Stores the specified string value into the specified key slot of the specified exception's data dictionary.</summary>
265         /// <param name="exception">The exception into which the key/value should be stored.</param>
266         /// <param name="key">The key.</param>
267         /// <param name="value">The value to be serialized as a string and stored.</param>
268         /// <remarks>If the key is already present in the exception's data dictionary, the value is not overwritten.</remarks>
269         [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
270         private static void StoreStringIntoExceptionData(Exception exception, string key, string value)
271         {
272             Contract.Requires(exception != null, "An exception is needed to store the data into.");
273             Contract.Requires(key != null, "A key into the exception's data collection is needed.");
274             Contract.Requires(value != null, "The value to store must be provided.");
275             try
276             {
277                 IDictionary data = exception.Data;
278                 if (data != null && !data.IsFixedSize && !data.IsReadOnly && data[key] == null)
279                 {
280                     data[key] = value;
281                 }
282             }
283             catch
284             {
285                 // It's ok to eat all exceptions here.  This could throw if an Exception type 
286                 // has overridden Data to behave differently than we expect.
287             }
288         }
289
290         /// <summary>Throws an exception asynchronously on the thread pool.</summary>
291         /// <param name="error">The exception to throw.</param>
292         /// <remarks>
293         /// This function is used when an exception needs to be propagated from a thread
294         /// other than the current context.  This could happen, for example, if the exception
295         /// should cause the standard CLR exception escalation behavior, but we're inside
296         /// of a task that will squirrel the exception away.
297         /// </remarks>
298         internal static void ThrowAsync(Exception error)
299         {
300             ExceptionDispatchInfo edi = ExceptionDispatchInfo.Capture(error);
301             ThreadPool.QueueUserWorkItem(state => { ((ExceptionDispatchInfo)state).Throw(); }, edi);
302         }
303
304         /// <summary>Adds the exception to the list, first initializing the list if the list is null.</summary>
305         /// <param name="list">The list to add the exception to, and initialize if null.</param>
306         /// <param name="exception">The exception to add or whose inner exception(s) should be added.</param>
307         /// <param name="unwrapInnerExceptions">Unwrap and add the inner exception(s) rather than the specified exception directly.</param>
308         /// <remarks>This method is not thread-safe, in that it manipulates <paramref name="list"/> without any synchronization.</remarks>
309         internal static void AddException(ref List<Exception> list, Exception exception, bool unwrapInnerExceptions = false)
310         {
311             Contract.Requires(exception != null, "An exception to add is required.");
312             Contract.Requires(!unwrapInnerExceptions || exception.InnerException != null,
313                 "If unwrapping is requested, an inner exception is required.");
314
315             // Make sure the list of exceptions is initialized (lazily).
316             if (list == null) list = new List<Exception>();
317
318             if (unwrapInnerExceptions)
319             {
320                 AggregateException aggregate = exception as AggregateException;
321                 if (aggregate != null)
322                 {
323                     list.AddRange(aggregate.InnerExceptions);
324                 }
325                 else
326                 {
327                     list.Add(exception.InnerException);
328                 }
329             }
330             else list.Add(exception);
331         }
332
333         /// <summary>Creates a task we can cache for the desired Boolean result.</summary>
334         /// <param name="value">The value of the Boolean.</param>
335         /// <returns>A task that may be cached.</returns>
336         private static Task<Boolean> CreateCachedBooleanTask(bool value)
337         {
338             // AsyncTaskMethodBuilder<Boolean> caches tasks that are non-disposable.
339             // By using these same tasks, we're a bit more robust against disposals,
340             // in that such a disposed task's ((IAsyncResult)task).AsyncWaitHandle
341             // is still valid.
342             var atmb = System.Runtime.CompilerServices.AsyncTaskMethodBuilder<Boolean>.Create();
343             atmb.SetResult(value);
344             return atmb.Task; // must be accessed after SetResult to get the cached task
345         }
346
347         /// <summary>Creates a TaskCompletionSource{T} completed with a value of default(T) that we can cache.</summary>
348         /// <returns>Completed TaskCompletionSource{T} that may be cached.</returns>
349         private static TaskCompletionSource<T> CreateCachedTaskCompletionSource<T>()
350         {
351             var tcs = new TaskCompletionSource<T>();
352             tcs.SetResult(default(T));
353             return tcs;
354         }
355
356         /// <summary>Creates a task faulted with the specified exception.</summary>
357         /// <typeparam name="TResult">Specifies the type of the result for this task.</typeparam>
358         /// <param name="exception">The exception with which to complete the task.</param>
359         /// <returns>The faulted task.</returns>
360         internal static Task<TResult> CreateTaskFromException<TResult>(Exception exception)
361         {
362             var atmb = System.Runtime.CompilerServices.AsyncTaskMethodBuilder<TResult>.Create();
363             atmb.SetException(exception);
364             return atmb.Task;
365         }
366
367         /// <summary>Creates a task canceled with the specified cancellation token.</summary>
368         /// <typeparam name="TResult">Specifies the type of the result for this task.</typeparam>
369         /// <returns>The canceled task.</returns>
370         [SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")]
371         internal static Task<TResult> CreateTaskFromCancellation<TResult>(CancellationToken cancellationToken)
372         {
373             Contract.Requires(cancellationToken.IsCancellationRequested,
374                 "The task will only be immediately canceled if the token has cancellation requested already.");
375             var t = new Task<TResult>(CachedGenericDelegates<TResult>.DefaultTResultFunc, cancellationToken);
376             Debug.Assert(t.IsCanceled, "Task's constructor should cancel the task synchronously in the ctor.");
377             return t;
378         }
379
380         /// <summary>Gets the completion task of a block, and protects against common cases of the completion task not being implemented or supported.</summary>
381         /// <param name="block">The block.</param>
382         /// <returns>The completion task, or null if the block's completion task is not implemented or supported.</returns>
383         internal static Task GetPotentiallyNotSupportedCompletionTask(IDataflowBlock block)
384         {
385             Contract.Requires(block != null, "We need a block from which to retrieve a cancellation task.");
386             try
387             {
388                 return block.Completion;
389             }
390             catch (NotImplementedException) { }
391             catch (NotSupportedException) { }
392             return null;
393         }
394
395         /// <summary>
396         /// Creates an IDisposable that, when disposed, will acquire the outgoing lock while removing 
397         /// the target block from the target registry.
398         /// </summary>
399         /// <typeparam name="TOutput">Specifies the type of data in the block.</typeparam>
400         /// <param name="outgoingLock">The outgoing lock used to protect the target registry.</param>
401         /// <param name="targetRegistry">The target registry from which the target should be removed.</param>
402         /// <param name="targetBlock">The target to remove from the registry.</param>
403         /// <returns>An IDisposable that will unregister the target block from the registry while holding the outgoing lock.</returns>
404         internal static IDisposable CreateUnlinker<TOutput>(object outgoingLock, TargetRegistry<TOutput> targetRegistry, ITargetBlock<TOutput> targetBlock)
405         {
406             Contract.Requires(outgoingLock != null, "Monitor object needed to protect the operation.");
407             Contract.Requires(targetRegistry != null, "Registry from which to remove is required.");
408             Contract.Requires(targetBlock != null, "Target block to unlink is required.");
409             return Disposables.Create(CachedGenericDelegates<TOutput>.CreateUnlinkerShimAction,
410                 outgoingLock, targetRegistry, targetBlock);
411         }
412
413         /// <summary>An infinite TimeSpan.</summary>
414         internal static readonly TimeSpan InfiniteTimeSpan = Timeout.InfiniteTimeSpan;
415
416         /// <summary>Validates that a timeout either is -1 or is non-negative and within the range of an Int32.</summary>
417         /// <param name="timeout">The timeout to validate.</param>
418         /// <returns>true if the timeout is valid; otherwise, false.</returns>
419         internal static bool IsValidTimeout(TimeSpan timeout)
420         {
421             long millisecondsTimeout = (long)timeout.TotalMilliseconds;
422             return millisecondsTimeout >= Timeout.Infinite && millisecondsTimeout <= Int32.MaxValue;
423         }
424
425         /// <summary>Gets the options to use for continuation tasks.</summary>
426         /// <param name="toInclude">Any options to include in the result.</param>
427         /// <returns>The options to use.</returns>
428         internal static TaskContinuationOptions GetContinuationOptions(TaskContinuationOptions toInclude = TaskContinuationOptions.None)
429         {
430             return toInclude | TaskContinuationOptions.DenyChildAttach;
431         }
432
433         /// <summary>Gets the options to use for tasks.</summary>
434         /// <param name="isReplacementReplica">If this task is being created to replace another.</param>
435         /// <remarks>
436         /// These options should be used for all tasks that have the potential to run user code or
437         /// that are repeatedly spawned and thus need a modicum of fair treatment.
438         /// </remarks>
439         /// <returns>The options to use.</returns>
440         internal static TaskCreationOptions GetCreationOptionsForTask(bool isReplacementReplica = false)
441         {
442             TaskCreationOptions options = TaskCreationOptions.DenyChildAttach;
443             if (isReplacementReplica) options |= TaskCreationOptions.PreferFairness;
444             return options;
445         }
446
447         /// <summary>Starts an already constructed task with handling and observing exceptions that may come from the scheduling process.</summary>
448         /// <param name="task">Task to be started.</param>
449         /// <param name="scheduler">TaskScheduler to schedule the task on.</param>
450         /// <returns>null on success, an exception reference on scheduling error. In the latter case, the task reference is nulled out.</returns>
451         internal static Exception StartTaskSafe(Task task, TaskScheduler scheduler)
452         {
453             Contract.Requires(task != null, "Task to start is required.");
454             Contract.Requires(scheduler != null, "Scheduler on which to start the task is required.");
455
456             if (scheduler == TaskScheduler.Default)
457             {
458                 task.Start(scheduler);
459                 return null; // We don't need to worry about scheduler exceptions from the default scheduler.
460             }
461             // Slow path with try/catch separated out so that StartTaskSafe may be inlined in the common case.
462             else return StartTaskSafeCore(task, scheduler);
463         }
464
465         /// <summary>Starts an already constructed task with handling and observing exceptions that may come from the scheduling process.</summary>
466         /// <param name="task">Task to be started.</param>
467         /// <param name="scheduler">TaskScheduler to schedule the task on.</param>
468         /// <returns>null on success, an exception reference on scheduling error. In the latter case, the task reference is nulled out.</returns>
469         [SuppressMessage("Microsoft.Performance", "CA1804:RemoveUnusedLocals")]
470         [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
471         private static Exception StartTaskSafeCore(Task task, TaskScheduler scheduler)
472         {
473             Contract.Requires(task != null, "Task to start is needed.");
474             Contract.Requires(scheduler != null, "Scheduler on which to start the task is required.");
475
476             Exception schedulingException = null;
477
478             try
479             {
480                 task.Start(scheduler);
481             }
482             catch (Exception caughtException)
483             {
484                 // Verify TPL has faulted the task
485                 Debug.Assert(task.IsFaulted, "The task should have been faulted if it failed to start.");
486
487                 // Observe the task's exception
488                 AggregateException ignoredTaskException = task.Exception;
489
490                 schedulingException = caughtException;
491             }
492
493             return schedulingException;
494         }
495
496         /// <summary>Pops and explicitly releases postponed messages after the block is done with processing.</summary>
497         /// <remarks>No locks should be held at this time. Unfortunately we cannot assert that.</remarks>
498         [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
499         internal static void ReleaseAllPostponedMessages<T>(ITargetBlock<T> target,
500                                     QueuedMap<ISourceBlock<T>, DataflowMessageHeader> postponedMessages,
501                                     ref List<Exception> exceptions)
502         {
503             Contract.Requires(target != null, "There must be a subject target.");
504             Contract.Requires(postponedMessages != null, "The stacked map of postponed messages must exist.");
505
506             // Note that we don't synchronize on lockObject for postponedMessages here, 
507             // because no one should be adding to it at this time.  We do a bit of 
508             // checking just for sanity's sake.
509             int initialCount = postponedMessages.Count;
510             int processedCount = 0;
511
512             KeyValuePair<ISourceBlock<T>, DataflowMessageHeader> sourceAndMessage;
513             while (postponedMessages.TryPop(out sourceAndMessage))
514             {
515                 // Loop through all postponed messages declining each messages.
516                 // The only way we have to do this is by reserving and then immediately releasing each message.
517                 // This is important for sources like SendAsyncSource, which keep state around until
518                 // they get a response to a postponed message.
519                 try
520                 {
521                     Debug.Assert(sourceAndMessage.Key != null, "Postponed messages must have an associated source.");
522                     if (sourceAndMessage.Key.ReserveMessage(sourceAndMessage.Value, target))
523                     {
524                         sourceAndMessage.Key.ReleaseReservation(sourceAndMessage.Value, target);
525                     }
526                 }
527                 catch (Exception exc)
528                 {
529                     Common.AddException(ref exceptions, exc);
530                 }
531
532                 processedCount++;
533             }
534
535             Debug.Assert(processedCount == initialCount,
536                 "We should have processed the exact number of elements that were initially there.");
537         }
538
539         /// <summary>Cache ThrowAsync to avoid allocations when it is passed into PropagateCompletionXxx.</summary>
540         internal static readonly Action<Exception> AsyncExceptionHandler = ThrowAsync;
541
542         /// <summary>
543         /// Propagates completion of sourceCompletionTask to target synchronously.
544         /// </summary>
545         /// <param name="sourceCompletionTask">The task whose completion is to be propagated. It must be completed.</param>
546         /// <param name="target">The block where completion is propagated.</param>
547         /// <param name="exceptionHandler">Handler for exceptions from the target. May be null which would propagate the exception to the caller.</param>
548         internal static void PropagateCompletion(Task sourceCompletionTask, IDataflowBlock target, Action<Exception> exceptionHandler)
549         {
550             Contract.Requires(sourceCompletionTask != null, "sourceCompletionTask may not be null.");
551             Contract.Requires(target != null, "The target where completion is to be propagated may not be null.");
552             Debug.Assert(sourceCompletionTask.IsCompleted, "sourceCompletionTask must be completed in order to propagate its completion.");
553
554             AggregateException exception = sourceCompletionTask.IsFaulted ? sourceCompletionTask.Exception : null;
555
556             try
557             {
558                 if (exception != null) target.Fault(exception);
559                 else target.Complete();
560             }
561             catch (Exception exc)
562             {
563                 if (exceptionHandler != null) exceptionHandler(exc);
564                 else throw;
565             }
566         }
567
568         /// <summary>
569         /// Creates a continuation off sourceCompletionTask to complete target. See PropagateCompletion.
570         /// </summary>
571         private static void PropagateCompletionAsContinuation(Task sourceCompletionTask, IDataflowBlock target)
572         {
573             Contract.Requires(sourceCompletionTask != null, "sourceCompletionTask may not be null.");
574             Contract.Requires(target != null, "The target where completion is to be propagated may not be null.");
575             sourceCompletionTask.ContinueWith((task, state) => Common.PropagateCompletion(task, (IDataflowBlock)state, AsyncExceptionHandler),
576                 target, CancellationToken.None, Common.GetContinuationOptions(), TaskScheduler.Default);
577         }
578
579         /// <summary>
580         /// Propagates completion of sourceCompletionTask to target based on sourceCompletionTask's current state. See PropagateCompletion.
581         /// </summary>
582         internal static void PropagateCompletionOnceCompleted(Task sourceCompletionTask, IDataflowBlock target)
583         {
584             Contract.Requires(sourceCompletionTask != null, "sourceCompletionTask may not be null.");
585             Contract.Requires(target != null, "The target where completion is to be propagated may not be null.");
586
587             // If sourceCompletionTask is completed, propagate completion synchronously.
588             // Otherwise hook up a continuation.
589             if (sourceCompletionTask.IsCompleted) PropagateCompletion(sourceCompletionTask, target, exceptionHandler: null);
590             else PropagateCompletionAsContinuation(sourceCompletionTask, target);
591         }
592
593         /// <summary>Static class used to cache generic delegates the C# compiler doesn't cache by default.</summary>
594         /// <remarks>Without this, we end up allocating the generic delegate each time the operation is used.</remarks>
595         static class CachedGenericDelegates<T>
596         {
597             /// <summary>A function that returns the default value of T.</summary>
598             internal readonly static Func<T> DefaultTResultFunc = () => default(T);
599             /// <summary>
600             /// A function to use as the body of ActionOnDispose in CreateUnlinkerShim.
601             /// Passed a tuple of the sync obj, the target registry, and the target block as the state parameter.
602             /// </summary>
603             internal readonly static Action<object, TargetRegistry<T>, ITargetBlock<T>> CreateUnlinkerShimAction =
604                 (syncObj, registry, target) =>
605             {
606                 lock (syncObj) registry.Remove(target);
607             };
608         }
609     }
610
611     /// <summary>State used only when bounding.</summary>
612     [DebuggerDisplay("BoundedCapacity={BoundedCapacity}}")]
613     internal class BoundingState
614     {
615         /// <summary>The maximum number of messages allowed to be buffered.</summary>
616         internal readonly int BoundedCapacity;
617         /// <summary>The number of messages currently stored.</summary>
618         /// <remarks>
619         /// This value may temporarily be higher than the actual number stored.  
620         /// That's ok, we just can't accept any new messages if CurrentCount >= BoundedCapacity.
621         /// Worst case is that we may temporarily have fewer items in the block than our maximum allows,
622         /// but we'll never have more.
623         /// </remarks>
624         internal int CurrentCount;
625
626         /// <summary>Initializes the BoundingState.</summary>
627         /// <param name="boundedCapacity">The positive bounded capacity.</param>
628         internal BoundingState(int boundedCapacity)
629         {
630             Contract.Requires(boundedCapacity > 0, "Bounded is only supported with positive values.");
631             BoundedCapacity = boundedCapacity;
632         }
633
634         /// <summary>Gets whether there's room available to add another message.</summary>
635         internal bool CountIsLessThanBound { get { return CurrentCount < BoundedCapacity; } }
636     }
637
638     /// <summary>Stated used only when bounding and when postponed messages are stored.</summary>
639     /// <typeparam name="TInput">Specifies the type of input messages.</typeparam>
640     [DebuggerDisplay("BoundedCapacity={BoundedCapacity}, PostponedMessages={PostponedMessagesCountForDebugger}")]
641     internal class BoundingStateWithPostponed<TInput> : BoundingState
642     {
643         /// <summary>Queue of postponed messages.</summary>
644         internal readonly QueuedMap<ISourceBlock<TInput>, DataflowMessageHeader> PostponedMessages =
645             new QueuedMap<ISourceBlock<TInput>, DataflowMessageHeader>();
646         /// <summary>
647         /// The number of transfers from the postponement queue to the input queue currently being processed.
648         /// </summary>
649         /// <remarks>
650         /// Blocks that use TargetCore need to transfer messages from the postponed queue to the input messages
651         /// queue.  While doing that, new incoming messages may arrive, and if they view the postponed queue
652         /// as being empty (after the block has removed the last postponed message and is consuming it, before
653         /// storing it into the input queue), they might go directly into the input queue... that will then mess
654         /// up the ordering between those postponed messages and the newly incoming messages.  To address that,
655         /// OutstandingTransfers is used to track the number of transfers currently in progress.  Incoming
656         /// messages must be postponed not only if there are already any postponed messages, but also if
657         /// there are any transfers in progress (i.e. this value is > 0).  It's an integer because the DOP could
658         /// be greater than 1, and thus we need to ref count multiple transfers that might be in progress.
659         /// </remarks>
660         internal int OutstandingTransfers;
661
662         /// <summary>Initializes the BoundingState.</summary>
663         /// <param name="boundedCapacity">The positive bounded capacity.</param>
664         internal BoundingStateWithPostponed(int boundedCapacity) : base(boundedCapacity)
665         {
666         }
667
668         /// <summary>Gets the number of postponed messages for the debugger.</summary>
669         private int PostponedMessagesCountForDebugger { get { return PostponedMessages.Count; } }
670     }
671
672     /// <summary>Stated used only when bounding and when postponed messages and a task are stored.</summary>
673     /// <typeparam name="TInput">Specifies the type of input messages.</typeparam>
674     internal class BoundingStateWithPostponedAndTask<TInput> : BoundingStateWithPostponed<TInput>
675     {
676         /// <summary>The task used to process messages.</summary>
677         internal Task TaskForInputProcessing;
678
679         /// <summary>Initializes the BoundingState.</summary>
680         /// <param name="boundedCapacity">The positive bounded capacity.</param>
681         internal BoundingStateWithPostponedAndTask(int boundedCapacity) : base(boundedCapacity)
682         {
683         }
684     }
685
686     /// <summary>
687     /// Type used with TaskCompletionSource(Of TResult) as the TResult
688     /// to ensure that the resulting task can't be upcast to something
689     /// that in the future could lead to compat problems.
690     /// </summary>
691     [SuppressMessage("Microsoft.Performance", "CA1812:AvoidUninstantiatedInternalClasses")]
692     [DebuggerNonUserCode]
693     internal struct VoidResult { }
694 }