1 // Copyright (c) Microsoft. All rights reserved.
2 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
4 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
9 // The core implementation of a standard ISourceBlock<TOutput>.
11 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
13 using System.Collections.Generic;
14 using System.Diagnostics;
15 using System.Diagnostics.CodeAnalysis;
16 using System.Diagnostics.Contracts;
18 using System.Security;
20 namespace System.Threading.Tasks.Dataflow.Internal
22 // LOCK-LEVELING SCHEME
23 // --------------------
24 // SourceCore employs two locks: OutgoingLock and ValueLock. Additionally, targets we call out to
25 // likely utilize their own IncomingLock. We can hold OutgoingLock while acquiring ValueLock or IncomingLock.
26 // However, we cannot hold ValueLock while calling out to external code or while acquiring OutgoingLock, and
27 // we cannot hold IncomingLock when acquiring OutgoingLock. Additionally, the locks employed must be reentrant.
29 /// <summary>Provides a core implementation for blocks that implement <see cref="ISourceBlock{TOutput}"/>.</summary>
30 /// <typeparam name="TOutput">Specifies the type of data supplied by the <see cref="SourceCore{TOutput}"/>.</typeparam>
31 [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")]
32 [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
33 internal sealed class SourceCore<TOutput>
35 // *** These fields are readonly and are initialized to new instances at construction.
// (_targetRegistry is the one exception: it has no initializer here and is assigned in the
// constructor, because it is built from the owning source.)
37 /// <summary>A TaskCompletionSource that represents the completion of this block.</summary>
/// <remarks>This same instance is the object handed out by the OutgoingLock property.</remarks>
38 private readonly TaskCompletionSource<VoidResult> _completionTask = new TaskCompletionSource<VoidResult>();
39 /// <summary>A registry used to store all linked targets and information about them.</summary>
/// <remarks>This same instance is the object handed out by the ValueLock property.</remarks>
40 private readonly TargetRegistry<TOutput> _targetRegistry;
41 /// <summary>The output messages queued up to be received by consumers/targets.</summary>
/// <remarks>
43 /// The queue is only ever accessed by a single producer and single consumer at a time. On the producer side,
44 /// we require that AddMessage/AddMessages are the only places the queue is added to, and we require that those
45 /// methods not be used concurrently with anything else. All of our target halves today follow that restriction;
46 /// for example, TransformBlock with DOP==1 will have at most a single task processing the user provided delegate,
47 /// and thus at most one task calling AddMessage. If it has a DOP > 1, it'll go through the ReorderingBuffer,
48 /// which will use a lock to synchronize the output of all of the processing tasks such that only one is using
49 /// AddMessage at a time. On the consumer side of SourceCore, all consumption is protected by ValueLock, and thus
50 /// all consumption is serialized.
/// </remarks>
52 private readonly SingleProducerSingleConsumerQueue<TOutput> _messages = new SingleProducerSingleConsumerQueue<TOutput>(); // protected by AddMessage/ValueLock
/// <summary>
/// Gets the monitor object used as the outgoing lock. The completion source
/// instance is reused for this purpose instead of a dedicated lock object.
/// </summary>
private object OutgoingLock
{
    get
    {
        return _completionTask;
    }
}
/// <summary>
/// Gets the monitor object used as the value lock. The target registry
/// instance is reused for this purpose instead of a dedicated lock object.
/// </summary>
private object ValueLock
{
    get
    {
        return _targetRegistry;
    }
}
59 // *** These fields are readonly and are initialized by arguments to the constructor.
61 /// <summary>The source utilizing this helper.</summary>
62 private readonly ISourceBlock<TOutput> _owningSource;
63 /// <summary>The options used to configure this block's execution.</summary>
64 private readonly DataflowBlockOptions _dataflowBlockOptions;
/// <summary>
66 /// An action to be invoked on the owner block to stop accepting messages.
67 /// This action is invoked when SourceCore encounters an exception.
/// </summary>
69 private readonly Action<ISourceBlock<TOutput>> _completeAction;
/// <summary>
71 /// An action to be invoked on the owner block when an item is removed.
72 /// This may be null if the owner block doesn't need to be notified.
/// </summary>
74 private readonly Action<ISourceBlock<TOutput>, int> _itemsRemovedAction;
75 /// <summary>Item counting function</summary>
/// <remarks>
/// May be null. Callers in this class pass either a single output value with a null list,
/// or default(TOutput) with a list of outputs, and forward the resulting count to
/// _itemsRemovedAction. When this is null they fall back to 1 per single output or to the
/// number of outputs received.
/// </remarks>
76 private readonly Func<ISourceBlock<TOutput>, TOutput, IList<TOutput>, int> _itemCountingFunc;
78 // *** These fields are mutated during execution.
80 /// <summary>The task used to process the output and offer it to targets.</summary>
/// <remarks>Null whenever no offering task is outstanding; a new task is only created while this is null.</remarks>
81 private Task _taskForOutputProcessing; // protected by ValueLock
82 /// <summary>Counter for message IDs unique within this source block.</summary>
/// <remarks>Holds the ID of the message currently at the head of the queue; it is advanced each time the head is removed.</remarks>
83 private PaddedInt64 _nextMessageId = new PaddedInt64 { Value = 1 }; // We are going to use this value before incrementing. Protected by ValueLock.
84 /// <summary>The target that the next message is reserved for, or null if nothing is reserved.</summary>
85 private ITargetBlock<TOutput> _nextMessageReservedFor; // protected by OutgoingLock
86 /// <summary>Whether all future messages should be declined.</summary>
/// <remarks>Once set, AddMessage and AddMessages return without enqueuing anything.</remarks>
87 private bool _decliningPermanently; // Protected by ValueLock
88 /// <summary>Whether this block should again attempt to offer messages to targets.</summary>
/// <remarks>
/// Cleared when a reservation is granted or when no target accepted the head message;
/// set again whenever the head of the queue changes or a reservation is released.
/// </remarks>
89 private bool _enableOffering = true; // Protected by ValueLock, sometimes read with volatile reads
90 /// <summary>Whether someone has reserved the right to call CompleteBlockOncePossible.</summary>
91 private bool _completionReserved; // Protected by OutgoingLock
92 /// <summary>Exceptions that may have occurred and gone unhandled during processing.</summary>
/// <remarks>Remains null until the first exception is added; HasExceptions relies on that.</remarks>
93 private List<Exception> _exceptions; // Protected by ValueLock, sometimes read with volatile reads
95 /// <summary>Initializes the source core.</summary>
96 /// <param name="owningSource">The source utilizing this core.</param>
97 /// <param name="dataflowBlockOptions">The options to use to configure the block.</param>
98 /// <param name="completeAction">Action to invoke in order to decline the associated target half, which will in turn decline this source core.</param>
99 /// <param name="itemsRemovedAction">Action to invoke when one or more items is removed. This may be null.</param>
100 /// <param name="itemCountingFunc">
101 /// Function to invoke when the owner needs to be able to count the number of individual
102 /// items in an output or set of outputs. This may be null.
/// </param>
105 ISourceBlock<TOutput> owningSource, DataflowBlockOptions dataflowBlockOptions,
106 Action<ISourceBlock<TOutput>> completeAction,
107 Action<ISourceBlock<TOutput>, int> itemsRemovedAction = null,
108 Func<ISourceBlock<TOutput>, TOutput, IList<TOutput>, int> itemCountingFunc = null)
// The first three arguments are required; the two optional delegates are allowed to stay null.
110 Contract.Requires(owningSource != null, "Core must be associated with a source.");
111 Contract.Requires(dataflowBlockOptions != null, "Options must be provided to configure the core.");
112 Contract.Requires(completeAction != null, "Action to invoke on completion is required.");
// Store the arguments
115 _owningSource = owningSource;
116 _dataflowBlockOptions = dataflowBlockOptions;
117 _itemsRemovedAction = itemsRemovedAction;
118 _itemCountingFunc = itemCountingFunc;
119 _completeAction = completeAction;
121 // Construct members that depend on the args
// (the registry instance created here is also what ValueLock returns)
122 _targetRegistry = new TargetRegistry<TOutput>(_owningSource);
125 /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="LinkTo"]/*' />
// Summary of the three outcomes visible below:
//  1. Block already completed: optionally propagate completion, return a no-op disposable.
//  2. Completion not yet reserved: register the target, synchronously offer to it, and
//     return an unlinker created over OutgoingLock and the target registry.
//  3. Completion reserved but not finished: optionally arrange for completion to be
//     propagated once the block completes, return a no-op disposable.
// Throws ArgumentNullException when target or linkOptions is null.
126 [SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")]
127 internal IDisposable LinkTo(ITargetBlock<TOutput> target, DataflowLinkOptions linkOptions)
129 // Validate arguments
130 if (target == null) throw new ArgumentNullException("target");
131 if (linkOptions == null) throw new ArgumentNullException("linkOptions");
132 Contract.EndContractBlock();
134 // If the block is already completed, there is not much to do -
135 // we have to propagate completion if that was requested, and
136 // then bail without taking the lock.
137 if (_completionTask.Task.IsCompleted)
139 if (linkOptions.PropagateCompletion) Common.PropagateCompletion(_completionTask.Task, target, exceptionHandler: null);
140 return Disposables.Nop;
145 // If completion has been reserved, the target registry has either been cleared already
146 // or is about to be cleared. So we can link and offer only if completion is not reserved.
147 if (!_completionReserved)
// target is passed by ref, so the registry may substitute a different target instance;
// the (possibly replaced) target is what gets offered to and what the unlinker removes.
149 _targetRegistry.Add(ref target, linkOptions);
150 OfferToTargets(linkToTarget: target);
151 return Common.CreateUnlinker(OutgoingLock, _targetRegistry, target);
155 // The block should not offer any messages when it is in this state, but
156 // it should still propagate completion if that has been requested.
157 if (linkOptions.PropagateCompletion) Common.PropagateCompletionOnceCompleted(_completionTask.Task, target);
158 return Disposables.Nop;
161 /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ConsumeMessage"]/*' />
// Hands the message at the head of the queue to the calling target.
// messageConsumed is set to false (and default(TOutput) returned) when another target holds
// the reservation, when the header's ID is not the next message ID, or when the queue is empty.
// Throws ArgumentException for an invalid header and ArgumentNullException for a null target.
162 internal TOutput ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target, out Boolean messageConsumed)
164 // Validate arguments
165 if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, "messageHeader");
166 if (target == null) throw new ArgumentNullException("target");
167 Contract.EndContractBlock();
169 TOutput consumedMessageValue = default(TOutput);
173 // If this target doesn't hold the reservation, then for this ConsumeMessage
174 // to be valid, there must not be any reservation (since otherwise we can't
175 // consume a message destined for someone else).
176 if (_nextMessageReservedFor != target &&
177 _nextMessageReservedFor != null)
179 messageConsumed = false;
180 return default(TOutput);
185 // If the requested message isn't the next message to be served up, bail.
186 // Otherwise, we're good to go: dequeue the message as it will now be owned by the target,
187 // signal that we can resume enabling offering as there's potentially a new "next message",
188 // complete if necessary, and offer asynchronously all messages as is appropriate.
190 if (messageHeader.Id != _nextMessageId.Value ||
191 !_messages.TryDequeue(out consumedMessageValue))
193 messageConsumed = false;
194 return default(TOutput);
// The message is now owned by the target: drop any reservation, unlink the target if it
// has reached its message limit, and advance to the next message ID.
197 _nextMessageReservedFor = null;
198 _targetRegistry.Remove(target, onlyIfReachedMaxMessages: true);
199 _enableOffering = true; // reenable offering if it was disabled
200 _nextMessageId.Value++;
201 CompleteBlockIfPossible();
202 OfferAsyncIfNecessary(isReplacementReplica: false, outgoingLockKnownAcquired: true);
206 // Notify the owner block that our count has decreased
207 if (_itemsRemovedAction != null)
// Without a counting function, a consumed message counts as exactly one item.
209 int count = _itemCountingFunc != null ? _itemCountingFunc(_owningSource, consumedMessageValue, null) : 1;
210 _itemsRemovedAction(_owningSource, count);
213 // Return the consumed message value
214 messageConsumed = true;
215 return consumedMessageValue;
218 /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReserveMessage"]/*' />
// A reservation is granted only when nobody else holds one, the header's ID matches the
// next message ID, and the queue is non-empty. Granting it records the target and
// disables offering until the reservation is consumed or released.
// Throws ArgumentException for an invalid header and ArgumentNullException for a null target.
219 internal Boolean ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
221 // Validate arguments
222 if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, "messageHeader");
223 if (target == null) throw new ArgumentNullException("target");
224 Contract.EndContractBlock();
228 // If no one currently holds a reservation...
229 if (_nextMessageReservedFor == null)
233 // ...and if the requested message is next in the queue, allow it
234 if (messageHeader.Id == _nextMessageId.Value && !_messages.IsEmpty)
236 _nextMessageReservedFor = target;
// While the head message is reserved it must not be offered to anyone else.
237 _enableOffering = false;
246 /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReleaseReservation"]/*' />
// Gives up a reservation previously granted to the target, re-enables offering, and then
// tries both to offer the head message again and to complete the block.
// Throws ArgumentException for an invalid header, ArgumentNullException for a null target,
// and InvalidOperationException when the target does not hold the reservation or the
// header does not identify the message at the head of the queue.
247 internal void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
249 // Validate arguments
250 if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, "messageHeader");
251 if (target == null) throw new ArgumentNullException("target");
252 Contract.EndContractBlock();
256 // If someone else holds the reservation, bail.
257 if (_nextMessageReservedFor != target) throw new InvalidOperationException(SR.InvalidOperation_MessageNotReservedByTarget);
261 // If this is not the message at the head of the queue, bail
262 if (messageHeader.Id != _nextMessageId.Value || _messages.IsEmpty) throw new InvalidOperationException(SR.InvalidOperation_MessageNotReservedByTarget);
264 // Otherwise, release the reservation
265 _nextMessageReservedFor = null;
266 Debug.Assert(!_enableOffering, "Offering should have been disabled if there was a valid reservation");
267 _enableOffering = true;
269 // Now there is at least one message ready for offering. So offer it.
270 // If a cancellation is pending, this method will bail out.
271 OfferAsyncIfNecessary(isReplacementReplica: false, outgoingLockKnownAcquired: true);
273 // This reservation may be holding the block's completion. So try to complete.
274 CompleteBlockIfPossible();
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
internal Task Completion
{
    get
    {
        // Expose the task owned by the completion source.
        return _completionTask.Task;
    }
}
282 /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceive"]/*' />
// Attempts to remove the head message directly, bypassing linked targets. Nothing is
// received while the head message is reserved for a target. item starts out as
// default(TOutput) and is only overwritten by a successful dequeue.
283 internal Boolean TryReceive(Predicate<TOutput> filter, out TOutput item)
285 item = default(TOutput);
286 bool itemReceived = false;
290 // If the next message is reserved for someone, we can't receive right now. Otherwise...
291 if (_nextMessageReservedFor == null)
295 // If there's at least one message, and there's no filter or the next item
296 // passes the filter, dequeue it to be returned.
297 if (_messages.TryDequeueIf(filter, out item))
299 _nextMessageId.Value++;
301 // Now that the next message has changed, reenable offering if it was disabled
302 _enableOffering = true;
304 // If removing this item was the last thing this block will ever do, complete it,
305 CompleteBlockIfPossible();
307 // Now, try to offer up messages asynchronously, since we've
308 // changed what's at the head of the queue
309 OfferAsyncIfNecessary(isReplacementReplica: false, outgoingLockKnownAcquired: true);
319 // Notify the owner block that our count has decreased
320 if (_itemsRemovedAction != null)
// Without a counting function, a received message counts as exactly one item.
322 int count = _itemCountingFunc != null ? _itemCountingFunc(_owningSource, item, null) : 1;
323 _itemsRemovedAction(_owningSource, count);
329 /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceiveAll"]/*' />
// Drains every queued message in one operation. Nothing is received while the head
// message is reserved for a target or when the queue is empty.
330 internal bool TryReceiveAll(out IList<TOutput> items)
333 int countReceived = 0;
337 // If the next message is reserved for someone, we can't receive right now. Otherwise...
338 if (_nextMessageReservedFor == null)
342 if (!_messages.IsEmpty)
344 // Receive all of the data, clearing it out in the process.
345 var tmpList = new List<TOutput>();
347 while (_messages.TryDequeue(out item)) tmpList.Add(item);
348 countReceived = tmpList.Count;
351 // Increment the next ID. Any new value is good.
// (a single increment is enough even though several messages were removed)
352 _nextMessageId.Value++;
354 // Now that the next message has changed, reenable offering if it was disabled
355 _enableOffering = true;
357 // Now that the block is empty, check to see whether we should complete.
358 CompleteBlockIfPossible();
364 if (countReceived > 0)
366 // Notify the owner block that our count has decreased
367 if (_itemsRemovedAction != null)
// The counting function is given the whole list here (with a default single value);
// without one, each received message counts as one item.
369 int count = _itemCountingFunc != null ? _itemCountingFunc(_owningSource, default(TOutput), items) : countReceived;
370 _itemsRemovedAction(_owningSource, count);
/// <summary>Gets how many output messages are currently queued and available to be received.</summary>
internal int OutputCount
{
    get
    {
        // Acquire OutgoingLock before ValueLock, then read the queue's count.
        lock (OutgoingLock)
        {
            lock (ValueLock)
            {
                return _messages.Count;
            }
        }
    }
}
/// <summary>
381 /// Adds a message to the source block for propagation.
382 /// This method must only be used by one thread at a time, and must not be used concurrently
383 /// with any other producer side methods, e.g. AddMessages, Complete.
/// </summary>
385 /// <param name="item">The item to be wrapped in a message to be added.</param>
386 internal void AddMessage(TOutput item)
388 // This method must not take the OutgoingLock, as it will likely be called in situations
389 // where an IncomingLock is held.
// Once the block is declining permanently the item is dropped without being enqueued.
391 if (_decliningPermanently) return;
392 _messages.Enqueue(item);
394 Interlocked.MemoryBarrier(); // ensure the read of _taskForOutputProcessing doesn't move up before the writes in Enqueue
// Only request an offering task when none is currently recorded.
396 if (_taskForOutputProcessing == null)
398 // Separated out to enable inlining of AddMessage
399 OfferAsyncIfNecessaryWithValueLock();
/// <summary>
404 /// Adds messages to the source block for propagation.
405 /// This method must only be used by one thread at a time, and must not be used concurrently
406 /// with any other producer side methods, e.g. AddMessage, Complete.
/// </summary>
408 /// <param name="items">The list of items to be wrapped in messages to be added.</param>
409 internal void AddMessages(IEnumerable<TOutput> items)
411 Contract.Requires(items != null, "Items list must be valid.");
413 // This method must not take the OutgoingLock, as it will likely be called in situations
414 // where an IncomingLock is held.
// Once the block is declining permanently the items are dropped without being enqueued.
416 if (_decliningPermanently) return;
418 // Special case arrays and lists, for which we can avoid the
419 // enumerator allocation that'll result from using a foreach.
420 // This also avoids virtual method calls that we'd get if we
421 // didn't special case.
422 var itemsAsList = items as List<TOutput>;
423 if (itemsAsList != null)
425 for (int i = 0; i < itemsAsList.Count; i++)
427 _messages.Enqueue(itemsAsList[i]);
432 TOutput[] itemsAsArray = items as TOutput[];
433 if (itemsAsArray != null)
435 for (int i = 0; i < itemsAsArray.Length; i++)
437 _messages.Enqueue(itemsAsArray[i]);
// General case: any other enumerable is walked with foreach.
442 foreach (TOutput item in items)
444 _messages.Enqueue(item);
449 Interlocked.MemoryBarrier(); // ensure the read of _taskForOutputProcessing doesn't move up before the writes in Enqueue
// Only request an offering task when none is currently recorded.
451 if (_taskForOutputProcessing == null)
453 OfferAsyncIfNecessaryWithValueLock();
457 /// <summary>Adds an individual exception to this source.</summary>
458 /// <param name="exception">The exception to add</param>
459 internal void AddException(Exception exception)
461 Contract.Requires(exception != null, "Valid exception must be provided to be added.");
462 Contract.Requires(!Completion.IsCompleted || Completion.IsFaulted, "The block must either not be completed or be faulted if we're still storing exceptions.");
// _exceptions is passed by ref so the helper can assign the field.
465 Common.AddException(ref _exceptions, exception);
469 /// <summary>Adds exceptions to this source.</summary>
470 /// <param name="exceptions">The exceptions to add</param>
471 internal void AddExceptions(List<Exception> exceptions)
473 Contract.Requires(exceptions != null, "Valid exceptions must be provided to be added.");
474 Contract.Requires(!Completion.IsCompleted || Completion.IsFaulted, "The block must either not be completed or be faulted if we're still storing exceptions.");
// Each exception is recorded individually, in list order.
477 foreach (Exception exception in exceptions)
479 Common.AddException(ref _exceptions, exception);
484 /// <summary>Adds the exceptions contained in an AggregateException to this source.</summary>
485 /// <param name="aggregateException">The exception to add</param>
486 internal void AddAndUnwrapAggregateException(AggregateException aggregateException)
488 Contract.Requires(aggregateException != null && aggregateException.InnerExceptions.Count > 0, "Aggregate must be valid and contain inner exceptions to unwrap.");
489 Contract.Requires(!Completion.IsCompleted || Completion.IsFaulted, "The block must either not be completed or be faulted if we're still storing exceptions.");
// Ask the shared helper to unwrap the aggregate's inner exceptions.
492 Common.AddException(ref _exceptions, aggregateException, unwrapInnerExceptions: true);
496 /// <summary>Gets whether the _exceptions list is non-null.</summary>
497 internal bool HasExceptions
501 // We may check whether _exceptions is null without taking a lock because the field
// is read here with a volatile read (the field itself is not declared volatile).
502 return Volatile.Read(ref _exceptions) != null;
506 /// <summary>Informs the block that it will not be receiving additional messages.</summary>
507 internal void Complete()
// From this point on AddMessage/AddMessages ignore new items.
511 _decliningPermanently = true;
513 // Complete may be called in a context where an incoming lock is held. We need to
514 // call CompleteBlockIfPossible, but we can't do so if the incoming lock is held.
515 // However, we know that _decliningPermanently has been set, and thus the timing of
516 // CompleteBlockIfPossible doesn't matter, so we schedule it to run asynchronously
517 // and take the necessary locks in a situation where we're sure it won't cause a problem.
// The core is passed as the task's state object and cast back inside the lambda.
518 Task.Factory.StartNew(state =>
520 var thisSourceCore = (SourceCore<TOutput>)state;
// Lock order follows the lock-leveling scheme: OutgoingLock first, then ValueLock.
521 lock (thisSourceCore.OutgoingLock)
523 lock (thisSourceCore.ValueLock)
525 thisSourceCore.CompleteBlockIfPossible();
// The task always runs on the default scheduler, not the scheduler from the block's options.
528 }, this, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
/// <summary>Gets the options instance that was supplied to configure this block.</summary>
internal DataflowBlockOptions DataflowBlockOptions
{
    get
    {
        return _dataflowBlockOptions;
    }
}
535 /// <summary>Offers messages to all targets.</summary>
536 /// <param name="linkToTarget">
537 /// The newly linked target, if OfferToTargets is being called to synchronously
538 /// propagate to a target during a LinkTo operation.
/// </param>
/// <returns>true if a target accepted the offered message; otherwise, false.</returns>
540 private bool OfferToTargets(ITargetBlock<TOutput> linkToTarget = null)
// Targets are called back while holding OutgoingLock only; ValueLock must not be held here.
542 Common.ContractAssertMonitorStatus(OutgoingLock, held: true);
543 Common.ContractAssertMonitorStatus(ValueLock, held: false);
545 // If the next message is reserved, we can't offer anything
546 if (_nextMessageReservedFor != null)
549 // Peek at the next message if there is one, so we can offer it.
550 DataflowMessageHeader header = default(DataflowMessageHeader);
551 TOutput message = default(TOutput);
552 bool offerJustToLinkToTarget = false;
554 // If offering isn't enabled and if we're not doing this as
555 // a result of LinkTo, bail. Otherwise, with offering disabled, we must have
556 // already offered this message to all existing targets, so we can just offer
557 // it to the newly linked target.
558 if (!Volatile.Read(ref _enableOffering))
560 if (linkToTarget == null) return false;
561 else offerJustToLinkToTarget = true;
564 // Otherwise, peek at message to offer
565 if (_messages.TryPeek(out message))
// The head message is always offered under the current next-message ID.
567 header = new DataflowMessageHeader(_nextMessageId.Value);
570 // If there is a message, offer it.
571 bool messageWasAccepted = false;
574 if (offerJustToLinkToTarget)
576 // If we've already offered the message to everyone else,
577 // we can just offer it to the newly linked target
578 Debug.Assert(linkToTarget != null, "Must have a valid target to offer to.");
579 OfferMessageToTarget(header, message, linkToTarget, out messageWasAccepted);
583 // Otherwise, we've not yet offered this message to anyone, so even
584 // if linkToTarget is non-null, we need to propagate the message in order
585 // through all of the registered targets, the last of which will be the linkToTarget
586 // if it's non-null (no need to special-case it, though).
588 // Note that during OfferMessageToTarget, a target may call ConsumeMessage (taking advantage of the
589 // reentrancy of OutgoingLock), which may unlink the target if the target is registered as "unlinkAfterOne".
590 // Doing so will remove the target from the targets list. As such, we maintain the next node
591 // separately from cur.Next, in case cur.Next changes by cur being removed from the list.
592 // No other node in the list should change, as we're protected by OutgoingLock.
594 TargetRegistry<TOutput>.LinkedTargetInfo cur = _targetRegistry.FirstTargetNode;
597 TargetRegistry<TOutput>.LinkedTargetInfo next = cur.Next;
// A true result means propagation must stop (message accepted or reserved).
598 if (OfferMessageToTarget(header, message, cur.Target, out messageWasAccepted)) break;
602 // If none of the targets accepted the message, disable offering.
603 if (!messageWasAccepted)
607 _enableOffering = false;
613 // If a message got accepted, consume it and reenable offering.
614 if (messageWasAccepted)
618 // SourceCore set consumeToAccept to false. However, it's possible
619 // that an incorrectly written target may ignore that parameter and synchronously consume
620 // even though they weren't supposed to. To recover from that,
621 // we'll only dequeue if the correct message is still at the head of the queue.
622 // However, we'll assert so that we can at least catch this in our own debug builds.
624 if (_nextMessageId.Value != header.Id ||
625 !_messages.TryDequeue(out dropped)) // remove the next message
627 Debug.Assert(false, "The target did not follow the protocol.");
629 _nextMessageId.Value++;
631 // The message was accepted, so there's now going to be a new next message.
632 // If offering had been disabled, reenable it.
633 _enableOffering = true;
635 // Now that a message has been removed, we need to complete if possible or
636 // asynchronously offer if necessary. However, if we're calling this as part of our
637 // offering loop, we won't be able to do either, since by definition there's already
638 // a processing task spun up (us) that would prevent these things. So we only
639 // do the checks if we're being called to link a new target rather than as part
640 // of normal processing.
641 if (linkToTarget != null)
643 CompleteBlockIfPossible();
644 OfferAsyncIfNecessary(isReplacementReplica: false, outgoingLockKnownAcquired: true);
648 // Notify the owner block that our count has decreased
649 if (_itemsRemovedAction != null)
// Without a counting function, an accepted message counts as exactly one item.
651 int count = _itemCountingFunc != null ? _itemCountingFunc(_owningSource, message, null) : 1;
652 _itemsRemovedAction(_owningSource, count);
656 return messageWasAccepted;
659 /// <summary>Offers the message to the target.</summary>
660 /// <param name="header">The header of the message to offer.</param>
661 /// <param name="message">The message being offered.</param>
662 /// <param name="target">The single target to which the message should be offered.</param>
663 /// <param name="messageWasAccepted">true if the message was accepted by the target; otherwise, false.</param>
/// <returns>
665 /// true if the message should not be offered to additional targets;
666 /// false if propagation should be allowed to continue.
/// </returns>
668 private bool OfferMessageToTarget(
669 DataflowMessageHeader header, TOutput message, ITargetBlock<TOutput> target,
670 out bool messageWasAccepted)
672 Contract.Requires(target != null, "Valid target to offer to is required.");
// The target is called while holding OutgoingLock only; ValueLock must not be held here.
673 Common.ContractAssertMonitorStatus(OutgoingLock, held: true);
674 Common.ContractAssertMonitorStatus(ValueLock, held: false);
// The message is always offered with consumeToAccept: false, so a target that follows
// the protocol does not need to call back into ConsumeMessage in order to accept.
676 DataflowMessageStatus result = target.OfferMessage(header, message, _owningSource, consumeToAccept: false);
677 Debug.Assert(result != DataflowMessageStatus.NotAvailable, "Messages are not being offered concurrently, so nothing should be missed.");
678 messageWasAccepted = false;
680 // If accepted, note it, and if the target was linked as "once", remove it
681 if (result == DataflowMessageStatus.Accepted)
683 _targetRegistry.Remove(target, onlyIfReachedMaxMessages: true);
684 messageWasAccepted = true;
685 return true; // the message should not be offered to anyone else
687 // If declined permanently, remove the target
// (removal here is unconditional, unlike the accepted case above)
688 else if (result == DataflowMessageStatus.DecliningPermanently)
690 _targetRegistry.Remove(target);
692 // If the message was reserved by the target, stop propagating
// (detected through the reservation field rather than through the returned status)
693 else if (_nextMessageReservedFor != null)
695 Debug.Assert(result == DataflowMessageStatus.Postponed,
696 "If the message was reserved, it should also have been postponed.");
697 return true; // the message should not be offered to anyone else
699 // If the result was Declined, there's nothing more to be done.
700 // This message will sit at the front of the queue until someone claims it.
702 return false; // allow the message to be offered to someone else
/// <summary>
706 /// Called when we want to enable asynchronously offering message to targets.
707 /// Takes the ValueLock before delegating to OfferAsyncIfNecessary.
/// </summary>
709 private void OfferAsyncIfNecessaryWithValueLock()
// This path makes no claim about OutgoingLock, hence outgoingLockKnownAcquired: false.
713 OfferAsyncIfNecessary(isReplacementReplica: false, outgoingLockKnownAcquired: false);
717 /// <summary>Called when we want to enable asynchronously offering message to targets.</summary>
718 /// <param name="isReplacementReplica">Whether this call is the continuation of a previous message loop.</param>
719 /// <param name="outgoingLockKnownAcquired">Whether the caller is sure that the outgoing lock is currently held by this thread.</param>
720 private void OfferAsyncIfNecessary(bool isReplacementReplica, bool outgoingLockKnownAcquired)
722 Common.ContractAssertMonitorStatus(ValueLock, held: true);
724 // Fast path to enable OfferAsyncIfNecessary to be inlined. We only need
725 // to proceed if there's no task processing, offering is enabled, and
726 // there are messages to be processed.
727 if (_taskForOutputProcessing == null && _enableOffering && !_messages.IsEmpty)
729 // Slow path: do additional checks and potentially launch new task
730 OfferAsyncIfNecessary_Slow(isReplacementReplica, outgoingLockKnownAcquired);
734 /// <summary>Called when we want to enable asynchronously offering message to targets.</summary>
/// <remarks>
/// Slow path of OfferAsyncIfNecessary: the caller has already established that no offering
/// task exists, offering is enabled, and the queue is non-empty (asserted below).
/// </remarks>
735 /// <param name="isReplacementReplica">Whether this call is the continuation of a previous message loop.</param>
736 /// <param name="outgoingLockKnownAcquired">Whether the caller is sure that the outgoing lock is currently held by this thread.</param>
737 private void OfferAsyncIfNecessary_Slow(bool isReplacementReplica, bool outgoingLockKnownAcquired)
739 Common.ContractAssertMonitorStatus(ValueLock, held: true);
740 Debug.Assert(_taskForOutputProcessing == null && _enableOffering && !_messages.IsEmpty,
741 "The block must be enabled for offering, not currently be processing, and have messages available to process.");
743 // This method must not take the outgoing lock, as it will likely be called in situations
744 // where a derived type's incoming lock is held.
// Targets are assumed to be available unless this thread holds OutgoingLock, in which
// case the registry can be inspected to find out for certain.
746 bool targetsAvailable = true;
747 if (outgoingLockKnownAcquired || Monitor.IsEntered(OutgoingLock))
749 Common.ContractAssertMonitorStatus(OutgoingLock, held: true);
750 targetsAvailable = _targetRegistry.FirstTargetNode != null;
753 // If there's any work to be done...
754 if (targetsAvailable && !CanceledOrFaulted)
756 // Create task and store into _taskForOutputProcessing prior to scheduling the task
757 // so that _taskForOutputProcessing will be visibly set in the task loop.
758 _taskForOutputProcessing = new Task(thisSourceCore => ((SourceCore<TOutput>)thisSourceCore).OfferMessagesLoopCore(), this,
759 Common.GetCreationOptionsForTask(isReplacementReplica));
// Emit a trace event for the launch when the ETW provider is enabled.
762 DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
763 if (etwLog.IsEnabled())
765 etwLog.TaskLaunchedForMessageHandling(
766 _owningSource, _taskForOutputProcessing, DataflowEtwProvider.TaskLaunchedReason.OfferingOutputMessages, _messages.Count);
770 // Start the task handling scheduling exceptions
// (a scheduling failure is returned as an exception object instead of being thrown)
771 #pragma warning disable 0420
772 Exception exception = Common.StartTaskSafe(_taskForOutputProcessing, _dataflowBlockOptions.TaskScheduler);
773 #pragma warning restore 0420
774 if (exception != null)
776 // First, log the exception while the processing state is dirty which is preventing the block from completing.
777 // Then revert the proactive processing state changes.
778 // And last, try to complete the block.
779 AddException(exception);
780 _taskForOutputProcessing = null;
781 _decliningPermanently = true;
783 // Get out from under currently held locks - ValueLock is taken, but OutgoingLock may not be.
784 // Re-take the locks on a separate thread.
785 Task.Factory.StartNew(state =>
787 var thisSourceCore = (SourceCore<TOutput>)state;
// Lock order follows the lock-leveling scheme: OutgoingLock first, then ValueLock.
788 lock (thisSourceCore.OutgoingLock)
790 lock (thisSourceCore.ValueLock)
792 thisSourceCore.CompleteBlockIfPossible();
// The completion attempt always runs on the default scheduler, not the block's scheduler.
795 }, this, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
// NOTE(review): the failure handling above already calls AddException(exception); this
// second call looks like it records the same exception a second time - confirm whether
// the duplication is intentional before relying on the contents of _exceptions.
797 if (exception != null) AddException(exception);
801 /// <summary>Task body used to process messages.</summary>
/// <remarks>
/// Repeatedly calls OfferToTargets until it returns false, the per-task message limit
/// (ActualMaxMessagesPerTask) is reached, or CanceledOrFaulted becomes true. Afterwards it
/// clears _taskForOutputProcessing, calls OfferAsyncIfNecessary so that a replacement task can be
/// started if work remains, and finally calls CompleteBlockIfPossible.
/// </remarks>
802 [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
803 private void OfferMessagesLoopCore()
805 Debug.Assert(_taskForOutputProcessing != null && _taskForOutputProcessing.Id == Task.CurrentId,
806 "Must be part of the current processing task.");
// Upper bound on the number of OfferToTargets calls made by this single task invocation.
809 int maxMessagesPerTask = _dataflowBlockOptions.ActualMaxMessagesPerTask;
811 // We need to hold the outgoing lock while offering messages. We can either
812 // lock and unlock for each individual offering, or we can lock around multiple or all
813 // possible offerings. The former ensures that other operations don't get starved,
814 // while the latter is much more efficient (not continually acquiring and releasing
815 // the lock). For blocks that aren't linked to any targets, this won't matter
816 // (no offering is done), and for blocks that are only linked to targets, this shouldn't
817 // matter (no one is contending for the lock), thus
818 // the only case it would matter is when a block both has targets and is being
819 // explicitly received from, which is an uncommon scenario. Thus, we want to lock
820 // around the whole thing to improve performance, but just in case we do hit
821 // an uncommon scenario, in the default case we release the lock every now and again.
822 // If a developer wants to control this, they can limit the duration of the
823 // lock by using MaxMessagesPerTask.
// Batch size: DEFAULT_RELEASE_LOCK_ITERATIONS when MaxMessagesPerTask is Unbounded;
// otherwise one batch covers this task's entire message quota.
825 const int DEFAULT_RELEASE_LOCK_ITERATIONS = 10; // Dialable
826 int releaseLockIterations =
827 _dataflowBlockOptions.MaxMessagesPerTask == DataflowBlockOptions.Unbounded ?
828 DEFAULT_RELEASE_LOCK_ITERATIONS : maxMessagesPerTask;
// The outer loop has no increment clause: messageCounter is advanced only by the inner loop,
// which additionally stops after releaseLockIterations offers.
830 for (int messageCounter = 0;
831 messageCounter < maxMessagesPerTask && !CanceledOrFaulted;)
835 // While there are more messages to process, offer each in turn
836 // to the targets. If we're unable to propagate a particular message,
837 // stop trying until something changes in the future.
839 int lockReleaseCounter = 0;
840 messageCounter < maxMessagesPerTask && lockReleaseCounter < releaseLockIterations && !CanceledOrFaulted;
841 ++messageCounter, ++lockReleaseCounter)
// A false result from OfferToTargets ends this task's offering immediately.
843 if (!OfferToTargets()) return;
// Every exception type is caught here, which is why CA1031 is suppressed on this method.
848 catch (Exception exc)
850 // Record the exception
853 // Notify the owning block it should stop accepting new messages
854 _completeAction(_owningSource);
// Cleanup. Presumably runs with both OutgoingLock and ValueLock held: the call below passes
// outgoingLockKnownAcquired: true and CompleteBlockIfPossible asserts both locks - confirm.
862 // We're no longer processing, so null out the processing task
863 Debug.Assert(_taskForOutputProcessing != null && _taskForOutputProcessing.Id == Task.CurrentId,
864 "Must be part of the current processing task.");
865 _taskForOutputProcessing = null;
866 Interlocked.MemoryBarrier(); // synchronize with AddMessage(s) and its read of _taskForOutputProcessing
868 // However, we may have given up early because we hit our own configured
869 // processing limits rather than because we ran out of work to do. If that's
870 // the case, make sure we spin up another task to keep going.
871 OfferAsyncIfNecessary(isReplacementReplica: true, outgoingLockKnownAcquired: true);
873 // If, however, we stopped because we ran out of work to do and we
874 // know we'll never get more, then complete.
875 CompleteBlockIfPossible();
/// <summary>
/// Gets whether this source should stop doing work, either because cancellation has been
/// requested or because it has faulted.
/// </summary>
private bool CanceledOrFaulted
{
    get
    {
        // A signaled CancellationToken is honored immediately.
        if (_dataflowBlockOptions.CancellationToken.IsCancellationRequested)
        {
            return true;
        }

        // A fault only counts once an exception has been recorded and the owning block
        // has invoked Complete on us, i.e. we are declining permanently.
        return HasExceptions && _decliningPermanently;
    }
}
/// <summary>Completes the block's processing when nothing is left to do and nothing ever will be.</summary>
/// <remarks>Both OutgoingLock and ValueLock are asserted to be held on entry.</remarks>
private void CompleteBlockIfPossible()
{
    Common.ContractAssertMonitorStatus(OutgoingLock, held: true);
    Common.ContractAssertMonitorStatus(ValueLock, held: true);

    // Completion has already been reserved; there is nothing further to decide.
    if (_completionReserved) return;

    bool nothingLeftToDo =
        _decliningPermanently &&            // no more messages will arrive
        _taskForOutputProcessing == null && // no processing task is active
        _nextMessageReservedFor == null;    // no target holds a reservation

    if (nothingLeftToDo)
    {
        CompleteBlockIfPossible_Slow();
    }
}
912 /// Slow path for CompleteBlockIfPossible.
913 /// Separating out the slow path into its own method makes it more likely that the fast path method will get inlined.
/// When the message buffer is empty, or the block is canceled or faulted, this sets
/// _completionReserved and queues CompleteBlockOncePossible on a separate task.
915 private void CompleteBlockIfPossible_Slow()
918 _decliningPermanently && _taskForOutputProcessing == null && _nextMessageReservedFor == null,
919 "The block must be declining permanently, there must be no reservations, and there must be no processing tasks");
920 Common.ContractAssertMonitorStatus(OutgoingLock, held: true);
921 Common.ContractAssertMonitorStatus(ValueLock, held: true);
// Proceed only if no messages remain buffered, or if cancellation/faulting is in effect.
923 if (_messages.IsEmpty || CanceledOrFaulted)
// Set the flag before queuing the work: CompleteBlockIfPossible checks !_completionReserved,
// so later calls will not reach this path again.
925 _completionReserved = true;
927 // Get out from under currently held locks. This is to avoid
928 // invoking synchronous continuations off of _completionTask.Task
929 // while holding a lock.
// The completion work is queued to TaskScheduler.Default with CancellationToken.None.
930 Task.Factory.StartNew(state => ((SourceCore<TOutput>)state).CompleteBlockOncePossible(),
931 this, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
936 /// Completes the block. This must only be called once, and only once all of the completion conditions are met.
937 /// As such, it must only be called from CompleteBlockIfPossible (by way of its slow path, CompleteBlockIfPossible_Slow).
/// Captures the linked targets and the recorded exceptions, transitions _completionTask to a
/// faulted, canceled, or successful state, and then propagates completion to the captured targets.
939 private void CompleteBlockOncePossible()
// Captured from shared state first, then consumed below when the final state is chosen.
941 TargetRegistry<TOutput>.LinkedTargetInfo linkedTargets;
942 List<Exception> exceptions;
944 // Avoid completing while the code that caused this completion to occur is still holding a lock.
945 // Clear out the target registry and buffers to help avoid memory leaks.
948 // Save the linked list of targets so that it could be traversed later to propagate completion
949 linkedTargets = _targetRegistry.ClearEntryPoints();
954 // Save a local reference to the exceptions list and null out the field,
955 // so that if the target side tries to add an exception this late,
956 // it will go to a separate list (that will be ignored.)
957 exceptions = _exceptions;
// Final state selection: recorded exceptions take precedence over cancellation,
// and success is the fallback.
962 // If it's due to an unhandled exception, finish in an error state
963 if (exceptions != null)
965 _completionTask.TrySetException(exceptions);
967 // If it's due to cancellation, finish in a canceled state
968 else if (_dataflowBlockOptions.CancellationToken.IsCancellationRequested)
970 _completionTask.TrySetCanceled();
972 // Otherwise, finish in a successful state.
975 _completionTask.TrySetResult(default(VoidResult));
978 // Now that the completion task is completed, we may propagate completion to the linked targets
979 _targetRegistry.PropagateCompletion(linkedTargets);
// Report the completion to ETW only when the provider is enabled.
981 DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
982 if (etwLog.IsEnabled())
984 etwLog.DataflowBlockCompleted(_owningSource);
/// <summary>Gets the content shown for this instance by the debugger display attribute.</summary>
[SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
private object DebuggerDisplayContent
{
    get
    {
        // Use the owning block's own debugger content when it supplies one;
        // otherwise fall back to the owning block object itself.
        object blockContent = _owningSource;
        var debuggerDisplay = _owningSource as IDebuggerDisplay;
        if (debuggerDisplay != null)
        {
            blockContent = debuggerDisplay.Content;
        }

        return string.Format("Block=\"{0}\"", blockContent);
    }
}
/// <summary>Creates the debugger view over this source core.</summary>
/// <returns>A new <see cref="DebuggingInformation"/> instance wrapping this source core.</returns>
internal DebuggingInformation GetDebuggingInformation()
{
    return new DebuggingInformation(this);
}
1005 /// <summary>Provides debugging information about the source core.</summary>
1006 internal sealed class DebuggingInformation
/// <summary>The source being viewed. Assigned once by the constructor and never reassigned.</summary>
private readonly SourceCore<TOutput> _source;
/// <summary>Initializes the debugger view for the given source core.</summary>
/// <param name="source">The source core whose state this view exposes.</param>
internal DebuggingInformation(SourceCore<TOutput> source)
{
    _source = source;
}
/// <summary>Gets the count of messages held in the source's message buffer.</summary>
internal int OutputCount
{
    get { return _source._messages.Count; }
}
/// <summary>Gets a list copy of the messages held in the source's message buffer.</summary>
internal IEnumerable<TOutput> OutputQueue
{
    get { return _source._messages.ToList(); }
}
/// <summary>Gets the source's current output-processing task field value.</summary>
internal Task TaskForOutputProcessing
{
    get { return _source._taskForOutputProcessing; }
}
/// <summary>Gets the options object the viewed source core was configured with.</summary>
internal DataflowBlockOptions DataflowBlockOptions
{
    get { return _source._dataflowBlockOptions; }
}
/// <summary>Gets the value of the source's permanently-declining flag.</summary>
internal bool IsDecliningPermanently
{
    get { return _source._decliningPermanently; }
}
/// <summary>Gets whether the source's Completion task has finished.</summary>
internal bool IsCompleted
{
    get { return _source.Completion.IsCompleted; }
}
/// <summary>Gets the registry holding all targets linked from the viewed source.</summary>
internal TargetRegistry<TOutput> LinkedTargets
{
    get { return _source._targetRegistry; }
}
/// <summary>Gets the target currently holding a reservation on the next message, or null when there is none.</summary>
internal ITargetBlock<TOutput> NextMessageReservedFor
{
    get { return _source._nextMessageReservedFor; }
}