1 // Copyright (c) Microsoft. All rights reserved.
2 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
4 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
9 // Helper routines for the rest of the TPL Dataflow implementation.
11 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
13 using System.Collections.Generic;
14 using System.Diagnostics;
15 using System.Diagnostics.CodeAnalysis;
16 using System.Diagnostics.Contracts;
17 using System.Security;
18 using System.Collections;
19 using System.Runtime.ExceptionServices;
20 using System.Threading.Tasks.Dataflow.Internal.Threading;
22 namespace System.Threading.Tasks.Dataflow.Internal
24 /// <summary>Internal helper utilities.</summary>
25 internal static class Common
28 /// An invalid ID to assign for reordering purposes. This value is chosen to be the last of the 64-bit integers that
29 /// could ever be assigned as a reordering ID.
31 internal const long INVALID_REORDERING_ID = -1;
32 /// <summary>A well-known message ID for code that will send exactly one message or
33 /// where the exact message ID is not important.</summary>
34 internal const int SINGLE_MESSAGE_ID = 1;
35 /// <summary>A perf optimization for caching a well-known message header instead of
36 /// constructing one every time it is needed.</summary>
37 internal static readonly DataflowMessageHeader SingleMessageHeader = new DataflowMessageHeader(SINGLE_MESSAGE_ID);
38 /// <summary>The cached completed Task{bool} with a result of true.</summary>
39 internal static readonly Task<bool> CompletedTaskWithTrueResult = CreateCachedBooleanTask(true);
40 /// <summary>The cached completed Task{bool} with a result of false.</summary>
41 internal static readonly Task<bool> CompletedTaskWithFalseResult = CreateCachedBooleanTask(false);
42 /// <summary>The cached completed TaskCompletionSource{VoidResult}.</summary>
43 internal static readonly TaskCompletionSource<VoidResult> CompletedVoidResultTaskCompletionSource = CreateCachedTaskCompletionSource<VoidResult>();
45 /// <summary>Asserts that a given synchronization object is either held or not held.</summary>
46 /// <param name="syncObj">The monitor to check.</param>
47 /// <param name="held">Whether we want to assert that it's currently held or not held.</param>
48 [Conditional("DEBUG")]
49 internal static void ContractAssertMonitorStatus(object syncObj, bool held)
51 Contract.Requires(syncObj != null, "The monitor object to check must be provided.");
52 Debug.Assert(Monitor.IsEntered(syncObj) == held, "The locking scheme was not correctly followed.");
55 /// <summary>Keeping alive processing tasks: maximum number of processed messages.</summary>
56 internal const int KEEP_ALIVE_NUMBER_OF_MESSAGES_THRESHOLD = 1;
57 /// <summary>Keeping alive processing tasks: do not attempt this many times.</summary>
58 internal const int KEEP_ALIVE_BAN_COUNT = 1000;
60 /// <summary>A predicate type for TryKeepAliveUntil.</summary>
61 /// <param name="stateIn">Input state for the predicate in order to avoid closure allocations.</param>
62 /// <param name="stateOut">Output state for the predicate in order to avoid closure allocations.</param>
63 /// <returns>The state of the predicate.</returns>
64 internal delegate bool KeepAlivePredicate<TStateIn, TStateOut>(TStateIn stateIn, out TStateOut stateOut);
66 /// <summary>Actively waits for a predicate to become true.</summary>
67 /// <param name="predicate">The predicate to become true.</param>
68 /// <param name="stateIn">Input state for the predicate in order to avoid closure allocations.</param>
69 /// <param name="stateOut">Output state for the predicate in order to avoid closure allocations.</param>
70 /// <returns>True if the predicate was evaluated and it returned true. False otherwise.</returns>
71 internal static bool TryKeepAliveUntil<TStateIn, TStateOut>(KeepAlivePredicate<TStateIn, TStateOut> predicate,
72 TStateIn stateIn, out TStateOut stateOut)
74 Contract.Requires(predicate != null, "Non-null predicate to execute is required.");
75 const int ITERATION_LIMIT = 16;
77 for (int c = ITERATION_LIMIT; c > 0; c--)
81 // There was no other thread waiting.
82 // We may spend some more cycles to evaluate the predicate.
83 if (predicate(stateIn, out stateOut)) return true;
87 stateOut = default(TStateOut);
91 /// <summary>Unwraps an instance T from object state that is a WeakReference to that instance.</summary>
92 /// <typeparam name="T">The type of the data to be unwrapped.</typeparam>
93 /// <param name="state">The weak reference.</param>
94 /// <returns>The T instance.</returns>
95 internal static T UnwrapWeakReference<T>(object state) where T : class
97 var wr = state as WeakReference<T>;
98 Debug.Assert(wr != null, "Expected a WeakReference<T> as the state argument");
100 return wr.TryGetTarget(out item) ? item : null;
103 /// <summary>Gets an ID for the dataflow block.</summary>
104 /// <param name="block">The dataflow block.</param>
105 /// <returns>An ID for the dataflow block.</returns>
106 internal static int GetBlockId(IDataflowBlock block)
108 Contract.Requires(block != null, "Block required to extract an Id.");
109 const int NOTASKID = 0; // tasks don't have 0 as ids
110 Task t = Common.GetPotentiallyNotSupportedCompletionTask(block);
111 return t != null ? t.Id : NOTASKID;
114 /// <summary>Gets the name for the specified block, suitable to be rendered in a debugger window.</summary>
115 /// <param name="block">The block for which a name is needed.</param>
116 /// <param name="options">
117 /// The options to use when rendering the name. If no options are provided, the block's name is used directly.
119 /// <returns>The name of the object.</returns>
120 /// <remarks>This is used from DebuggerDisplay attributes.</remarks>
121 [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
122 [SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
123 internal static string GetNameForDebugger(
124 IDataflowBlock block, DataflowBlockOptions options = null)
126 Contract.Requires(block != null, "Should only be used with valid objects being displayed in the debugger.");
127 Contract.Requires(options == null || options.NameFormat != null, "If options are provided, NameFormat must be valid.");
129 if (block == null) return string.Empty;
131 string blockName = block.GetType().Name;
132 if (options == null) return blockName;
136 int blockId = GetBlockId(block);
138 // Since NameFormat is public, formatting may throw if the user has set
139 // a string that contains a reference to an argument higher than {1}.
140 // In the case of an exception, show the exception message.
143 return string.Format(options.NameFormat, blockName, blockId);
145 catch (Exception exception)
147 return exception.Message;
152 /// Gets whether the exception represents a cooperative cancellation acknowledgment.
154 /// <param name="exception">The exception to check.</param>
155 /// <returns>true if this exception represents a cooperative cancellation acknowledgment; otherwise, false.</returns>
156 internal static bool IsCooperativeCancellation(Exception exception)
158 Contract.Requires(exception != null, "An exception to check for cancellation must be provided.");
159 return exception is OperationCanceledException;
160 // Note that the behavior of this method does not exactly match that of Parallel.*, PLINQ, and Task.Factory.StartNew,
161 // in that it's more liberal and treats any OCE as acknowledgment of cancellation; in contrast, the other
162 // libraries only treat OCEs as such if they contain the same token that was provided by the user
163 // and if that token has cancellation requested. Such logic could be achieved here with:
164 // var oce = exception as OperationCanceledException;
165 // return oce != null &&
166 // oce.CancellationToken == dataflowBlockOptions.CancellationToken &&
167 // oce.CancellationToken.IsCancellationRequested;
168 // However, that leads to a discrepancy with the async processing case of dataflow blocks,
169 // where tasks are returned to represent the message processing, potentially in the Canceled state,
170 // and we simply ignore such tasks. Further, for blocks like TransformBlock, it's useful to be able
171 // to cancel an individual operation which must return a TOutput value, simply by throwing an OperationCanceledException.
172 // In such cases, you wouldn't want cancellation tied to the token, because you would only be able to
173 // cancel an individual message processing if the whole block was canceled.
176 /// <summary>Registers a block for cancellation by completing when cancellation is requested.</summary>
177 /// <param name="cancellationToken">The block's cancellation token.</param>
178 /// <param name="completionTask">The task that will complete when the block is completely done processing.</param>
179 /// <param name="completeAction">An action that will decline permanently on the state passed to it.</param>
180 /// <param name="completeState">The block on which to decline permanently.</param>
181 internal static void WireCancellationToComplete(
182 CancellationToken cancellationToken, Task completionTask, Action<object> completeAction, object completeState)
184 Contract.Requires(completionTask != null, "A task to wire up for completion is needed.");
185 Contract.Requires(completeAction != null, "An action to invoke upon cancellation is required.");
187 // If a cancellation request has already occurred, just invoke the declining action synchronously.
188 // CancellationToken would do this anyway but we can short-circuit it further and avoid a bunch of unnecessary checks.
189 if (cancellationToken.IsCancellationRequested)
191 completeAction(completeState);
193 // Otherwise, if a cancellation request occurs, we want to prevent the block from accepting additional
194 // data, and we also want to dispose of that registration when we complete so that we don't
195 // leak into a long-living cancellation token.
196 else if (cancellationToken.CanBeCanceled)
198 CancellationTokenRegistration reg = cancellationToken.Register(completeAction, completeState);
199 completionTask.ContinueWith((completed, state) => ((CancellationTokenRegistration)state).Dispose(),
200 reg, cancellationToken, Common.GetContinuationOptions(), TaskScheduler.Default);
204 /// <summary>Initializes the stack trace and watson bucket of an inactive exception.</summary>
205 /// <param name="exception">The exception to initialize.</param>
206 /// <returns>The initialized exception.</returns>
207 [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
208 internal static Exception InitializeStackTrace(Exception exception)
210 Contract.Requires(exception != null && exception.StackTrace == null,
211 "A valid but uninitialized exception should be provided.");
212 try { throw exception; }
213 catch { return exception; }
216 /// <summary>The name of the key in an Exception's Data collection used to store information on a dataflow message.</summary>
217 internal const string EXCEPTIONDATAKEY_DATAFLOWMESSAGEVALUE = "DataflowMessageValue"; // should not be localized
219 /// <summary>Stores details on a dataflow message into an Exception's Data collection.</summary>
220 /// <typeparam name="T">Specifies the type of data stored in the message.</typeparam>
221 /// <param name="exc">The Exception whose Data collection should store message information.</param>
222 /// <param name="messageValue">The message information to be stored.</param>
223 /// <param name="targetInnerExceptions">Whether to store the data into the exception's inner exception(s) in addition to the exception itself.</param>
224 [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
225 internal static void StoreDataflowMessageValueIntoExceptionData<T>(Exception exc, T messageValue, bool targetInnerExceptions = false)
227 Contract.Requires(exc != null, "The exception into which data should be stored must be provided.");
229 // Get the string value to store
230 string strValue = messageValue as string;
231 if (strValue == null && messageValue != null)
235 strValue = messageValue.ToString();
237 catch { /* It's ok to eat all exceptions here. If ToString throws, we'll just ignore it. */ }
239 if (strValue == null) return;
241 // Store the data into the exception itself
242 StoreStringIntoExceptionData(exc, Common.EXCEPTIONDATAKEY_DATAFLOWMESSAGEVALUE, strValue);
244 // If we also want to target inner exceptions...
245 if (targetInnerExceptions)
247 // If this is an aggregate, store into all inner exceptions.
248 var aggregate = exc as AggregateException;
249 if (aggregate != null)
251 foreach (Exception innerException in aggregate.InnerExceptions)
253 StoreStringIntoExceptionData(innerException, Common.EXCEPTIONDATAKEY_DATAFLOWMESSAGEVALUE, strValue);
256 // Otherwise, if there's an Exception.InnerException, store into that.
257 else if (exc.InnerException != null)
259 StoreStringIntoExceptionData(exc.InnerException, Common.EXCEPTIONDATAKEY_DATAFLOWMESSAGEVALUE, strValue);
264 /// <summary>Stores the specified string value into the specified key slot of the specified exception's data dictionary.</summary>
265 /// <param name="exception">The exception into which the key/value should be stored.</param>
266 /// <param name="key">The key.</param>
267 /// <param name="value">The value to be serialized as a string and stored.</param>
268 /// <remarks>If the key is already present in the exception's data dictionary, the value is not overwritten.</remarks>
269 [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
270 private static void StoreStringIntoExceptionData(Exception exception, string key, string value)
272 Contract.Requires(exception != null, "An exception is needed to store the data into.");
273 Contract.Requires(key != null, "A key into the exception's data collection is needed.");
274 Contract.Requires(value != null, "The value to store must be provided.");
277 IDictionary data = exception.Data;
278 if (data != null && !data.IsFixedSize && !data.IsReadOnly && data[key] == null)
285 // It's ok to eat all exceptions here. This could throw if an Exception type
286 // has overridden Data to behave differently than we expect.
290 /// <summary>Throws an exception asynchronously on the thread pool.</summary>
291 /// <param name="error">The exception to throw.</param>
293 /// This function is used when an exception needs to be propagated from a thread
294 /// other than the current context. This could happen, for example, if the exception
295 /// should cause the standard CLR exception escalation behavior, but we're inside
296 /// of a task that will squirrel the exception away.
298 internal static void ThrowAsync(Exception error)
300 ExceptionDispatchInfo edi = ExceptionDispatchInfo.Capture(error);
301 ThreadPool.QueueUserWorkItem(state => { ((ExceptionDispatchInfo)state).Throw(); }, edi);
304 /// <summary>Adds the exception to the list, first initializing the list if the list is null.</summary>
305 /// <param name="list">The list to add the exception to, and initialize if null.</param>
306 /// <param name="exception">The exception to add or whose inner exception(s) should be added.</param>
307 /// <param name="unwrapInnerExceptions">Unwrap and add the inner exception(s) rather than the specified exception directly.</param>
308 /// <remarks>This method is not thread-safe, in that it manipulates <paramref name="list"/> without any synchronization.</remarks>
309 internal static void AddException(ref List<Exception> list, Exception exception, bool unwrapInnerExceptions = false)
311 Contract.Requires(exception != null, "An exception to add is required.");
312 Contract.Requires(!unwrapInnerExceptions || exception.InnerException != null,
313 "If unwrapping is requested, an inner exception is required.");
315 // Make sure the list of exceptions is initialized (lazily).
316 if (list == null) list = new List<Exception>();
318 if (unwrapInnerExceptions)
320 AggregateException aggregate = exception as AggregateException;
321 if (aggregate != null)
323 list.AddRange(aggregate.InnerExceptions);
327 list.Add(exception.InnerException);
330 else list.Add(exception);
333 /// <summary>Creates a task we can cache for the desired Boolean result.</summary>
334 /// <param name="value">The value of the Boolean.</param>
335 /// <returns>A task that may be cached.</returns>
336 private static Task<Boolean> CreateCachedBooleanTask(bool value)
338 // AsyncTaskMethodBuilder<Boolean> caches tasks that are non-disposable.
339 // By using these same tasks, we're a bit more robust against disposals,
340 // in that such a disposed task's ((IAsyncResult)task).AsyncWaitHandle
342 var atmb = System.Runtime.CompilerServices.AsyncTaskMethodBuilder<Boolean>.Create();
343 atmb.SetResult(value);
344 return atmb.Task; // must be accessed after SetResult to get the cached task
347 /// <summary>Creates a TaskCompletionSource{T} completed with a value of default(T) that we can cache.</summary>
348 /// <returns>Completed TaskCompletionSource{T} that may be cached.</returns>
349 private static TaskCompletionSource<T> CreateCachedTaskCompletionSource<T>()
351 var tcs = new TaskCompletionSource<T>();
352 tcs.SetResult(default(T));
356 /// <summary>Creates a task faulted with the specified exception.</summary>
357 /// <typeparam name="TResult">Specifies the type of the result for this task.</typeparam>
358 /// <param name="exception">The exception with which to complete the task.</param>
359 /// <returns>The faulted task.</returns>
360 internal static Task<TResult> CreateTaskFromException<TResult>(Exception exception)
362 var atmb = System.Runtime.CompilerServices.AsyncTaskMethodBuilder<TResult>.Create();
363 atmb.SetException(exception);
367 /// <summary>Creates a task canceled with the specified cancellation token.</summary>
368 /// <typeparam name="TResult">Specifies the type of the result for this task.</typeparam>
369 /// <returns>The canceled task.</returns>
370 [SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")]
371 internal static Task<TResult> CreateTaskFromCancellation<TResult>(CancellationToken cancellationToken)
373 Contract.Requires(cancellationToken.IsCancellationRequested,
374 "The task will only be immediately canceled if the token has cancellation requested already.");
375 var t = new Task<TResult>(CachedGenericDelegates<TResult>.DefaultTResultFunc, cancellationToken);
376 Debug.Assert(t.IsCanceled, "Task's constructor should cancel the task synchronously in the ctor.");
380 /// <summary>Gets the completion task of a block, and protects against common cases of the completion task not being implemented or supported.</summary>
381 /// <param name="block">The block.</param>
382 /// <returns>The completion task, or null if the block's completion task is not implemented or supported.</returns>
383 internal static Task GetPotentiallyNotSupportedCompletionTask(IDataflowBlock block)
385 Contract.Requires(block != null, "We need a block from which to retrieve a cancellation task.");
388 return block.Completion;
390 catch (NotImplementedException) { }
391 catch (NotSupportedException) { }
396 /// Creates an IDisposable that, when disposed, will acquire the outgoing lock while removing
397 /// the target block from the target registry.
399 /// <typeparam name="TOutput">Specifies the type of data in the block.</typeparam>
400 /// <param name="outgoingLock">The outgoing lock used to protect the target registry.</param>
401 /// <param name="targetRegistry">The target registry from which the target should be removed.</param>
402 /// <param name="targetBlock">The target to remove from the registry.</param>
403 /// <returns>An IDisposable that will unregister the target block from the registry while holding the outgoing lock.</returns>
404 internal static IDisposable CreateUnlinker<TOutput>(object outgoingLock, TargetRegistry<TOutput> targetRegistry, ITargetBlock<TOutput> targetBlock)
406 Contract.Requires(outgoingLock != null, "Monitor object needed to protect the operation.");
407 Contract.Requires(targetRegistry != null, "Registry from which to remove is required.");
408 Contract.Requires(targetBlock != null, "Target block to unlink is required.");
409 return Disposables.Create(CachedGenericDelegates<TOutput>.CreateUnlinkerShimAction,
410 outgoingLock, targetRegistry, targetBlock);
413 /// <summary>An infinite TimeSpan.</summary>
414 internal static readonly TimeSpan InfiniteTimeSpan = Timeout.InfiniteTimeSpan;
416 /// <summary>Validates that a timeout either is -1 or is non-negative and within the range of an Int32.</summary>
417 /// <param name="timeout">The timeout to validate.</param>
418 /// <returns>true if the timeout is valid; otherwise, false.</returns>
419 internal static bool IsValidTimeout(TimeSpan timeout)
421 long millisecondsTimeout = (long)timeout.TotalMilliseconds;
422 return millisecondsTimeout >= Timeout.Infinite && millisecondsTimeout <= Int32.MaxValue;
425 /// <summary>Gets the options to use for continuation tasks.</summary>
426 /// <param name="toInclude">Any options to include in the result.</param>
427 /// <returns>The options to use.</returns>
428 internal static TaskContinuationOptions GetContinuationOptions(TaskContinuationOptions toInclude = TaskContinuationOptions.None)
430 return toInclude | TaskContinuationOptions.DenyChildAttach;
433 /// <summary>Gets the options to use for tasks.</summary>
434 /// <param name="isReplacementReplica">If this task is being created to replace another.</param>
436 /// These options should be used for all tasks that have the potential to run user code or
437 /// that are repeatedly spawned and thus need a modicum of fair treatment.
439 /// <returns>The options to use.</returns>
440 internal static TaskCreationOptions GetCreationOptionsForTask(bool isReplacementReplica = false)
442 TaskCreationOptions options = TaskCreationOptions.DenyChildAttach;
443 if (isReplacementReplica) options |= TaskCreationOptions.PreferFairness;
447 /// <summary>Starts an already constructed task with handling and observing exceptions that may come from the scheduling process.</summary>
448 /// <param name="task">Task to be started.</param>
449 /// <param name="scheduler">TaskScheduler to schedule the task on.</param>
450 /// <returns>null on success, an exception reference on scheduling error. In the latter case, the task reference is nulled out.</returns>
451 internal static Exception StartTaskSafe(Task task, TaskScheduler scheduler)
453 Contract.Requires(task != null, "Task to start is required.");
454 Contract.Requires(scheduler != null, "Scheduler on which to start the task is required.");
456 if (scheduler == TaskScheduler.Default)
458 task.Start(scheduler);
459 return null; // We don't need to worry about scheduler exceptions from the default scheduler.
461 // Slow path with try/catch separated out so that StartTaskSafe may be inlined in the common case.
462 else return StartTaskSafeCore(task, scheduler);
465 /// <summary>Starts an already constructed task with handling and observing exceptions that may come from the scheduling process.</summary>
466 /// <param name="task">Task to be started.</param>
467 /// <param name="scheduler">TaskScheduler to schedule the task on.</param>
468 /// <returns>null on success, an exception reference on scheduling error. In the latter case, the task reference is nulled out.</returns>
469 [SuppressMessage("Microsoft.Performance", "CA1804:RemoveUnusedLocals")]
470 [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
471 private static Exception StartTaskSafeCore(Task task, TaskScheduler scheduler)
473 Contract.Requires(task != null, "Task to start is needed.");
474 Contract.Requires(scheduler != null, "Scheduler on which to start the task is required.");
476 Exception schedulingException = null;
480 task.Start(scheduler);
482 catch (Exception caughtException)
484 // Verify TPL has faulted the task
485 Debug.Assert(task.IsFaulted, "The task should have been faulted if it failed to start.");
487 // Observe the task's exception
488 AggregateException ignoredTaskException = task.Exception;
490 schedulingException = caughtException;
493 return schedulingException;
496 /// <summary>Pops and explicitly releases postponed messages after the block is done with processing.</summary>
497 /// <remarks>No locks should be held at this time. Unfortunately we cannot assert that.</remarks>
498 [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
499 internal static void ReleaseAllPostponedMessages<T>(ITargetBlock<T> target,
500 QueuedMap<ISourceBlock<T>, DataflowMessageHeader> postponedMessages,
501 ref List<Exception> exceptions)
503 Contract.Requires(target != null, "There must be a subject target.");
504 Contract.Requires(postponedMessages != null, "The stacked map of postponed messages must exist.");
506 // Note that we don't synchronize on lockObject for postponedMessages here,
507 // because no one should be adding to it at this time. We do a bit of
508 // checking just for sanity's sake.
509 int initialCount = postponedMessages.Count;
510 int processedCount = 0;
512 KeyValuePair<ISourceBlock<T>, DataflowMessageHeader> sourceAndMessage;
513 while (postponedMessages.TryPop(out sourceAndMessage))
515 // Loop through all postponed messages declining each messages.
516 // The only way we have to do this is by reserving and then immediately releasing each message.
517 // This is important for sources like SendAsyncSource, which keep state around until
518 // they get a response to a postponed message.
521 Debug.Assert(sourceAndMessage.Key != null, "Postponed messages must have an associated source.");
522 if (sourceAndMessage.Key.ReserveMessage(sourceAndMessage.Value, target))
524 sourceAndMessage.Key.ReleaseReservation(sourceAndMessage.Value, target);
527 catch (Exception exc)
529 Common.AddException(ref exceptions, exc);
535 Debug.Assert(processedCount == initialCount,
536 "We should have processed the exact number of elements that were initially there.");
539 /// <summary>Cache ThrowAsync to avoid allocations when it is passed into PropagateCompletionXxx.</summary>
540 internal static readonly Action<Exception> AsyncExceptionHandler = ThrowAsync;
543 /// Propagates completion of sourceCompletionTask to target synchronously.
545 /// <param name="sourceCompletionTask">The task whose completion is to be propagated. It must be completed.</param>
546 /// <param name="target">The block where completion is propagated.</param>
547 /// <param name="exceptionHandler">Handler for exceptions from the target. May be null which would propagate the exception to the caller.</param>
548 internal static void PropagateCompletion(Task sourceCompletionTask, IDataflowBlock target, Action<Exception> exceptionHandler)
550 Contract.Requires(sourceCompletionTask != null, "sourceCompletionTask may not be null.");
551 Contract.Requires(target != null, "The target where completion is to be propagated may not be null.");
552 Debug.Assert(sourceCompletionTask.IsCompleted, "sourceCompletionTask must be completed in order to propagate its completion.");
554 AggregateException exception = sourceCompletionTask.IsFaulted ? sourceCompletionTask.Exception : null;
558 if (exception != null) target.Fault(exception);
559 else target.Complete();
561 catch (Exception exc)
563 if (exceptionHandler != null) exceptionHandler(exc);
569 /// Creates a continuation off sourceCompletionTask to complete target. See PropagateCompletion.
571 private static void PropagateCompletionAsContinuation(Task sourceCompletionTask, IDataflowBlock target)
573 Contract.Requires(sourceCompletionTask != null, "sourceCompletionTask may not be null.");
574 Contract.Requires(target != null, "The target where completion is to be propagated may not be null.");
575 sourceCompletionTask.ContinueWith((task, state) => Common.PropagateCompletion(task, (IDataflowBlock)state, AsyncExceptionHandler),
576 target, CancellationToken.None, Common.GetContinuationOptions(), TaskScheduler.Default);
580 /// Propagates completion of sourceCompletionTask to target based on sourceCompletionTask's current state. See PropagateCompletion.
582 internal static void PropagateCompletionOnceCompleted(Task sourceCompletionTask, IDataflowBlock target)
584 Contract.Requires(sourceCompletionTask != null, "sourceCompletionTask may not be null.");
585 Contract.Requires(target != null, "The target where completion is to be propagated may not be null.");
587 // If sourceCompletionTask is completed, propagate completion synchronously.
588 // Otherwise hook up a continuation.
589 if (sourceCompletionTask.IsCompleted) PropagateCompletion(sourceCompletionTask, target, exceptionHandler: null);
590 else PropagateCompletionAsContinuation(sourceCompletionTask, target);
593 /// <summary>Static class used to cache generic delegates the C# compiler doesn't cache by default.</summary>
594 /// <remarks>Without this, we end up allocating the generic delegate each time the operation is used.</remarks>
595 static class CachedGenericDelegates<T>
597 /// <summary>A function that returns the default value of T.</summary>
598 internal readonly static Func<T> DefaultTResultFunc = () => default(T);
600 /// A function to use as the body of ActionOnDispose in CreateUnlinkerShim.
601 /// Passed a tuple of the sync obj, the target registry, and the target block as the state parameter.
603 internal readonly static Action<object, TargetRegistry<T>, ITargetBlock<T>> CreateUnlinkerShimAction =
604 (syncObj, registry, target) =>
606 lock (syncObj) registry.Remove(target);
611 /// <summary>State used only when bounding.</summary>
612 [DebuggerDisplay("BoundedCapacity={BoundedCapacity}}")]
613 internal class BoundingState
615 /// <summary>The maximum number of messages allowed to be buffered.</summary>
616 internal readonly int BoundedCapacity;
617 /// <summary>The number of messages currently stored.</summary>
619 /// This value may temporarily be higher than the actual number stored.
620 /// That's ok, we just can't accept any new messages if CurrentCount >= BoundedCapacity.
621 /// Worst case is that we may temporarily have fewer items in the block than our maximum allows,
622 /// but we'll never have more.
624 internal int CurrentCount;
626 /// <summary>Initializes the BoundingState.</summary>
627 /// <param name="boundedCapacity">The positive bounded capacity.</param>
628 internal BoundingState(int boundedCapacity)
630 Contract.Requires(boundedCapacity > 0, "Bounded is only supported with positive values.");
631 BoundedCapacity = boundedCapacity;
634 /// <summary>Gets whether there's room available to add another message.</summary>
635 internal bool CountIsLessThanBound { get { return CurrentCount < BoundedCapacity; } }
638 /// <summary>Stated used only when bounding and when postponed messages are stored.</summary>
639 /// <typeparam name="TInput">Specifies the type of input messages.</typeparam>
640 [DebuggerDisplay("BoundedCapacity={BoundedCapacity}, PostponedMessages={PostponedMessagesCountForDebugger}")]
641 internal class BoundingStateWithPostponed<TInput> : BoundingState
643 /// <summary>Queue of postponed messages.</summary>
644 internal readonly QueuedMap<ISourceBlock<TInput>, DataflowMessageHeader> PostponedMessages =
645 new QueuedMap<ISourceBlock<TInput>, DataflowMessageHeader>();
647 /// The number of transfers from the postponement queue to the input queue currently being processed.
650 /// Blocks that use TargetCore need to transfer messages from the postponed queue to the input messages
651 /// queue. While doing that, new incoming messages may arrive, and if they view the postponed queue
652 /// as being empty (after the block has removed the last postponed message and is consuming it, before
653 /// storing it into the input queue), they might go directly into the input queue... that will then mess
654 /// up the ordering between those postponed messages and the newly incoming messages. To address that,
655 /// OutstandingTransfers is used to track the number of transfers currently in progress. Incoming
656 /// messages must be postponed not only if there are already any postponed messages, but also if
657 /// there are any transfers in progress (i.e. this value is > 0). It's an integer because the DOP could
658 /// be greater than 1, and thus we need to ref count multiple transfers that might be in progress.
660 internal int OutstandingTransfers;
662 /// <summary>Initializes the BoundingState.</summary>
663 /// <param name="boundedCapacity">The positive bounded capacity.</param>
664 internal BoundingStateWithPostponed(int boundedCapacity) : base(boundedCapacity)
668 /// <summary>Gets the number of postponed messages for the debugger.</summary>
669 private int PostponedMessagesCountForDebugger { get { return PostponedMessages.Count; } }
672 /// <summary>Stated used only when bounding and when postponed messages and a task are stored.</summary>
673 /// <typeparam name="TInput">Specifies the type of input messages.</typeparam>
674 internal class BoundingStateWithPostponedAndTask<TInput> : BoundingStateWithPostponed<TInput>
676 /// <summary>The task used to process messages.</summary>
677 internal Task TaskForInputProcessing;
679 /// <summary>Initializes the BoundingState.</summary>
680 /// <param name="boundedCapacity">The positive bounded capacity.</param>
681 internal BoundingStateWithPostponedAndTask(int boundedCapacity) : base(boundedCapacity)
687 /// Type used with TaskCompletionSource(Of TResult) as the TResult
688 /// to ensure that the resulting task can't be upcast to something
689 /// that in the future could lead to compat problems.
691 [SuppressMessage("Microsoft.Performance", "CA1812:AvoidUninstantiatedInternalClasses")]
692 [DebuggerNonUserCode]
693 internal struct VoidResult { }