1 // Copyright (c) Microsoft. All rights reserved.
2 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
4 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
9 // A propagator that broadcasts incoming messages to all targets, overwriting the current
10 // message in the process.
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
14 using System.Collections.Generic;
15 using System.Diagnostics;
16 using System.Diagnostics.CodeAnalysis;
17 using System.Diagnostics.Contracts;
19 using System.Security;
20 using System.Threading.Tasks.Dataflow.Internal;
22 namespace System.Threading.Tasks.Dataflow
24 /// <summary>
25 /// Provides a buffer for storing at most one element at a time, overwriting each message with the next as it arrives.
26 /// Messages are broadcast to all linked targets, all of which may consume a clone of the message.
27 /// </summary>
28 /// <typeparam name="T">Specifies the type of the data buffered by this dataflow block.</typeparam>
29 /// <remarks>
30 /// <see cref="BroadcastBlock{T}"/> exposes at most one element at a time. However, unlike
31 /// <see cref="WriteOnceBlock{T}"/>, that element will be overwritten as new elements are provided
32 /// to the block. <see cref="BroadcastBlock{T}"/> ensures that the current element is broadcast to any
33 /// linked targets before allowing the element to be overwritten.
34 /// </remarks>
35 [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
36 [DebuggerTypeProxy(typeof(BroadcastBlock<>.DebugView))]
37 public sealed class BroadcastBlock<T> : IPropagatorBlock<T, T>, IReceivableSourceBlock<T>, IDebuggerDisplay
39 /// <summary>The source side.</summary>
40 private readonly BroadcastingSourceCore<T> _source;
41 /// <summary>Bounding state for when the block is executing in bounded mode.</summary>
42 private readonly BoundingStateWithPostponedAndTask<T> _boundingState;
43 /// <summary>Whether all future messages should be declined.</summary>
44 private bool _decliningPermanently;
45 /// <summary>A task has reserved the right to run the completion routine.</summary>
46 private bool _completionReserved;
/// <summary>Gets the object that is locked to synchronize incoming requests.</summary>
/// <remarks>The source core instance itself serves as the lock object.</remarks>
private object IncomingLock
{
    get { return _source; }
}
/// <summary>Initializes the <see cref="BroadcastBlock{T}"/> with the specified cloning function.</summary>
/// <param name="cloningFunction">
/// The function to use to clone the data when offered to other blocks.
/// This may be null to indicate that no cloning need be performed.
/// </param>
/// <remarks>Forwards to the two-argument constructor, supplying the default block options.</remarks>
public BroadcastBlock(Func<T, T> cloningFunction) :
    this(cloningFunction, DataflowBlockOptions.Default)
{
}
/// <summary>Initializes the <see cref="BroadcastBlock{T}"/> with the specified cloning function and <see cref="DataflowBlockOptions"/>.</summary>
/// <param name="cloningFunction">
/// The function to use to clone the data when offered to other blocks.
/// This may be null to indicate that no cloning need be performed.
/// </param>
/// <param name="dataflowBlockOptions">The options with which to configure this <see cref="BroadcastBlock{T}"/>.</param>
/// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
public BroadcastBlock(Func<T, T> cloningFunction, DataflowBlockOptions dataflowBlockOptions)
{
    // Validate arguments
    if (dataflowBlockOptions == null) throw new ArgumentNullException("dataflowBlockOptions");
    Contract.EndContractBlock();

    // Work against options the caller can no longer mutate.
    dataflowBlockOptions = dataflowBlockOptions.DefaultOrClone();

    // Bounding state (and the item-removed callback that feeds it) is only needed in bounded mode.
    Action<int> itemsRemovedCallback = null;
    if (dataflowBlockOptions.BoundedCapacity > 0)
    {
        Debug.Assert(dataflowBlockOptions.BoundedCapacity > 0, "Positive bounding count expected; should have been verified by options ctor");
        itemsRemovedCallback = OnItemsRemoved;
        _boundingState = new BoundingStateWithPostponedAndTask<T>(dataflowBlockOptions.BoundedCapacity);
    }

    // Create the source half of the block.
    _source = new BroadcastingSourceCore<T>(this, cloningFunction, dataflowBlockOptions, itemsRemovedCallback);

    // The source half may fault by itself (e.g. because of a task scheduler exception). When that
    // happens the target half is faulted too, so that it drops buffered messages and releases its
    // reservations. This cannot loop forever: completion may be requested multiple times, but only
    // one request is carried over.
    _source.Completion.ContinueWith(
        (sourceCompletion, owner) =>
        {
            IDataflowBlock faultTarget = (BroadcastBlock<T>)owner;
            Debug.Assert(sourceCompletion.IsFaulted, "The source must be faulted in order to trigger a target completion.");
            faultTarget.Fault(sourceCompletion.Exception);
        },
        this,
        CancellationToken.None,
        Common.GetContinuationOptions() | TaskContinuationOptions.OnlyOnFaulted,
        TaskScheduler.Default);

    // A cancellation request is translated into a Complete() call on the target side.
    Common.WireCancellationToComplete(
        dataflowBlockOptions.CancellationToken,
        _source.Completion,
        owner => ((BroadcastBlock<T>)owner).Complete(),
        this);

    // Emit the block-created trace event if tracing is enabled.
    DataflowEtwProvider log = DataflowEtwProvider.Log;
    if (log.IsEnabled())
    {
        log.DataflowBlockCreated(this, dataflowBlockOptions);
    }
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
public void Complete()
{
    // A plain completion request carries no exception.
    CompleteCore(null, storeExceptionEvenIfAlreadyCompleting: false);
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
void IDataflowBlock.Fault(Exception exception)
{
    // Validate arguments
    if (exception == null) throw new ArgumentNullException("exception");
    Contract.EndContractBlock();

    // An externally requested fault does not force the exception to be stored
    // once the block is already completing.
    CompleteCore(exception, storeExceptionEvenIfAlreadyCompleting: false);
}
/// <summary>Optionally records an exception, then marks the target side as declining and attempts to complete it.</summary>
/// <param name="exception">The exception to add to the source side, or null when completing without a fault.</param>
/// <param name="storeExceptionEvenIfAlreadyCompleting">
/// true to store <paramref name="exception"/> even if the block is already declining permanently;
/// false to store it only if the block has not yet started declining.
/// </param>
/// <param name="revertProcessingState">
/// true to clear the bounding state's input-processing task before attempting completion.
/// May only be true when <paramref name="storeExceptionEvenIfAlreadyCompleting"/> is also true.
/// </param>
internal void CompleteCore(Exception exception, bool storeExceptionEvenIfAlreadyCompleting, bool revertProcessingState = false)
{
    Contract.Requires(storeExceptionEvenIfAlreadyCompleting || !revertProcessingState,
                    "Indicating dirty processing state may only come with storeExceptionEvenIfAlreadyCompleting==true.");
    Contract.EndContractBlock();

    // CompleteTargetIfPossible requires the incoming lock to be held.
    lock (IncomingLock)
    {
        // Faulting from outside is allowed until we start declining permanently.
        // Faulting from inside is allowed at any time.
        if (exception != null && (!_decliningPermanently || storeExceptionEvenIfAlreadyCompleting))
        {
            _source.AddException(exception);
        }

        // Revert the dirty processing state if requested
        if (revertProcessingState)
        {
            Debug.Assert(_boundingState != null && _boundingState.TaskForInputProcessing != null,
                            "The processing state must be dirty when revertProcessingState==true.");
            _boundingState.TaskForInputProcessing = null;
        }

        // Trigger completion if possible
        _decliningPermanently = true;
        CompleteTargetIfPossible();
    }
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="LinkTo"]/*' />
public IDisposable LinkTo(ITargetBlock<T> target, DataflowLinkOptions linkOptions)
{
    // Linking is delegated to the source half.
    return _source.LinkTo(target, linkOptions);
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceive"]/*' />
public Boolean TryReceive(Predicate<T> filter, out T item)
{
    // Receiving is delegated to the source half.
    return _source.TryReceive(filter, out item);
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceiveAll"]/*' />
Boolean IReceivableSourceBlock<T>.TryReceiveAll(out IList<T> items)
{
    // Receiving is delegated to the source half.
    return _source.TryReceiveAll(out items);
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
public Task Completion
{
    // The block's completion is the completion of its source half.
    get { return _source.Completion; }
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="OfferMessage"]/*' />
DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, Boolean consumeToAccept)
{
    // Validate arguments
    if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, "messageHeader");
    if (source == null && consumeToAccept) throw new ArgumentException(SR.Argument_CantConsumeFromANullSource, "consumeToAccept");
    Contract.EndContractBlock();

    // All target-side state is guarded by the incoming lock.
    lock (IncomingLock)
    {
        // If we've already stopped accepting messages, decline permanently
        if (_decliningPermanently)
        {
            CompleteTargetIfPossible();
            return DataflowMessageStatus.DecliningPermanently;
        }

        // We can directly accept the message if:
        //      1) we are not bounding, OR
        //      2) we are bounding AND there is room available AND there are no postponed messages AND we are not currently processing.
        // (If there were any postponed messages, we would need to postpone so that ordering would be maintained.)
        // (We should also postpone if we are currently processing, because there may be a race between consuming postponed messages and
        // accepting new ones directly into the queue.)
        if (_boundingState == null
                ||
            (_boundingState.CountIsLessThanBound && _boundingState.PostponedMessages.Count == 0 && _boundingState.TaskForInputProcessing == null))
        {
            // Consume the message from the source if necessary
            if (consumeToAccept)
            {
                Debug.Assert(source != null, "We must have thrown if source == null && consumeToAccept == true.");

                bool consumed;
                messageValue = source.ConsumeMessage(messageHeader, this, out consumed);
                if (!consumed) return DataflowMessageStatus.NotAvailable;
            }

            // Once consumed, pass it to the delegate
            _source.AddMessage(messageValue);
            if (_boundingState != null) _boundingState.CurrentCount += 1; // track this new item against our bound
            return DataflowMessageStatus.Accepted;
        }
        // Otherwise, we try to postpone if a source was provided
        else if (source != null)
        {
            Debug.Assert(_boundingState != null && _boundingState.PostponedMessages != null,
                "PostponedMessages must have been initialized during construction in bounding mode.");

            _boundingState.PostponedMessages.Push(source, messageHeader);
            return DataflowMessageStatus.Postponed;
        }

        // We can't do anything else about this message
        return DataflowMessageStatus.Declined;
    }
}
/// <summary>Notifies the block that one or more items was removed from the queue.</summary>
/// <param name="numItemsRemoved">The number of items removed.</param>
/// <remarks>Must be called without the incoming lock held; the lock is taken here when bounding is enabled.</remarks>
private void OnItemsRemoved(int numItemsRemoved)
{
    Contract.Requires(numItemsRemoved > 0, "Should only be called for a positive number of items removed.");
    Common.ContractAssertMonitorStatus(IncomingLock, held: false);

    // If we're bounding, we need to know when an item is removed so that we
    // can update the count that's mirroring the actual count in the source's queue,
    // and potentially kick off processing to start consuming postponed messages.
    if (_boundingState != null)
    {
        // Both helpers called below require the incoming lock to be held.
        lock (IncomingLock)
        {
            // Decrement the count, which mirrors the count in the source half
            Debug.Assert(_boundingState.CurrentCount - numItemsRemoved >= 0,
                "It should be impossible to have a negative number of items.");
            _boundingState.CurrentCount -= numItemsRemoved;

            ConsumeAsyncIfNecessary();
            CompleteTargetIfPossible();
        }
    }
}
/// <summary>Starts a task to consume postponed messages, if one is needed and none is already running.</summary>
/// <param name="isReplacementReplica">Whether this call is the continuation of a previous message loop.</param>
/// <remarks>The incoming lock must be held by the caller, and the block must be in bounded mode.</remarks>
internal void ConsumeAsyncIfNecessary(bool isReplacementReplica = false)
{
    Common.ContractAssertMonitorStatus(IncomingLock, held: true);
    Debug.Assert(_boundingState != null, "Must be in bounded mode.");

    // Nothing to do if we are shutting down, a task is already in flight,
    // nothing is postponed, or there is no room under the bound.
    if (_decliningPermanently ||
        _boundingState.TaskForInputProcessing != null ||
        _boundingState.PostponedMessages.Count <= 0 ||
        !_boundingState.CountIsLessThanBound)
    {
        return;
    }

    // Publish the task into the bounding state before it is scheduled, so that the
    // task loop observes it as the in-flight processing task.
    Task processingTask = new Task(
        state => ((BroadcastBlock<T>)state).ConsumeMessagesLoopCore(),
        this,
        Common.GetCreationOptionsForTask(isReplacementReplica));
    _boundingState.TaskForInputProcessing = processingTask;

    DataflowEtwProvider log = DataflowEtwProvider.Log;
    if (log.IsEnabled())
    {
        log.TaskLaunchedForMessageHandling(
            this,
            _boundingState.TaskForInputProcessing,
            DataflowEtwProvider.TaskLaunchedReason.ProcessingInputMessages,
            _boundingState.PostponedMessages.Count);
    }

    // Start the task, capturing any scheduling exception instead of letting it propagate.
    Exception schedulingException = Common.StartTaskSafe(_boundingState.TaskForInputProcessing, _source.DataflowBlockOptions.TaskScheduler);
    if (schedulingException != null)
    {
        // Get out from under currently held locks. Complete re-acquires the locks it needs.
        Task.Factory.StartNew(
            exc => CompleteCore(exception: (Exception)exc, storeExceptionEvenIfAlreadyCompleting: true, revertProcessingState: true),
            schedulingException,
            CancellationToken.None,
            Common.GetCreationOptionsForTask(),
            TaskScheduler.Default);
    }
}
/// <summary>Task body used to consume postponed messages.</summary>
/// <remarks>
/// Consumes up to the configured maximum number of messages per task, then clears the in-flight
/// task under the incoming lock and either schedules a replacement task or attempts completion.
/// </remarks>
[SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
private void ConsumeMessagesLoopCore()
{
    Contract.Requires(_boundingState != null && _boundingState.TaskForInputProcessing != null,
        "May only be called in bounded mode and when a task is in flight.");
    Debug.Assert(_boundingState.TaskForInputProcessing.Id == Task.CurrentId,
        "This must only be called from the in-flight processing task.");
    Common.ContractAssertMonitorStatus(IncomingLock, held: false);

    try
    {
        int maxMessagesPerTask = _source.DataflowBlockOptions.ActualMaxMessagesPerTask;
        for (int i = 0;
            i < maxMessagesPerTask && ConsumeAndStoreOneMessageIfAvailable();
            i++)
        {
            // The loop condition does all of the work.
        }
    }
    catch (Exception exception)
    {
        // Prevent the creation of new processing tasks
        CompleteCore(exception, storeExceptionEvenIfAlreadyCompleting: true);
    }
    finally
    {
        // The helpers called below require the incoming lock to be held.
        lock (IncomingLock)
        {
            // We're no longer processing, so null out the processing task
            _boundingState.TaskForInputProcessing = null;

            // However, we may have given up early because we hit our own configured
            // processing limits rather than because we ran out of work to do.  If that's
            // the case, make sure we spin up another task to keep going.
            ConsumeAsyncIfNecessary(isReplacementReplica: true);

            // If, however, we stopped because we ran out of work to do and we
            // know we'll never get more, then complete.
            CompleteTargetIfPossible();
        }
    }
}
/// <summary>
/// Retrieves one postponed message if there's room and if we can consume a postponed message.
/// Stores any consumed message into the source half.
/// </summary>
/// <returns>true if a message could be consumed and stored; otherwise, false.</returns>
/// <remarks>This must only be called from the asynchronous processing loop.</remarks>
private bool ConsumeAndStoreOneMessageIfAvailable()
{
    Contract.Requires(_boundingState != null && _boundingState.TaskForInputProcessing != null,
        "May only be called in bounded mode and when a task is in flight.");
    Debug.Assert(_boundingState.TaskForInputProcessing.Id == Task.CurrentId,
        "This must only be called from the in-flight processing task.");
    Common.ContractAssertMonitorStatus(IncomingLock, held: false);

    // Loop through the postponed messages until we get one.
    while (true)
    {
        // Get the next item to retrieve.  If there are no more, bail.
        KeyValuePair<ISourceBlock<T>, DataflowMessageHeader> sourceAndMessage;
        lock (IncomingLock)
        {
            if (!_boundingState.CountIsLessThanBound) return false;
            if (!_boundingState.PostponedMessages.TryPop(out sourceAndMessage)) return false;

            // Optimistically assume we're going to get the item. This avoids taking the lock
            // again if we're right.  If we're wrong, we decrement it later under lock.
            _boundingState.CurrentCount++;
        }

        // Consume the item. This calls out to the source, so the incoming lock is not held here.
        bool consumed = false;
        try
        {
            T consumedValue = sourceAndMessage.Key.ConsumeMessage(sourceAndMessage.Value, this, out consumed);
            if (consumed)
            {
                _source.AddMessage(consumedValue);
                return true;
            }
        }
        finally
        {
            // We didn't get the item, so decrement the count to counteract our optimistic assumption.
            if (!consumed)
            {
                lock (IncomingLock) _boundingState.CurrentCount--;
            }
        }
    }
}
/// <summary>Completes the target, notifying the source, once all completion conditions are met.</summary>
/// <remarks>
/// The incoming lock must be held by the caller. Completion proceeds only when the block is declining
/// permanently, completion has not already been reserved, and no input-processing task is in flight.
/// </remarks>
private void CompleteTargetIfPossible()
{
    Common.ContractAssertMonitorStatus(IncomingLock, held: true);
    if (_decliningPermanently &&
        !_completionReserved &&
        (_boundingState == null || _boundingState.TaskForInputProcessing == null))
    {
        // Reserve completion so that this branch runs at most once.
        _completionReserved = true;

        // If we're in bounding mode and we have any postponed messages, we need to clear them,
        // which means calling back to the source, which means we need to escape the incoming lock.
        if (_boundingState != null && _boundingState.PostponedMessages.Count > 0)
        {
            Task.Factory.StartNew(state =>
            {
                var thisBroadcastBlock = (BroadcastBlock<T>)state;

                // Release any postponed messages
                List<Exception> exceptions = null;
                if (thisBroadcastBlock._boundingState != null)
                {
                    // Note: No locks should be held at this point
                    Common.ReleaseAllPostponedMessages(thisBroadcastBlock,
                                                       thisBroadcastBlock._boundingState.PostponedMessages,
                                                       ref exceptions);
                }

                if (exceptions != null)
                {
                    // It is important to migrate these exceptions to the source part of the owning block,
                    // because that is the completion task that is publicly exposed.
                    thisBroadcastBlock._source.AddExceptions(exceptions);
                }

                thisBroadcastBlock._source.Complete();
            }, this, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
        }
        // Otherwise, we can just decline the source directly.
        else
        {
            _source.Complete();
        }
    }
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ConsumeMessage"]/*' />
T ISourceBlock<T>.ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target, out Boolean messageConsumed)
{
    // Consumption is delegated to the source half.
    T consumedValue = _source.ConsumeMessage(messageHeader, target, out messageConsumed);
    return consumedValue;
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReserveMessage"]/*' />
bool ISourceBlock<T>.ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
{
    // Reservations are tracked by the source half.
    bool reserved = _source.ReserveMessage(messageHeader, target);
    return reserved;
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReleaseReservation"]/*' />
void ISourceBlock<T>.ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
{
    // Reservations are tracked by the source half.
    _source.ReleaseReservation(messageHeader, target);
}
/// <summary>Gets the HasValue flag from the source's debugging information, for use by the DebuggerDisplayAttribute.</summary>
private bool HasValueForDebugger
{
    get { return _source.GetDebuggingInformation().HasValue; }
}
/// <summary>Gets a value to be used for the DebuggerDisplayAttribute.  This must not throw even if HasValue is false.</summary>
private T ValueForDebugger
{
    get { return _source.GetDebuggingInformation().Value; }
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="ToString"]/*' />
public override string ToString()
{
    // Uses the same name helper as the debugger display.
    return Common.GetNameForDebugger(this, _source.DataflowBlockOptions);
}
/// <summary>The data to display in the debugger display attribute.</summary>
/// <remarks>Combines the block's debugger name with the HasValue flag and current value reported by the source.</remarks>
[SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
private object DebuggerDisplayContent
{
    get
    {
        return string.Format("{0}, HasValue={1}, Value={2}",
            Common.GetNameForDebugger(this, _source.DataflowBlockOptions),
            HasValueForDebugger,
            ValueForDebugger);
    }
}
/// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
object IDebuggerDisplay.Content
{
    get { return DebuggerDisplayContent; }
}
/// <summary>Provides a debugger type proxy for the BroadcastBlock.</summary>
private sealed class DebugView
{
    /// <summary>The BroadcastBlock being debugged.</summary>
    private readonly BroadcastBlock<T> _broadcastBlock;
    /// <summary>Debug info about the source side of the broadcast, captured when the view is constructed.</summary>
    private readonly BroadcastingSourceCore<T>.DebuggingInformation _sourceDebuggingInformation;

    /// <summary>Initializes the debug view.</summary>
    /// <param name="broadcastBlock">The BroadcastBlock being debugged.</param>
    public DebugView(BroadcastBlock<T> broadcastBlock)
    {
        Contract.Requires(broadcastBlock != null, "Need a block with which to construct the debug view.");
        _broadcastBlock = broadcastBlock;
        _sourceDebuggingInformation = broadcastBlock._source.GetDebuggingInformation();
    }

    /// <summary>Gets the messages waiting to be processed.</summary>
    public IEnumerable<T> InputQueue { get { return _sourceDebuggingInformation.InputQueue; } }
    /// <summary>Gets whether the broadcast has a current value.</summary>
    public bool HasValue { get { return _broadcastBlock.HasValueForDebugger; } }
    /// <summary>Gets the broadcast's current value.</summary>
    public T Value { get { return _broadcastBlock.ValueForDebugger; } }

    /// <summary>Gets the task being used for output processing.</summary>
    public Task TaskForOutputProcessing { get { return _sourceDebuggingInformation.TaskForOutputProcessing; } }

    /// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
    public DataflowBlockOptions DataflowBlockOptions { get { return _sourceDebuggingInformation.DataflowBlockOptions; } }
    /// <summary>Gets whether the block is declining further messages.</summary>
    public bool IsDecliningPermanently { get { return _broadcastBlock._decliningPermanently; } }
    /// <summary>Gets whether the block is completed.</summary>
    public bool IsCompleted { get { return _sourceDebuggingInformation.IsCompleted; } }
    /// <summary>Gets the block's Id.</summary>
    public int Id { get { return Common.GetBlockId(_broadcastBlock); } }

    /// <summary>Gets the set of all targets linked from this block.</summary>
    public TargetRegistry<T> LinkedTargets { get { return _sourceDebuggingInformation.LinkedTargets; } }
    /// <summary>Gets the target that the next message is reserved for, as reported by the source.</summary>
    public ITargetBlock<T> NextMessageReservedFor { get { return _sourceDebuggingInformation.NextMessageReservedFor; } }
}
508 /// <summary>Provides a core implementation for blocks that implement <see cref="ISourceBlock{TOutput}"/>.</summary>
509 /// <typeparam name="TOutput">Specifies the type of data supplied by the <see cref="SourceCore{TOutput}"/>.</typeparam>
510 [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")]
511 [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
512 private sealed class BroadcastingSourceCore<TOutput>
514 /// <summary>A registry used to store all linked targets and information about them.</summary>
515 private readonly TargetRegistry<TOutput> _targetRegistry;
516 /// <summary>All of the output messages queued up to be received by consumers/targets.</summary>
517 private readonly Queue<TOutput> _messages = new Queue<TOutput>();
518 /// <summary>A TaskCompletionSource that represents the completion of this block.</summary>
519 private readonly TaskCompletionSource<VoidResult> _completionTask = new TaskCompletionSource<VoidResult>();
521 /// An action to be invoked on the owner block when an item is removed.
522 /// This may be null if the owner block doesn't need to be notified.
524 private readonly Action<int> _itemsRemovedAction;
/// <summary>Gets the object to use as the outgoing lock.</summary>
/// <remarks>The completion source instance is used as the lock object.</remarks>
private object OutgoingLock
{
    get { return _completionTask; }
}
/// <summary>Gets the object to use as the value lock.</summary>
/// <remarks>The target registry instance is used as the lock object.</remarks>
private object ValueLock
{
    get { return _targetRegistry; }
}
531 /// <summary>The source block utilizing this helper.</summary>
532 private readonly BroadcastBlock<TOutput> _owningSource;
533 /// <summary>The options used to configure this block's execution.</summary>
534 private readonly DataflowBlockOptions _dataflowBlockOptions;
535 /// <summary>The cloning function to use.</summary>
536 private readonly Func<TOutput, TOutput> _cloningFunction;
538 /// <summary>An indicator whether _currentMessage has a value.</summary>
539 private bool _currentMessageIsValid;
540 /// <summary>The message currently being broadcast.</summary>
541 private TOutput _currentMessage;
542 /// <summary>The target that the next message is reserved for, or null if nothing is reserved.</summary>
543 private ITargetBlock<TOutput> _nextMessageReservedFor;
544 /// <summary>Whether this block should again attempt to offer messages to targets.</summary>
545 private bool _enableOffering;
546 /// <summary>Whether all future messages should be declined.</summary>
547 private bool _decliningPermanently;
548 /// <summary>The task used to process the output and offer it to targets.</summary>
549 private Task _taskForOutputProcessing;
550 /// <summary>Exceptions that may have occurred and gone unhandled during processing.</summary>
551 private List<Exception> _exceptions;
552 /// <summary>Counter for message IDs unique within this source block.</summary>
553 private long _nextMessageId = 1; // We are going to use this value before incrementing.
554 /// <summary>Whether someone has reserved the right to call CompleteBlockOncePossible.</summary>
555 private bool _completionReserved;
/// <summary>Initializes the source core.</summary>
/// <param name="owningSource">The source utilizing this core.</param>
/// <param name="cloningFunction">The function to use to clone the data when offered to other blocks.  May be null.</param>
/// <param name="dataflowBlockOptions">The options to use to configure the block.</param>
/// <param name="itemsRemovedAction">Action to invoke when an item is removed.  May be null.</param>
internal BroadcastingSourceCore(
    BroadcastBlock<TOutput> owningSource,
    Func<TOutput, TOutput> cloningFunction,
    DataflowBlockOptions dataflowBlockOptions,
    Action<int> itemsRemovedAction)
{
    Contract.Requires(owningSource != null, "Must be associated with a broadcast block.");
    Contract.Requires(dataflowBlockOptions != null, "Options are required to configure this block.");

    // Capture the configuration supplied by the owning block.
    _owningSource = owningSource;
    _cloningFunction = cloningFunction;
    _dataflowBlockOptions = dataflowBlockOptions;
    _itemsRemovedAction = itemsRemovedAction;

    // The target registry is tied to the owning block.
    _targetRegistry = new TargetRegistry<TOutput>(owningSource);
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceive"]/*' />
internal Boolean TryReceive(Predicate<TOutput> filter, out TOutput item)
{
    // Take the lock only long enough to get the message,
    // synchronizing with other activities on the block.
    // We don't want to execute the user-provided cloning delegate
    // while holding the lock.
    TOutput message;
    bool isValid;
    lock (OutgoingLock)
    {
        lock (ValueLock)
        {
            message = _currentMessage;
            isValid = _currentMessageIsValid;
        }
    }

    // Clone and hand back a message if we have one and if it passes the filter.
    // (A null filter means all messages pass.)
    if (isValid && (filter == null || filter(message)))
    {
        item = CloneItem(message);
        return true;
    }
    else
    {
        item = default(TOutput);
        return false;
    }
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceiveAll"]/*' />
internal Boolean TryReceiveAll(out IList<TOutput> items)
{
    // Try to receive the one item this block may have.
    // If we can, give back an array of one item. Otherwise, give back null.
    TOutput item;
    if (TryReceive(null, out item))
    {
        items = new TOutput[] { item };
        return true;
    }
    else
    {
        items = null;
        return false;
    }
}
/// <summary>Adds a message to the source block for propagation.</summary>
/// <param name="item">The item to be wrapped in a message to be added.</param>
/// <remarks>The item is dropped if the source is already declining permanently.</remarks>
internal void AddMessage(TOutput item)
{
    // This method must not take the outgoing lock, as it will be called in situations
    // where a derived type's incoming lock is held.  The lock leveling structure
    // we're employing is such that outgoing may be held while acquiring incoming, but
    // of course not the other way around.  This is the reason why DataflowSourceBlock
    // needs ValueLock as well.  Otherwise, it would be pure overhead.
    lock (ValueLock)
    {
        if (_decliningPermanently) return;
        _messages.Enqueue(item);

        // Going from empty to one queued message re-enables offering.
        if (_messages.Count == 1) _enableOffering = true;
        OfferAsyncIfNecessary();
    }
}
/// <summary>Informs the block that it will not be receiving additional messages.</summary>
internal void Complete()
{
    // _decliningPermanently is read under ValueLock in AddMessage, so it is written under the same lock.
    lock (ValueLock)
    {
        _decliningPermanently = true;

        // Complete may be called in a context where an incoming lock is held.  We need to
        // call CompleteBlockIfPossible, but we can't do so if the incoming lock is held.
        // However, now that _decliningPermanently has been set, the timing of
        // CompleteBlockIfPossible doesn't matter, so we schedule it to run asynchronously
        // and take the necessary locks in a situation where we're sure it won't cause a problem.
        Task.Factory.StartNew(state =>
        {
            var thisSourceCore = (BroadcastingSourceCore<TOutput>)state;
            lock (thisSourceCore.OutgoingLock)
            {
                lock (thisSourceCore.ValueLock)
                {
                    thisSourceCore.CompleteBlockIfPossible();
                }
            }
        }, this, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
    }
}
/// <summary>Clones the item.</summary>
/// <param name="item">The item to clone.</param>
/// <returns>The result of the cloning function, or <paramref name="item"/> itself when no cloning function was supplied.</returns>
private TOutput CloneItem(TOutput item)
{
    return _cloningFunction != null ?
        _cloningFunction(item) :
        item;
}
/// <summary>Offers the current message to a specific target.</summary>
/// <param name="target">The target to which to offer the current message.</param>
/// <remarks>The outgoing lock must be held by the caller and the value lock must not be held.</remarks>
private void OfferCurrentMessageToNewTarget(ITargetBlock<TOutput> target)
{
    Contract.Requires(target != null, "Target required to offer messages to.");
    Common.ContractAssertMonitorStatus(OutgoingLock, held: true);
    Common.ContractAssertMonitorStatus(ValueLock, held: false);

    // Get the current message if there is one
    TOutput currentMessage;
    bool isValid;
    lock (ValueLock)
    {
        currentMessage = _currentMessage;
        isValid = _currentMessageIsValid;
    }

    // If there is no valid message yet, there is nothing to offer
    if (!isValid) return;

    // Offer it to the target.
    // We must not increment the message ID here.  We only do that when we populate _currentMessage, i.e. when we dequeue.
    bool useCloning = _cloningFunction != null;
    DataflowMessageStatus result = target.OfferMessage(new DataflowMessageHeader(_nextMessageId), currentMessage, _owningSource, consumeToAccept: useCloning);

    // If accepted and the target was linked as "unlinkAfterOne", remove it
    if (result == DataflowMessageStatus.Accepted)
    {
        if (!useCloning)
        {
            // If accepted and the target was linked as "once", mark it for removal.
            // If we were forcing consumption, this removal would have already
            // happened in ConsumeMessage.
            _targetRegistry.Remove(target, onlyIfReachedMaxMessages: true);
        }
    }
    // If declined permanently, remove it
    else if (result == DataflowMessageStatus.DecliningPermanently)
    {
        _targetRegistry.Remove(target);
    }
    else Debug.Assert(result != DataflowMessageStatus.NotAvailable, "Messages from a Broadcast should never be missed.");
}
/// <summary>Offers messages to targets.</summary>
/// <returns>
/// true if a message was dequeued and made current (and offered to any linked targets);
/// false if there was a reservation or no queued message, in which case offering is disabled.
/// </returns>
/// <remarks>The outgoing lock must be held by the caller and the value lock must not be held.</remarks>
private bool OfferToTargets()
{
    Common.ContractAssertMonitorStatus(OutgoingLock, held: true);
    Common.ContractAssertMonitorStatus(ValueLock, held: false);

    DataflowMessageHeader header = default(DataflowMessageHeader);
    TOutput message = default(TOutput);
    int numDequeuedMessages = 0;
    lock (ValueLock)
    {
        // If there's a reservation or there aren't any more messages,
        // there's nothing for us to do.  If there's no reservation
        // and a message is available, dequeue the next one and store it
        // as the new current.  If we're now at 0 message, disable further
        // propagation until more messages arrive.
        if (_nextMessageReservedFor == null && _messages.Count > 0)
        {
            // If there are no targets registered, we might as well empty out the broadcast,
            // keeping just the last.  Otherwise, it'll happen anyway, but much more expensively.
            if (_targetRegistry.FirstTargetNode == null)
            {
                while (_messages.Count > 1)
                {
                    _messages.Dequeue();
                    numDequeuedMessages++;
                }
            }

            // Get the next message to offer
            Debug.Assert(_messages.Count > 0, "There must be at least one message to dequeue.");
            _currentMessage = message = _messages.Dequeue();
            numDequeuedMessages++;
            _currentMessageIsValid = true;
            header = new DataflowMessageHeader(++_nextMessageId);
            if (_messages.Count == 0) _enableOffering = false;
        }
        else
        {
            _enableOffering = false;
            return false;
        }
    } // must not hold ValueLock when calling out to targets

    // Offer the message
    if (header.IsValid)
    {
        // Notify the owner block that our count has decreased
        if (_itemsRemovedAction != null) _itemsRemovedAction(numDequeuedMessages);

        // Offer it to each linked target in turn.
        TargetRegistry<TOutput>.LinkedTargetInfo cur = _targetRegistry.FirstTargetNode;
        while (cur != null)
        {
            // Note that during OfferMessage, a target may call ConsumeMessage, which may unlink the target
            // if the target is registered as "once".  Doing so will remove the target from the targets list.
            // As such, we avoid using an enumerator over _targetRegistry and instead capture the next node
            // before offering, so that if the current element is removed, it won't affect the rest of our walk.
            TargetRegistry<TOutput>.LinkedTargetInfo next = cur.Next;
            ITargetBlock<TOutput> target = cur.Target;
            OfferMessageToTarget(header, message, target);
            cur = next;
        }
    }
    return true;
}
/// <summary>Offers the specified message to the specified target.</summary>
/// <param name="header">The header of the message to offer.</param>
/// <param name="message">The message to offer.</param>
/// <param name="target">The target to which the message should be offered.</param>
/// <remarks>
/// This will remove the target from the target registry if the result of the propagation demands it.
/// Must be called with OutgoingLock held and ValueLock not held.
/// </remarks>
private void OfferMessageToTarget(DataflowMessageHeader header, TOutput message, ITargetBlock<TOutput> target)
{
    Common.ContractAssertMonitorStatus(OutgoingLock, held: true);
    Common.ContractAssertMonitorStatus(ValueLock, held: false);

    // Offer the message. If there's a cloning function, we force the target to
    // come back to us to consume the message, allowing us the opportunity to run
    // the cloning function once we know they want the data. If there is no cloning
    // function, there's no reason for them to call back here.
    bool useCloning = _cloningFunction != null;
    switch (target.OfferMessage(header, message, _owningSource, consumeToAccept: useCloning))
    {
        case DataflowMessageStatus.Accepted:
            if (!useCloning)
            {
                // If accepted and the target was linked as "once", mark it for removal.
                // If we were forcing consumption, this removal would have already
                // happened in ConsumeMessage.
                _targetRegistry.Remove(target, onlyIfReachedMaxMessages: true);
            }
            break;

        case DataflowMessageStatus.DecliningPermanently:
            // If declined permanently, mark the target for removal
            _targetRegistry.Remove(target);
            break;

        case DataflowMessageStatus.NotAvailable:
            Debug.Assert(false, "Messages from a Broadcast should never be missed.");
            break;
        // No action required for Postponed or Declined
    }
}
/// <summary>Called when we want to enable asynchronously offering message to targets.</summary>
/// <param name="isReplacementReplica">Whether this call is the continuation of a previous message loop.</param>
/// <remarks>
/// Must be called with ValueLock held. A task is launched only when offering is enabled, messages are
/// buffered, no output task is already running, and the source is neither canceled nor faulted.
/// If the task cannot be started, the exception is stored, the source starts declining permanently,
/// and completion is attempted from a separate task.
/// </remarks>
private void OfferAsyncIfNecessary(bool isReplacementReplica = false)
{
    Common.ContractAssertMonitorStatus(ValueLock, held: true);
    // This method must not take the OutgoingLock.

    bool currentlyProcessing = _taskForOutputProcessing != null;
    bool processingToDo = _enableOffering && _messages.Count > 0;

    // If there's any work to be done...
    if (!currentlyProcessing && processingToDo && !CanceledOrFaulted)
    {
        // Create task and store into _taskForOutputProcessing prior to scheduling the task
        // so that _taskForOutputProcessing will be visibly set in the task loop.
        _taskForOutputProcessing = new Task(thisSourceCore => ((BroadcastingSourceCore<TOutput>)thisSourceCore).OfferMessagesLoopCore(), this,
                                            Common.GetCreationOptionsForTask(isReplacementReplica));

#if FEATURE_TRACING
        DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
        if (etwLog.IsEnabled())
        {
            etwLog.TaskLaunchedForMessageHandling(
                _owningSource, _taskForOutputProcessing, DataflowEtwProvider.TaskLaunchedReason.OfferingOutputMessages, _messages.Count);
        }
#endif

        // Start the task handling scheduling exceptions
        Exception exception = Common.StartTaskSafe(_taskForOutputProcessing, _dataflowBlockOptions.TaskScheduler);
        if (exception != null)
        {
            // First, log the exception while the processing state is dirty which is preventing the block from completing.
            // Then revert the proactive processing state changes.
            // And last, try to complete the block.
            AddException(exception);
            _decliningPermanently = true;
            _taskForOutputProcessing = null;

            // Get out from under currently held locks - ValueLock is taken, but OutgoingLock may not be.
            // Re-take the locks on a separate thread.
            Task.Factory.StartNew(state =>
            {
                var thisSourceCore = (BroadcastingSourceCore<TOutput>)state;
                lock (thisSourceCore.OutgoingLock)
                {
                    lock (thisSourceCore.ValueLock)
                    {
                        thisSourceCore.CompleteBlockIfPossible();
                    }
                }
            }, this, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
        }
    }
}
/// <summary>Task body used to process messages.</summary>
/// <remarks>
/// Offers up to the configured maximum number of messages per task while holding OutgoingLock.
/// Any exception is routed to the owning block's CompleteCore. On exit the processing task is cleared,
/// a replacement task is launched if work remains, and completion is attempted.
/// </remarks>
[SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
private void OfferMessagesLoopCore()
{
    try
    {
        int maxMessagesPerTask = _dataflowBlockOptions.ActualMaxMessagesPerTask;
        lock (OutgoingLock)
        {
            // Offer as many messages as we can
            for (int counter = 0;
                counter < maxMessagesPerTask && !CanceledOrFaulted;
                counter++)
            {
                if (!OfferToTargets()) break;
            }
        }
    }
    catch (Exception exception)
    {
        _owningSource.CompleteCore(exception, storeExceptionEvenIfAlreadyCompleting: true);
    }
    finally
    {
        lock (OutgoingLock)
        {
            lock (ValueLock)
            {
                // We're no longer processing, so null out the processing task
                _taskForOutputProcessing = null;

                // However, we may have given up early because we hit our own configured
                // processing limits rather than because we ran out of work to do. If that's
                // the case, make sure we spin up another task to keep going.
                OfferAsyncIfNecessary(isReplacementReplica: true);

                // If, however, we stopped because we ran out of work to do and we
                // know we'll never get more, then complete.
                CompleteBlockIfPossible();
            }
        }
    }
}
/// <summary>Completes the block's processing if there's nothing left to do and never will be.</summary>
/// <remarks>
/// Must be called with both OutgoingLock and ValueLock held. Does nothing once completion has been reserved.
/// </remarks>
private void CompleteBlockIfPossible()
{
    Common.ContractAssertMonitorStatus(OutgoingLock, held: true);
    Common.ContractAssertMonitorStatus(ValueLock, held: true);

    if (!_completionReserved)
    {
        bool currentlyProcessing = _taskForOutputProcessing != null;
        bool noMoreMessages = _decliningPermanently && _messages.Count == 0;

        // Are we done forever?
        bool complete = !currentlyProcessing && (noMoreMessages || CanceledOrFaulted);
        if (complete)
        {
            CompleteBlockIfPossible_Slow();
        }
    }
}
/// <summary>
/// Slow path for CompleteBlockIfPossible.
/// Separating out the slow path into its own method makes it more likely that the fast path method will get inlined.
/// </summary>
/// <remarks>
/// Reserves completion and then schedules CompleteBlockOncePossible on the default scheduler.
/// </remarks>
private void CompleteBlockIfPossible_Slow()
{
    Contract.Requires(_taskForOutputProcessing == null, "There must be no processing tasks.");
    Contract.Requires(
        (_decliningPermanently && _messages.Count == 0) || CanceledOrFaulted,
        "There must be no more messages or the block must be canceled or faulted.");

    _completionReserved = true;

    // Run asynchronously to get out of the currently held locks
    Task.Factory.StartNew(thisSourceCore => ((BroadcastingSourceCore<TOutput>)thisSourceCore).CompleteBlockOncePossible(),
        this, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
}
/// <summary>
/// Completes the block. This must only be called once, and only once all of the completion conditions are met.
/// As such, it must only be called from CompleteBlockIfPossible.
/// </summary>
/// <remarks>
/// The completion task ends faulted if exceptions were stored, canceled if the configured cancellation
/// token has been signaled, and successful otherwise. Completion is propagated to the previously linked
/// targets only after the completion task has been transitioned.
/// </remarks>
private void CompleteBlockOncePossible()
{
    TargetRegistry<TOutput>.LinkedTargetInfo linkedTargets;
    List<Exception> exceptions;

    // Clear out the target registry and buffers to help avoid memory leaks.
    // We do not clear _currentMessage, which should remain as that message forever.
    lock (OutgoingLock)
    {
        // Save the linked list of targets so that it could be traversed later to propagate completion
        linkedTargets = _targetRegistry.ClearEntryPoints();
        lock (ValueLock)
        {
            _messages.Clear();

            // Save a local reference to the exceptions list and null out the field,
            // so that if the target side tries to add an exception this late,
            // it will go to a separate list (that will be ignored.)
            exceptions = _exceptions;
            _exceptions = null;
        }
    }

    // If it's due to an exception, finish in a faulted state
    if (exceptions != null)
    {
        _completionTask.TrySetException(exceptions);
    }
    // It's due to cancellation, finish in a canceled state
    else if (_dataflowBlockOptions.CancellationToken.IsCancellationRequested)
    {
        _completionTask.TrySetCanceled();
    }
    // Otherwise, finish in a successful state.
    else
    {
        _completionTask.TrySetResult(default(VoidResult));
    }

    // Now that the completion task is completed, we may propagate completion to the linked targets
    _targetRegistry.PropagateCompletion(linkedTargets);
#if FEATURE_TRACING
    DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
    if (etwLog.IsEnabled())
    {
        etwLog.DataflowBlockCompleted(_owningSource);
    }
#endif
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="LinkTo"]/*' />
[SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")]
internal IDisposable LinkTo(ITargetBlock<TOutput> target, DataflowLinkOptions linkOptions)
{
    // Validate arguments
    if (target == null) throw new ArgumentNullException("target");
    if (linkOptions == null) throw new ArgumentNullException("linkOptions");
    Contract.EndContractBlock();

    lock (OutgoingLock)
    {
        // If we've completed or completion has at least started, offer the message to this target,
        // and propagate completion if that was requested.
        // Then there's nothing more to be done.
        if (_completionReserved)
        {
            OfferCurrentMessageToNewTarget(target);
            if (linkOptions.PropagateCompletion) Common.PropagateCompletionOnceCompleted(_completionTask.Task, target);
            return Disposables.Nop;
        }

        // Otherwise, add the target and then offer it the current
        // message. We do this in this order because offering may
        // cause the target to be removed if it's unlinkAfterOne,
        // and in the reverse order we would end up adding the target
        // after it was "removed".
        _targetRegistry.Add(ref target, linkOptions);
        OfferCurrentMessageToNewTarget(target);
        return Common.CreateUnlinker(OutgoingLock, _targetRegistry, target);
    }
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ConsumeMessage"]/*' />
internal TOutput ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target, out Boolean messageConsumed)
{
    // Validate arguments
    if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, "messageHeader");
    if (target == null) throw new ArgumentNullException("target");
    Contract.EndContractBlock();

    TOutput valueToClone;
    lock (OutgoingLock) // We may currently be calling out under this lock to the target; requires it to be reentrant
    {
        lock (ValueLock)
        {
            // If this isn't the next message to be served up, bail
            if (messageHeader.Id != _nextMessageId)
            {
                messageConsumed = false;
                return default(TOutput);
            }

            // If the caller has the reservation, release the reservation.
            // We still allow others to take the message if there's a reservation.
            if (_nextMessageReservedFor == target)
            {
                _nextMessageReservedFor = null;
                _enableOffering = true;
            }
            _targetRegistry.Remove(target, onlyIfReachedMaxMessages: true);

            OfferAsyncIfNecessary();
            CompleteBlockIfPossible();

            // Capture the current message; it is cloned below, after both locks are released.
            valueToClone = _currentMessage;
        }
    }

    messageConsumed = true;
    return CloneItem(valueToClone);
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReserveMessage"]/*' />
internal Boolean ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
{
    // Validate arguments
    if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, "messageHeader");
    if (target == null) throw new ArgumentNullException("target");
    Contract.EndContractBlock();

    lock (OutgoingLock)
    {
        // If no one currently holds a reservation...
        if (_nextMessageReservedFor == null)
        {
            lock (ValueLock)
            {
                // ...and the requested message is next in line, allow it
                if (messageHeader.Id == _nextMessageId)
                {
                    // Record the reserver and stop offering until the reservation is consumed or released.
                    _nextMessageReservedFor = target;
                    _enableOffering = false;
                    return true;
                }
            }
        }
    }
    return false;
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReleaseReservation"]/*' />
internal void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
{
    // Validate arguments
    if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, "messageHeader");
    if (target == null) throw new ArgumentNullException("target");
    Contract.EndContractBlock();

    lock (OutgoingLock)
    {
        // If someone else holds the reservation, bail.
        if (_nextMessageReservedFor != target) throw new InvalidOperationException(SR.InvalidOperation_MessageNotReservedByTarget);

        TOutput messageToReoffer;
        lock (ValueLock)
        {
            // If this is not the message at the head of the queue, bail
            if (messageHeader.Id != _nextMessageId) throw new InvalidOperationException(SR.InvalidOperation_MessageNotReservedByTarget);

            // Otherwise, release the reservation, and reoffer the message to all targets.
            _nextMessageReservedFor = null;
            _enableOffering = true;
            messageToReoffer = _currentMessage;
            OfferAsyncIfNecessary();
        }

        // We need to explicitly reoffer this message to the releaser,
        // as otherwise if the target has join behavior it could end up waiting for an offer from
        // this broadcast forever, even though data is in fact available. We could only
        // do this if _messages.Count == 0, as if it's > 0 the message will get overwritten
        // as part of the asynchronous offering, but for consistency we should always reoffer
        // the current message.
        OfferMessageToTarget(messageHeader, messageToReoffer, target);
    }
}
/// <summary>Gets whether the source has had cancellation requested or an exception has occurred.</summary>
private bool CanceledOrFaulted
{
    get
    {
        // Cancellation is honored as soon as the CancellationToken has been signaled.
        // Faulting is honored after an exception has been encountered and the owning block
        // has invoked Complete on us.
        return _dataflowBlockOptions.CancellationToken.IsCancellationRequested ||
            (Volatile.Read(ref _exceptions) != null && _decliningPermanently);
    }
}
/// <summary>Adds an individual exception to this source.</summary>
/// <param name="exception">The exception to add</param>
internal void AddException(Exception exception)
{
    Contract.Requires(exception != null, "An exception to add is required.");
    Contract.Requires(!Completion.IsCompleted || Completion.IsFaulted, "The block must either not be completed or be faulted if we're still storing exceptions.");

    // Mutate the exception list only while holding ValueLock.
    lock (ValueLock)
    {
        Common.AddException(ref _exceptions, exception);
    }
}
/// <summary>Adds exceptions to this source.</summary>
/// <param name="exceptions">The exceptions to add</param>
internal void AddExceptions(List<Exception> exceptions)
{
    Contract.Requires(exceptions != null, "A list of exceptions to add is required.");
    Contract.Requires(!Completion.IsCompleted || Completion.IsFaulted, "The block must either not be completed or be faulted if we're still storing exceptions.");

    // Take ValueLock once for the whole batch rather than once per exception.
    lock (ValueLock)
    {
        foreach (Exception exception in exceptions)
        {
            Common.AddException(ref _exceptions, exception);
        }
    }
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
internal Task Completion
{
    get
    {
        // Expose only the Task side of the completion source.
        return _completionTask.Task;
    }
}
/// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
internal DataflowBlockOptions DataflowBlockOptions
{
    get
    {
        return _dataflowBlockOptions;
    }
}
/// <summary>Gets the object to display in the debugger display attribute.</summary>
[SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
private object DebuggerDisplayContent
{
    get
    {
        // Prefer the owning block's own debugger content when it provides one;
        // otherwise fall back to formatting the owning block itself.
        var displaySource = _owningSource as IDebuggerDisplay;
        return string.Format("Block=\"{0}\"",
            displaySource != null ? displaySource.Content : _owningSource);
    }
}
/// <summary>Gets information about this helper to be used for display in a debugger.</summary>
/// <returns>Debugging information about this source core.</returns>
internal DebuggingInformation GetDebuggingInformation()
{
    // A fresh view object wrapping this source core is created on every call.
    return new DebuggingInformation(this);
}
1227 /// <summary>Provides debugging information about the source core.</summary>
1228 internal sealed class DebuggingInformation
/// <summary>The source being viewed.</summary>
private readonly BroadcastingSourceCore<TOutput> _source;

/// <summary>Initializes the type proxy.</summary>
/// <param name="source">The source being viewed.</param>
public DebuggingInformation(BroadcastingSourceCore<TOutput> source) { _source = source; }

/// <summary>Gets whether the source contains a current message.</summary>
public bool HasValue { get { return _source._currentMessageIsValid; } }
/// <summary>Gets the value of the source's current message.</summary>
public TOutput Value { get { return _source._currentMessage; } }
/// <summary>Gets the number of messages waiting to be made current.</summary>
public int InputCount { get { return _source._messages.Count; } }
/// <summary>Gets a snapshot of the messages waiting to be made current.</summary>
public IEnumerable<TOutput> InputQueue { get { return _source._messages.ToList(); } }
/// <summary>Gets the task being used for output processing.</summary>
public Task TaskForOutputProcessing { get { return _source._taskForOutputProcessing; } }

/// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
public DataflowBlockOptions DataflowBlockOptions { get { return _source._dataflowBlockOptions; } }
/// <summary>Gets whether the block is declining further messages.</summary>
public bool IsDecliningPermanently { get { return _source._decliningPermanently; } }
/// <summary>Gets whether the block is completed.</summary>
public bool IsCompleted { get { return _source.Completion.IsCompleted; } }

/// <summary>Gets the set of all targets linked from this block.</summary>
public TargetRegistry<TOutput> LinkedTargets { get { return _source._targetRegistry; } }
/// <summary>Gets the target that holds a reservation on the next message, if any.</summary>
public ITargetBlock<TOutput> NextMessageReservedFor { get { return _source._nextMessageReservedFor; } }