1 // Copyright (c) Microsoft. All rights reserved.
2 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
4 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
9 // The core implementation of a standard ITargetBlock<TInput>.
11 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
13 using System.Collections.Concurrent;
14 using System.Collections.Generic;
15 using System.Diagnostics;
16 using System.Diagnostics.CodeAnalysis;
17 using System.Diagnostics.Contracts;
19 using System.Security;
21 namespace System.Threading.Tasks.Dataflow.Internal
23 // LOCK-LEVELING SCHEME
24 // --------------------
25 // TargetCore employs a single lock: IncomingLock. This lock must not be used when calling out to any targets,
26 // which TargetCore should not have, anyway. It also must not be held when calling back to any sources, except
27 // during calls to OfferMessage from that same source.
29 /// <summary>Options used to configure a target core.</summary>
31 internal enum TargetCoreOptions : byte
33 /// <summary>Synchronous completion, both a target and a source, etc.</summary>
35 /// <summary>Whether the block relies on the delegate to signal when an async operation has completed.</summary>
36 UsesAsyncCompletion = 0x1,
38 /// Whether the block containing this target core is just a target or also has a source side.
39 /// If it's just a target, then this target core's completion represents the entire block's completion.
41 RepresentsBlockCompletion = 0x2
45 /// Provides a core implementation of <see cref="ITargetBlock{TInput}"/>.</summary>
46 /// <typeparam name="TInput">Specifies the type of data accepted by the <see cref="TargetCore{TInput}"/>.</typeparam>
47 [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")]
48 [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
49 internal sealed class TargetCore<TInput>
51 // *** These fields are readonly and are initialized at AppDomain startup.
53 /// <summary>Caching the keep alive predicate.</summary>
54 private static readonly Common.KeepAlivePredicate<TargetCore<TInput>, KeyValuePair<TInput, long>> _keepAlivePredicate =
55 (TargetCore<TInput> thisTargetCore, out KeyValuePair<TInput, long> messageWithId) =>
56 thisTargetCore.TryGetNextAvailableOrPostponedMessage(out messageWithId);
58 // *** These fields are readonly and are initialized to new instances at construction.
60 /// <summary>A task representing the completion of the block.</summary>
61 private readonly TaskCompletionSource<VoidResult> _completionSource = new TaskCompletionSource<VoidResult>();
63 // *** These fields are readonly and are initialized by arguments to the constructor.
65 /// <summary>The target block using this helper.</summary>
66 private readonly ITargetBlock<TInput> _owningTarget;
67 /// <summary>The messages in this target.</summary>
68 /// <remarks>This field doubles as the IncomingLock.</remarks>
69 private readonly IProducerConsumerQueue<KeyValuePair<TInput, long>> _messages;
70 /// <summary>The options associated with this block.</summary>
71 private readonly ExecutionDataflowBlockOptions _dataflowBlockOptions;
72 /// <summary>An action to invoke for every accepted message.</summary>
73 private readonly Action<KeyValuePair<TInput, long>> _callAction;
74 /// <summary>Whether the block relies on the delegate to signal when an async operation has completed.</summary>
75 private readonly TargetCoreOptions _targetCoreOptions;
76 /// <summary>Bounding state for when the block is executing in bounded mode.</summary>
77 private readonly BoundingStateWithPostponed<TInput> _boundingState;
78 /// <summary>The reordering buffer used by the owner. May be null.</summary>
79 private readonly IReorderingBuffer _reorderingBuffer;
81 /// <summary>Gets the object used as the incoming lock.</summary>
82 private object IncomingLock { get { return _messages; } }
84 // *** These fields are mutated during execution.
86 /// <summary>Exceptions that may have occurred and gone unhandled during processing.</summary>
87 private List<Exception> _exceptions;
88 /// <summary>Whether to stop accepting new messages.</summary>
89 private bool _decliningPermanently;
90 /// <summary>The number of operations (including service tasks) currently running asynchronously.</summary>
91 /// <remarks>Must always be accessed from inside a lock.</remarks>
92 private int _numberOfOutstandingOperations;
93 /// <summary>The number of service tasks in async mode currently running.</summary>
94 /// <remarks>Must always be accessed from inside a lock.</remarks>
95 private int _numberOfOutstandingServiceTasks;
96 /// <summary>The next available ID we can assign to a message about to be processed.</summary>
97 private PaddedInt64 _nextAvailableInputMessageId; // initialized to 0... very important for a reordering buffer
98 /// <summary>A task has reserved the right to run the completion routine.</summary>
99 private bool _completionReserved;
100 /// <summary>This counter is set by the processing loop to prevent itself from trying to keep alive.</summary>
101 private int _keepAliveBanCounter;
103 /// <summary>Initializes the target core.</summary>
104 /// <param name="owningTarget">The target using this helper.</param>
105 /// <param name="callAction">An action to invoke for all accepted items.</param>
106 /// <param name="reorderingBuffer">The reordering buffer used by the owner; may be null.</param>
107 /// <param name="dataflowBlockOptions">The options to use to configure this block. The target core assumes these options are immutable.</param>
108 /// <param name="targetCoreOptions">Options for how the target core should behave.</param>
110 ITargetBlock<TInput> owningTarget,
111 Action<KeyValuePair<TInput, long>> callAction,
112 IReorderingBuffer reorderingBuffer,
113 ExecutionDataflowBlockOptions dataflowBlockOptions,
114 TargetCoreOptions targetCoreOptions)
116 // Validate internal arguments
117 Contract.Requires(owningTarget != null, "Core must be associated with a target block.");
118 Contract.Requires(dataflowBlockOptions != null, "Options must be provided to configure the core.");
119 Contract.Requires(callAction != null, "Action to invoke for each item is required.");
121 // Store arguments and do additional initialization
122 _owningTarget = owningTarget;
123 _callAction = callAction;
124 _reorderingBuffer = reorderingBuffer;
125 _dataflowBlockOptions = dataflowBlockOptions;
126 _targetCoreOptions = targetCoreOptions;
127 _messages = (dataflowBlockOptions.MaxDegreeOfParallelism == 1) ?
128 (IProducerConsumerQueue<KeyValuePair<TInput, long>>)new SingleProducerSingleConsumerQueue<KeyValuePair<TInput, long>>() :
129 (IProducerConsumerQueue<KeyValuePair<TInput, long>>)new MultiProducerMultiConsumerQueue<KeyValuePair<TInput, long>>();
130 if (_dataflowBlockOptions.BoundedCapacity != System.Threading.Tasks.Dataflow.DataflowBlockOptions.Unbounded)
132 Debug.Assert(_dataflowBlockOptions.BoundedCapacity > 0, "Positive bounding count expected; should have been verified by options ctor");
133 _boundingState = new BoundingStateWithPostponed<TInput>(_dataflowBlockOptions.BoundedCapacity);
137 /// <summary>Internal Complete entry point with extra parameters for different contexts.</summary>
138 /// <param name="exception">If not null, the block will be faulted.</param>
139 /// <param name="dropPendingMessages">If true, any unprocessed input messages will be dropped.</param>
140 /// <param name="storeExceptionEvenIfAlreadyCompleting">If true, an exception will be stored after _decliningPermanently has been set to true.</param>
141 /// <param name="unwrapInnerExceptions">If true, exception will be treated as an AggregateException.</param>
142 /// <param name="revertProcessingState">Indicates whether the processing state is dirty and has to be reverted.</param>
143 internal void Complete(Exception exception, bool dropPendingMessages, bool storeExceptionEvenIfAlreadyCompleting = false,
144 bool unwrapInnerExceptions = false, bool revertProcessingState = false)
146 Contract.Requires(storeExceptionEvenIfAlreadyCompleting || !revertProcessingState,
147 "Indicating dirty processing state may only come with storeExceptionEvenIfAlreadyCompleting==true.");
148 Contract.EndContractBlock();
150 // Ensure that no new messages may be added
153 // Faulting from outside is allowed until we start declining permanently.
154 // Faulting from inside is allowed at any time.
155 if (exception != null && (!_decliningPermanently || storeExceptionEvenIfAlreadyCompleting))
157 Debug.Assert(_numberOfOutstandingOperations > 0 || !storeExceptionEvenIfAlreadyCompleting,
158 "Calls with storeExceptionEvenIfAlreadyCompleting==true may only be coming from processing task.");
160 #pragma warning disable 0420
161 Common.AddException(ref _exceptions, exception, unwrapInnerExceptions);
164 // Clear the messages queue if requested
165 if (dropPendingMessages)
167 KeyValuePair<TInput, long> dummy;
168 while (_messages.TryDequeue(out dummy)) ;
171 // Revert the dirty processing state if requested
172 if (revertProcessingState)
174 Debug.Assert(_numberOfOutstandingOperations > 0 && (!UsesAsyncCompletion || _numberOfOutstandingServiceTasks > 0),
175 "The processing state must be dirty when revertProcessingState==true.");
176 _numberOfOutstandingOperations--;
177 if (UsesAsyncCompletion) _numberOfOutstandingServiceTasks--;
180 // Trigger completion
181 _decliningPermanently = true;
182 CompleteBlockIfPossible();
186 /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="OfferMessage"]/*' />
187 internal DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, TInput messageValue, ISourceBlock<TInput> source, Boolean consumeToAccept)
189 // Validate arguments
190 if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, "messageHeader");
191 if (source == null && consumeToAccept) throw new ArgumentException(SR.Argument_CantConsumeFromANullSource, "consumeToAccept");
192 Contract.EndContractBlock();
196 // If we shouldn't be accepting more messages, don't.
197 if (_decliningPermanently)
199 CompleteBlockIfPossible();
200 return DataflowMessageStatus.DecliningPermanently;
203 // We can directly accept the message if:
204 // 1) we are not bounding, OR
205 // 2) we are bounding AND there is room available AND there are no postponed messages AND no messages are currently being transfered to the input queue.
206 // (If there were any postponed messages, we would need to postpone so that ordering would be maintained.)
207 // (Unlike all other blocks, TargetCore can accept messages while processing, because
208 // input message IDs are properly assigned and the correct order is preserved.)
209 if (_boundingState == null ||
210 (_boundingState.OutstandingTransfers == 0 && _boundingState.CountIsLessThanBound && _boundingState.PostponedMessages.Count == 0))
212 // Consume the message from the source if necessary
215 Debug.Assert(source != null, "We must have thrown if source == null && consumeToAccept == true.");
218 messageValue = source.ConsumeMessage(messageHeader, _owningTarget, out consumed);
219 if (!consumed) return DataflowMessageStatus.NotAvailable;
222 // Assign a message ID - strictly sequential, no gaps.
223 // Once consumed, enqueue the message with its ID and kick off asynchronous processing.
224 long messageId = _nextAvailableInputMessageId.Value++;
225 Debug.Assert(messageId != Common.INVALID_REORDERING_ID, "The assigned message ID is invalid.");
226 if (_boundingState != null) _boundingState.CurrentCount += 1; // track this new item against our bound
227 _messages.Enqueue(new KeyValuePair<TInput, long>(messageValue, messageId));
228 ProcessAsyncIfNecessary();
229 return DataflowMessageStatus.Accepted;
231 // Otherwise, we try to postpone if a source was provided
232 else if (source != null)
234 Debug.Assert(_boundingState != null && _boundingState.PostponedMessages != null,
235 "PostponedMessages must have been initialized during construction in non-greedy mode.");
237 // Store the message's info and kick off asynchronous processing
238 _boundingState.PostponedMessages.Push(source, messageHeader);
239 ProcessAsyncIfNecessary();
240 return DataflowMessageStatus.Postponed;
242 // We can't do anything else about this message
243 return DataflowMessageStatus.Declined;
247 /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
248 internal Task Completion { get { return _completionSource.Task; } }
250 /// <summary>Gets the number of items waiting to be processed by this target.</summary>
251 internal int InputCount { get { return _messages.GetCountSafe(IncomingLock); } }
253 /// <summary>Signals to the target core that a previously launched asynchronous operation has now completed.</summary>
254 internal void SignalOneAsyncMessageCompleted()
256 SignalOneAsyncMessageCompleted(boundingCountChange: 0);
259 /// <summary>Signals to the target core that a previously launched asynchronous operation has now completed.</summary>
260 /// <param name="boundingCountChange">The number of elements by which to change the bounding count, if bounding is occurring.</param>
261 internal void SignalOneAsyncMessageCompleted(int boundingCountChange)
265 // We're no longer processing, so decrement the DOP counter
266 Debug.Assert(_numberOfOutstandingOperations > 0, "Operations may only be completed if any are outstanding.");
267 if (_numberOfOutstandingOperations > 0) _numberOfOutstandingOperations--;
269 // Fix up the bounding count if necessary
270 if (_boundingState != null && boundingCountChange != 0)
272 Debug.Assert(boundingCountChange <= 0 && _boundingState.CurrentCount + boundingCountChange >= 0,
273 "Expected a negative bounding change and not to drop below zero.");
274 _boundingState.CurrentCount += boundingCountChange;
277 // However, we may have given up early because we hit our own configured
278 // processing limits rather than because we ran out of work to do. If that's
279 // the case, make sure we spin up another task to keep going.
280 ProcessAsyncIfNecessary(repeat: true);
282 // If, however, we stopped because we ran out of work to do and we
283 // know we'll never get more, then complete.
284 CompleteBlockIfPossible();
288 /// <summary>Gets whether this instance has been constructed for async processing.</summary>
289 private bool UsesAsyncCompletion
293 return (_targetCoreOptions & TargetCoreOptions.UsesAsyncCompletion) != 0;
297 /// <summary>Gets whether there's room to launch more processing operations.</summary>
298 private bool HasRoomForMoreOperations
302 Contract.Requires(_numberOfOutstandingOperations >= 0, "Number of outstanding operations should never be negative.");
303 Contract.Requires(_numberOfOutstandingServiceTasks >= 0, "Number of outstanding service tasks should never be negative.");
304 Contract.Requires(_numberOfOutstandingOperations >= _numberOfOutstandingServiceTasks, "Number of outstanding service tasks should never exceed the number of outstanding operations.");
305 Common.ContractAssertMonitorStatus(IncomingLock, held: true);
307 // In async mode, we increment _numberOfOutstandingOperations before we start
308 // our own processing loop which should not count towards the MaxDOP.
309 return (_numberOfOutstandingOperations - _numberOfOutstandingServiceTasks) < _dataflowBlockOptions.ActualMaxDegreeOfParallelism;
313 /// <summary>Gets whether there's room to launch more service tasks for doing/launching processing operations.</summary>
314 private bool HasRoomForMoreServiceTasks
318 Contract.Requires(_numberOfOutstandingOperations >= 0, "Number of outstanding operations should never be negative.");
319 Contract.Requires(_numberOfOutstandingServiceTasks >= 0, "Number of outstanding service tasks should never be negative.");
320 Contract.Requires(_numberOfOutstandingOperations >= _numberOfOutstandingServiceTasks, "Number of outstanding service tasks should never exceed the number of outstanding operations.");
321 Common.ContractAssertMonitorStatus(IncomingLock, held: true);
323 if (!UsesAsyncCompletion)
326 // We don't count service tasks, because our tasks are counted as operations.
327 // Therefore, return HasRoomForMoreOperations.
328 return HasRoomForMoreOperations;
333 // We allow up to MaxDOP true service tasks.
334 // Checking whether there is room for more processing operations is not necessary,
335 // but doing so will help us avoid spinning up a task that will go away without
336 // launching any processing operation.
337 return HasRoomForMoreOperations &&
338 _numberOfOutstandingServiceTasks < _dataflowBlockOptions.ActualMaxDegreeOfParallelism;
343 /// <summary>Called when new messages are available to be processed.</summary>
344 /// <param name="repeat">Whether this call is the continuation of a previous message loop.</param>
345 private void ProcessAsyncIfNecessary(bool repeat = false)
347 Common.ContractAssertMonitorStatus(IncomingLock, held: true);
349 if (HasRoomForMoreServiceTasks)
351 ProcessAsyncIfNecessary_Slow(repeat);
356 /// Slow path for ProcessAsyncIfNecessary.
357 /// Separating out the slow path into its own method makes it more likely that the fast path method will get inlined.
359 [SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")]
360 private void ProcessAsyncIfNecessary_Slow(bool repeat)
362 Contract.Requires(HasRoomForMoreServiceTasks, "There must be room to process asynchronously.");
363 Common.ContractAssertMonitorStatus(IncomingLock, held: true);
365 // Determine preconditions to launching a processing task
366 bool messagesAvailableOrPostponed =
367 !_messages.IsEmpty ||
368 (!_decliningPermanently && _boundingState != null && _boundingState.CountIsLessThanBound && _boundingState.PostponedMessages.Count > 0);
370 // If all conditions are met, launch away
371 if (messagesAvailableOrPostponed && !CanceledOrFaulted)
373 // Any book keeping related to the processing task like incrementing the
374 // DOP counter or eventually recording the tasks reference must be done
375 // before the task starts. That is because the task itself will do the
376 // reverse operation upon its completion.
377 _numberOfOutstandingOperations++;
378 if (UsesAsyncCompletion) _numberOfOutstandingServiceTasks++;
380 var taskForInputProcessing = new Task(thisTargetCore => ((TargetCore<TInput>)thisTargetCore).ProcessMessagesLoopCore(), this,
381 Common.GetCreationOptionsForTask(repeat));
384 DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
385 if (etwLog.IsEnabled())
387 etwLog.TaskLaunchedForMessageHandling(
388 _owningTarget, taskForInputProcessing, DataflowEtwProvider.TaskLaunchedReason.ProcessingInputMessages,
389 _messages.Count + (_boundingState != null ? _boundingState.PostponedMessages.Count : 0));
393 // Start the task handling scheduling exceptions
394 Exception exception = Common.StartTaskSafe(taskForInputProcessing, _dataflowBlockOptions.TaskScheduler);
395 if (exception != null)
397 // Get out from under currently held locks. Complete re-acquires the locks it needs.
398 Task.Factory.StartNew(exc => Complete(exception: (Exception)exc, dropPendingMessages: true, storeExceptionEvenIfAlreadyCompleting: true,
399 unwrapInnerExceptions: false, revertProcessingState: true),
400 exception, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
405 /// <summary>Task body used to process messages.</summary>
406 [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
407 private void ProcessMessagesLoopCore()
409 Common.ContractAssertMonitorStatus(IncomingLock, held: false);
411 KeyValuePair<TInput, long> messageWithId = default(KeyValuePair<TInput, long>);
414 bool useAsyncCompletion = UsesAsyncCompletion;
415 bool shouldAttemptPostponedTransfer = _boundingState != null && _boundingState.BoundedCapacity > 1;
416 int numberOfMessagesProcessedByThisTask = 0;
417 int numberOfMessagesProcessedSinceTheLastKeepAlive = 0;
418 int maxMessagesPerTask = _dataflowBlockOptions.ActualMaxMessagesPerTask;
420 while (numberOfMessagesProcessedByThisTask < maxMessagesPerTask && !CanceledOrFaulted)
422 // If we're bounding, try to transfer a message from the postponed queue
423 // to the input queue. This enables us to more quickly unblock sources
424 // sending data to the block (otherwise, no postponed messages will be consumed
425 // until the input queue is entirely empty). If the bounded size is 1,
426 // there's no need to transfer, as attempting to get the next message will
427 // just go and consume the postponed message anyway, and we'll save
428 // the extra trip through the _messages queue.
429 KeyValuePair<TInput, long> transferMessageWithId;
430 if (shouldAttemptPostponedTransfer &&
431 TryConsumePostponedMessage(forPostponementTransfer: true, result: out transferMessageWithId))
436 _boundingState.OutstandingTransfers > 0
437 && _boundingState.OutstandingTransfers <= _dataflowBlockOptions.ActualMaxDegreeOfParallelism,
438 "Expected TryConsumePostponedMessage to have incremented the count and for the count to not exceed the DOP.");
439 _boundingState.OutstandingTransfers--; // was incremented in TryConsumePostponedMessage
440 _messages.Enqueue(transferMessageWithId);
441 ProcessAsyncIfNecessary();
445 if (useAsyncCompletion)
447 // Get the next message if DOP is available.
448 // If we can't get a message or DOP is not available, bail out.
449 if (!TryGetNextMessageForNewAsyncOperation(out messageWithId)) break;
453 // Try to get a message for sequential execution, i.e. without checking DOP availability
454 if (!TryGetNextAvailableOrPostponedMessage(out messageWithId))
456 // Try to keep the task alive only if MaxDOP=1
457 if (_dataflowBlockOptions.MaxDegreeOfParallelism != 1) break;
459 // If this task has processed enough messages without being kept alive,
460 // it has served its purpose. Don't keep it alive.
461 if (numberOfMessagesProcessedSinceTheLastKeepAlive > Common.KEEP_ALIVE_NUMBER_OF_MESSAGES_THRESHOLD) break;
463 // If keep alive is banned, don't attempt it
464 if (_keepAliveBanCounter > 0)
466 _keepAliveBanCounter--;
470 // Reset the keep alive counter. (Keep this line together with TryKeepAliveUntil.)
471 numberOfMessagesProcessedSinceTheLastKeepAlive = 0;
473 // Try to keep the task alive briefly until a new message arrives
474 if (!Common.TryKeepAliveUntil(_keepAlivePredicate, this, out messageWithId))
476 // Keep alive was unsuccessful.
477 // Therefore ban further attempts temporarily.
478 _keepAliveBanCounter = Common.KEEP_ALIVE_BAN_COUNT;
484 // We have popped a message from the queue.
485 // So increment the counter of processed messages.
486 numberOfMessagesProcessedByThisTask++;
487 numberOfMessagesProcessedSinceTheLastKeepAlive++;
489 // Invoke the user action
490 _callAction(messageWithId);
493 catch (Exception exc)
495 Common.StoreDataflowMessageValueIntoExceptionData(exc, messageWithId.Key);
496 Complete(exc, dropPendingMessages: true, storeExceptionEvenIfAlreadyCompleting: true, unwrapInnerExceptions: false);
502 // We incremented _numberOfOutstandingOperations before we launched this task.
503 // So we must decremented it before exiting.
504 // Note that each async task additionally incremented it before starting and
505 // is responsible for decrementing it prior to exiting.
506 Debug.Assert(_numberOfOutstandingOperations > 0, "Expected a positive number of outstanding operations, since we're completing one here.");
507 _numberOfOutstandingOperations--;
509 // If we are in async mode, we've also incremented _numberOfOutstandingServiceTasks.
510 // Now it's time to decrement it.
511 if (UsesAsyncCompletion)
513 Debug.Assert(_numberOfOutstandingServiceTasks > 0, "Expected a positive number of outstanding service tasks, since we're completing one here.");
514 _numberOfOutstandingServiceTasks--;
517 // However, we may have given up early because we hit our own configured
518 // processing limits rather than because we ran out of work to do. If that's
519 // the case, make sure we spin up another task to keep going.
520 ProcessAsyncIfNecessary(repeat: true);
522 // If, however, we stopped because we ran out of work to do and we
523 // know we'll never get more, then complete.
524 CompleteBlockIfPossible();
529 /// <summary>Retrieves the next message from the input queue for the useAsyncCompletion mode.</summary>
530 /// <param name="messageWithId">The next message retrieved.</param>
531 /// <returns>true if a message was found and removed; otherwise, false.</returns>
532 [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
533 private bool TryGetNextMessageForNewAsyncOperation(out KeyValuePair<TInput, long> messageWithId)
535 Contract.Requires(UsesAsyncCompletion, "Only valid to use when in async mode.");
536 Common.ContractAssertMonitorStatus(IncomingLock, held: false);
538 bool parallelismAvailable;
542 // If we have room for another asynchronous operation, reserve it.
543 // If later it turns out that we had no work to fill the slot, we'll undo the addition.
544 parallelismAvailable = HasRoomForMoreOperations;
545 if (parallelismAvailable) ++_numberOfOutstandingOperations;
548 messageWithId = default(KeyValuePair<TInput, long>);
549 if (parallelismAvailable)
551 // If a parallelism slot was available, try to get an item.
552 // Be careful, because an exception may be thrown from ConsumeMessage
553 // and we have already incremented _numberOfOutstandingOperations.
554 bool gotMessage = false;
557 gotMessage = TryGetNextAvailableOrPostponedMessage(out messageWithId);
561 // We have incremented the counter, but we didn't get a message.
562 // So we must undo the increment and eventually complete the block.
563 SignalOneAsyncMessageCompleted();
565 // Re-throw the exception. The processing loop will catch it.
569 // There may not be an error, but may have still failed to get a message.
570 // So we must undo the increment and eventually complete the block.
571 if (!gotMessage) SignalOneAsyncMessageCompleted();
576 // If there was no parallelism available, we didn't increment _numberOfOutstandingOperations.
577 // So there is nothing to do except to return false.
582 /// Either takes the next available message from the input queue or retrieves a postponed
583 /// message from a source, based on whether we're in greedy or non-greedy mode.
585 /// <param name="messageWithId">The retrieved item with its Id.</param>
586 /// <returns>true if a message could be removed and returned; otherwise, false.</returns>
587 private bool TryGetNextAvailableOrPostponedMessage(out KeyValuePair<TInput, long> messageWithId)
589 Common.ContractAssertMonitorStatus(IncomingLock, held: false);
591 // First try to get a message from our input buffer.
592 if (_messages.TryDequeue(out messageWithId))
596 // If we can't, but if we have any postponed messages due to bounding, then
597 // try to consume one of these postponed messages.
598 // Since we are not currently holding the lock, it is possible that new messages get queued up
599 // by the time we take the lock to manipulate _boundingState. So we have to double-check the
600 // input queue once we take the lock before we consider postponed messages.
601 else if (_boundingState != null && TryConsumePostponedMessage(forPostponementTransfer: false, result: out messageWithId))
605 // Otherwise, there's no message available.
608 messageWithId = default(KeyValuePair<TInput, long>);
613 /// <summary>Consumes a single postponed message.</summary>
614 /// <param name="forPostponementTransfer">
615 /// true if the method is being called to consume a message that'll then be stored into the input queue;
616 /// false if the method is being called to consume a message that'll be processed immediately.
617 /// If true, the bounding state's ForcePostponement will be updated.
618 /// If false, the method will first try (while holding the lock) to consume from the input queue before
619 /// consuming a postponed message.
621 /// <param name="result">The consumed message.</param>
622 /// <returns>true if a message was consumed; otherwise, false.</returns>
623 private bool TryConsumePostponedMessage(
624 bool forPostponementTransfer,
625 out KeyValuePair<TInput, long> result)
628 _dataflowBlockOptions.BoundedCapacity !=
629 System.Threading.Tasks.Dataflow.DataflowBlockOptions.Unbounded, "Only valid to use when in bounded mode.");
630 Common.ContractAssertMonitorStatus(IncomingLock, held: false);
632 // Iterate until we either consume a message successfully or there are no more postponed messages.
633 bool countIncrementedExpectingToGetItem = false;
634 long messageId = Common.INVALID_REORDERING_ID;
637 KeyValuePair<ISourceBlock<TInput>, DataflowMessageHeader> element;
640 // If we are declining permanently, don't consume postponed messages.
641 if (_decliningPermanently) break;
643 // New messages may have been queued up while we weren't holding the lock.
644 // In particular, the input queue may have been filled up and messages may have
645 // gotten postponed. If we process such a postponed message, we would mess up the
646 // order. Therefore, we have to double-check the input queue first.
647 if (!forPostponementTransfer && _messages.TryDequeue(out result)) return true;
649 // We can consume a message to process if there's one to process and also if
650 // if we have logical room within our bound for the message.
651 if (!_boundingState.CountIsLessThanBound || !_boundingState.PostponedMessages.TryPop(out element))
653 if (countIncrementedExpectingToGetItem)
655 countIncrementedExpectingToGetItem = false;
656 _boundingState.CurrentCount -= 1;
660 if (!countIncrementedExpectingToGetItem)
662 countIncrementedExpectingToGetItem = true;
663 messageId = _nextAvailableInputMessageId.Value++; // optimistically assign an ID
664 Debug.Assert(messageId != Common.INVALID_REORDERING_ID, "The assigned message ID is invalid.");
665 _boundingState.CurrentCount += 1; // optimistically take bounding space
666 if (forPostponementTransfer)
668 Debug.Assert(_boundingState.OutstandingTransfers >= 0, "Expected TryConsumePostponedMessage to not be negative.");
669 _boundingState.OutstandingTransfers++; // temporarily force postponement until we've successfully consumed the element
672 } // Must not call to source while holding lock
675 TInput consumedValue = element.Key.ConsumeMessage(element.Value, _owningTarget, out consumed);
678 result = new KeyValuePair<TInput, long>(consumedValue, messageId);
683 if (forPostponementTransfer)
685 // We didn't consume message so we need to decrement because we havent consumed the element.
686 _boundingState.OutstandingTransfers--;
691 // We optimistically acquired a message ID for a message that, in the end, we never got.
692 // So, we need to let the reordering buffer (if one exists) know that it should not
693 // expect an item with this ID. Otherwise, it would stall forever.
694 if (_reorderingBuffer != null && messageId != Common.INVALID_REORDERING_ID) _reorderingBuffer.IgnoreItem(messageId);
696 // Similarly, we optimistically increased the bounding count, expecting to get another message in.
697 // Since we didn't, we need to fix the bounding count back to what it should have been.
698 if (countIncrementedExpectingToGetItem) ChangeBoundingCount(-1);
700 // Inform the caller that no message could be consumed.
701 result = default(KeyValuePair<TInput, long>);
705 /// <summary>Gets whether the target has had cancellation requested or an exception has occurred.</summary>
706 private bool CanceledOrFaulted
710 return _dataflowBlockOptions.CancellationToken.IsCancellationRequested || Volatile.Read(ref _exceptions) != null;
714 /// <summary>Completes the block once all completion conditions are met.</summary>
715 private void CompleteBlockIfPossible()
717 Common.ContractAssertMonitorStatus(IncomingLock, held: true);
719 bool noMoreMessages = _decliningPermanently && _messages.IsEmpty;
720 if (noMoreMessages || CanceledOrFaulted)
722 CompleteBlockIfPossible_Slow();
727 /// Slow path for CompleteBlockIfPossible.
728 /// Separating out the slow path into its own method makes it more likely that the fast path method will get inlined.
730 private void CompleteBlockIfPossible_Slow()
732 Contract.Requires((_decliningPermanently && _messages.IsEmpty) || CanceledOrFaulted, "There must be no more messages.");
733 Common.ContractAssertMonitorStatus(IncomingLock, held: true);
735 bool notCurrentlyProcessing = _numberOfOutstandingOperations == 0;
736 if (notCurrentlyProcessing && !_completionReserved)
738 // Make sure no one else tries to call CompleteBlockOncePossible
739 _completionReserved = true;
741 // Make sure the target is declining
742 _decliningPermanently = true;
744 // Get out from under currently held locks. This is to avoid
745 // invoking synchronous continuations off of _completionSource.Task
746 // while holding a lock.
747 Task.Factory.StartNew(state => ((TargetCore<TInput>)state).CompleteBlockOncePossible(),
748 this, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
753 /// Completes the block. This must only be called once, and only once all of the completion conditions are met.
754 /// As such, it must only be called from CompleteBlockIfPossible.
756 private void CompleteBlockOncePossible()
758 // Since the lock is needed only for the Assert, we do this only in DEBUG mode
760 lock (IncomingLock) Debug.Assert(_numberOfOutstandingOperations == 0, "Everything must be done by now.");
763 // Release any postponed messages
764 if (_boundingState != null)
766 // Note: No locks should be held at this point.
767 Common.ReleaseAllPostponedMessages(_owningTarget, _boundingState.PostponedMessages, ref _exceptions);
770 // For good measure and help in preventing leaks, clear out the incoming message queue,
771 // which may still contain orphaned data if we were canceled or faulted. However,
772 // we don't reset the bounding count here, as the block as a whole may still be active.
773 KeyValuePair<TInput, long> ignored;
774 IProducerConsumerQueue<KeyValuePair<TInput, long>> messages = _messages;
775 while (messages.TryDequeue(out ignored)) ;
777 // If we completed with any unhandled exception, finish in an error state
778 if (Volatile.Read(ref _exceptions) != null)
780 // It's ok to read _exceptions' content here, because
781 // at this point no more exceptions can be generated and thus no one will
783 _completionSource.TrySetException(Volatile.Read(ref _exceptions));
785 // If we completed with cancellation, finish in a canceled state
786 else if (_dataflowBlockOptions.CancellationToken.IsCancellationRequested)
788 _completionSource.TrySetCanceled();
790 // Otherwise, finish in a successful state.
793 _completionSource.TrySetResult(default(VoidResult));
796 // We only want to do tracing for block completion if this target core represents the whole block.
797 // If it only represents a part of the block (i.e. there's a source associated with it as well),
798 // then we shouldn't log just for the first half of the block; the source half will handle logging.
799 DataflowEtwProvider etwLog;
800 if ((_targetCoreOptions & TargetCoreOptions.RepresentsBlockCompletion) != 0 &&
801 (etwLog = DataflowEtwProvider.Log).IsEnabled())
803 etwLog.DataflowBlockCompleted(_owningTarget);
808 /// <summary>Gets whether the target core is operating in a bounded mode.</summary>
809 internal bool IsBounded { get { return _boundingState != null; } }
811 /// <summary>Increases or decreases the bounding count.</summary>
812 /// <param name="count">The incremental addition (positive to increase, negative to decrease).</param>
813 internal void ChangeBoundingCount(int count)
815 Contract.Requires(count != 0, "Should only be called when the count is actually changing.");
816 Common.ContractAssertMonitorStatus(IncomingLock, held: false);
817 if (_boundingState != null)
821 Debug.Assert(count > 0 || (count < 0 && _boundingState.CurrentCount + count >= 0),
822 "If count is negative, it must not take the total count negative.");
823 _boundingState.CurrentCount += count;
824 ProcessAsyncIfNecessary();
825 CompleteBlockIfPossible();
830 /// <summary>Gets the object to display in the debugger display attribute.</summary>
831 [SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
832 private object DebuggerDisplayContent
836 var displayTarget = _owningTarget as IDebuggerDisplay;
837 return string.Format("Block=\"{0}\"",
838 displayTarget != null ? displayTarget.Content : _owningTarget);
842 /// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
843 internal ExecutionDataflowBlockOptions DataflowBlockOptions { get { return _dataflowBlockOptions; } }
845 /// <summary>Gets information about this helper to be used for display in a debugger.</summary>
846 /// <returns>Debugging information about this target.</returns>
847 internal DebuggingInformation GetDebuggingInformation() { return new DebuggingInformation(this); }
849 /// <summary>Provides a wrapper for commonly needed debugging information.</summary>
850 internal sealed class DebuggingInformation
852 /// <summary>The target being viewed.</summary>
853 private readonly TargetCore<TInput> _target;
855 /// <summary>Initializes the debugging helper.</summary>
856 /// <param name="target">The target being viewed.</param>
857 internal DebuggingInformation(TargetCore<TInput> target) { _target = target; }
859 /// <summary>Gets the number of messages waiting to be processed.</summary>
860 internal int InputCount { get { return _target._messages.Count; } }
861 /// <summary>Gets the messages waiting to be processed.</summary>
862 internal IEnumerable<TInput> InputQueue { get { return _target._messages.Select(kvp => kvp.Key).ToList(); } }
864 /// <summary>Gets any postponed messages.</summary>
865 internal QueuedMap<ISourceBlock<TInput>, DataflowMessageHeader> PostponedMessages
867 get { return _target._boundingState != null ? _target._boundingState.PostponedMessages : null; }
870 /// <summary>Gets the current number of outstanding input processing operations.</summary>
871 internal Int32 CurrentDegreeOfParallelism { get { return _target._numberOfOutstandingOperations - _target._numberOfOutstandingServiceTasks; } }
873 /// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
874 internal ExecutionDataflowBlockOptions DataflowBlockOptions { get { return _target._dataflowBlockOptions; } }
875 /// <summary>Gets whether the block is declining further messages.</summary>
876 internal bool IsDecliningPermanently { get { return _target._decliningPermanently; } }
877 /// <summary>Gets whether the block is completed.</summary>
878 internal bool IsCompleted { get { return _target.Completion.IsCompleted; } }