// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// A propagator block that provides support for unbounded and bounded FIFO buffers.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
13 using System.Collections.Generic;
14 using System.Diagnostics;
15 using System.Diagnostics.Contracts;
16 using System.Security;
17 using System.Threading.Tasks.Dataflow.Internal;
18 using System.Diagnostics.CodeAnalysis;
20 namespace System.Threading.Tasks.Dataflow
22 /// <summary>Provides a buffer for storing data.</summary>
23 /// <typeparam name="T">Specifies the type of the data buffered by this dataflow block.</typeparam>
24 [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
25 [DebuggerTypeProxy(typeof(BufferBlock<>.DebugView))]
26 public sealed class BufferBlock<T> : IPropagatorBlock<T, T>, IReceivableSourceBlock<T>, IDebuggerDisplay
/// <summary>The source core to which this block delegates buffering, linking, and completion.</summary>
private readonly SourceCore<T> _source;

/// <summary>Bounding state used when a bounded capacity is configured; null when the block is unbounded.</summary>
private readonly BoundingStateWithPostponedAndTask<T> _boundingState;

/// <summary>Set once the target half must decline every future message.</summary>
private bool _targetDecliningPermanently;

/// <summary>Set once the target's completion routine has been claimed, so that it is started only once.</summary>
private bool _targetCompletionReserved;

/// <summary>Gets the object that is locked to synchronize incoming requests (the source core instance).</summary>
private object IncomingLock
{
    get { return _source; }
}
/// <summary>Initializes the <see cref="BufferBlock{T}"/> with the default block options.</summary>
public BufferBlock()
    : this(DataflowBlockOptions.Default)
{
}
/// <summary>Initializes the <see cref="BufferBlock{T}"/> with the specified <see cref="DataflowBlockOptions"/>.</summary>
/// <param name="dataflowBlockOptions">The options with which to configure this <see cref="BufferBlock{T}"/>.</param>
/// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
public BufferBlock(DataflowBlockOptions dataflowBlockOptions)
{
    if (dataflowBlockOptions == null) throw new ArgumentNullException("dataflowBlockOptions");
    Contract.EndContractBlock();

    // Ensure we have options that can't be changed by the caller
    dataflowBlockOptions = dataflowBlockOptions.DefaultOrClone();

    // Initialize bounding state if necessary. The item-removed callback is only
    // needed in bounded mode, where it keeps the mirrored count up to date.
    Action<ISourceBlock<T>, int> onItemsRemoved = null;
    if (dataflowBlockOptions.BoundedCapacity > 0)
    {
        onItemsRemoved = (owningSource, count) => ((BufferBlock<T>)owningSource).OnItemsRemoved(count);
        _boundingState = new BoundingStateWithPostponedAndTask<T>(dataflowBlockOptions.BoundedCapacity);
    }

    // Initialize the source state. The callback must be handed to the source core;
    // otherwise OnItemsRemoved is never invoked.
    _source = new SourceCore<T>(this, dataflowBlockOptions,
        owningSource => ((BufferBlock<T>)owningSource).Complete(),
        onItemsRemoved);

    // It is possible that the source half may fault on its own, e.g. due to a task scheduler exception.
    // In those cases we need to fault the target half to drop its buffered messages and to release its
    // reservations. This should not create an infinite loop, because all our implementations are designed
    // to handle multiple completion requests and to carry over only one.
    _source.Completion.ContinueWith((completed, state) =>
    {
        var thisBlock = ((BufferBlock<T>)state) as IDataflowBlock;
        Debug.Assert(completed.IsFaulted, "The source must be faulted in order to trigger a target completion.");
        thisBlock.Fault(completed.Exception);
    }, this, CancellationToken.None, Common.GetContinuationOptions() | TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default);

    // Handle async cancellation requests by declining on the target
    Common.WireCancellationToComplete(
        dataflowBlockOptions.CancellationToken, _source.Completion, owningSource => ((BufferBlock<T>)owningSource).Complete(), this);

    // NOTE(review): the conditional-compilation guard below is reconstructed; confirm the symbol name
    // against the project's build configuration.
#if FEATURE_TRACING
    DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
    if (etwLog.IsEnabled())
    {
        etwLog.DataflowBlockCreated(this, dataflowBlockOptions);
    }
#endif
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="OfferMessage"]/*' />
DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, Boolean consumeToAccept)
{
    // Validate arguments
    if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, "messageHeader");
    if (source == null && consumeToAccept) throw new ArgumentException(SR.Argument_CantConsumeFromANullSource, "consumeToAccept");
    Contract.EndContractBlock();

    // All target-side state is read and written under the incoming lock;
    // CompleteTargetIfPossible asserts that the lock is held.
    lock (IncomingLock)
    {
        // If we've already stopped accepting messages, decline permanently
        if (_targetDecliningPermanently)
        {
            CompleteTargetIfPossible();
            return DataflowMessageStatus.DecliningPermanently;
        }

        // We can directly accept the message if:
        //      1) we are not bounding, OR
        //      2) we are bounding AND there is room available AND there are no postponed messages AND we are not currently processing.
        // (If there were any postponed messages, we would need to postpone so that ordering would be maintained.)
        // (We should also postpone if we are currently processing, because there may be a race between consuming postponed messages and
        // accepting new ones directly into the queue.)
        if (_boundingState == null
                ||
            (_boundingState.CountIsLessThanBound && _boundingState.PostponedMessages.Count == 0 && _boundingState.TaskForInputProcessing == null))
        {
            // Consume the message from the source if necessary
            if (consumeToAccept)
            {
                Debug.Assert(source != null, "We must have thrown if source == null && consumeToAccept == true.");

                bool consumed;
                messageValue = source.ConsumeMessage(messageHeader, this, out consumed);
                if (!consumed) return DataflowMessageStatus.NotAvailable;
            }

            // Once consumed, pass it to the source
            _source.AddMessage(messageValue);
            if (_boundingState != null) _boundingState.CurrentCount++;

            return DataflowMessageStatus.Accepted;
        }
        // Otherwise, we try to postpone if a source was provided
        else if (source != null)
        {
            Debug.Assert(_boundingState != null && _boundingState.PostponedMessages != null,
                "PostponedMessages must have been initialized during construction in bounding mode.");

            _boundingState.PostponedMessages.Push(source, messageHeader);
            return DataflowMessageStatus.Postponed;
        }

        // We can't do anything else about this message
        return DataflowMessageStatus.Declined;
    }
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
public void Complete()
{
    // A plain completion request carries no exception.
    CompleteCore(exception: null, storeExceptionEvenIfAlreadyCompleting: false);
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
void IDataflowBlock.Fault(Exception exception)
{
    if (exception == null) throw new ArgumentNullException("exception");
    Contract.EndContractBlock();

    // A fault requested through the interface is stored only if the block
    // has not yet started declining permanently.
    CompleteCore(exception, storeExceptionEvenIfAlreadyCompleting: false);
}
/// <summary>Shared implementation of completion and faulting for the target half.</summary>
/// <param name="exception">The exception to record on the source half, or null for a plain completion.</param>
/// <param name="storeExceptionEvenIfAlreadyCompleting">
/// true to record <paramref name="exception"/> even if the block is already declining permanently.
/// </param>
/// <param name="revertProcessingState">
/// true to clear the in-flight input-processing task before triggering completion;
/// only valid together with <paramref name="storeExceptionEvenIfAlreadyCompleting"/>.
/// </param>
private void CompleteCore(Exception exception, bool storeExceptionEvenIfAlreadyCompleting, bool revertProcessingState = false)
{
    Contract.Requires(storeExceptionEvenIfAlreadyCompleting || !revertProcessingState,
                    "Indicating dirty processing state may only come with storeExceptionEvenIfAlreadyCompleting==true.");
    Contract.EndContractBlock();

    // Target-side state is mutated under the incoming lock; CompleteTargetIfPossible
    // asserts that the lock is held when it is called.
    lock (IncomingLock)
    {
        // Faulting from outside is allowed until we start declining permanently.
        // Faulting from inside is allowed at any time.
        if (exception != null && (!_targetDecliningPermanently || storeExceptionEvenIfAlreadyCompleting))
        {
            _source.AddException(exception);
        }

        // Revert the dirty processing state if requested
        if (revertProcessingState)
        {
            Debug.Assert(_boundingState != null && _boundingState.TaskForInputProcessing != null,
                            "The processing state must be dirty when revertProcessingState==true.");
            _boundingState.TaskForInputProcessing = null;
        }

        // Trigger completion
        _targetDecliningPermanently = true;
        CompleteTargetIfPossible();
    }
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="LinkTo"]/*' />
public IDisposable LinkTo(ITargetBlock<T> target, DataflowLinkOptions linkOptions)
{
    // Linking is handled entirely by the source core.
    return _source.LinkTo(target, linkOptions);
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceive"]/*' />
public Boolean TryReceive(Predicate<T> filter, out T item)
{
    // Forwarded to the source core.
    return _source.TryReceive(filter, out item);
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceiveAll"]/*' />
public Boolean TryReceiveAll(out IList<T> items)
{
    // Forwarded to the source core.
    return _source.TryReceiveAll(out items);
}
/// <summary>Gets the number of items currently stored in the buffer, as reported by the source core's output count.</summary>
public Int32 Count
{
    get { return _source.OutputCount; }
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
public Task Completion
{
    // The block's completion task is the source core's completion task.
    get { return _source.Completion; }
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ConsumeMessage"]/*' />
T ISourceBlock<T>.ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target, out Boolean messageConsumed)
{
    // Forwarded to the source core.
    T consumedValue = _source.ConsumeMessage(messageHeader, target, out messageConsumed);
    return consumedValue;
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReserveMessage"]/*' />
bool ISourceBlock<T>.ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
{
    // Forwarded to the source core.
    bool reserved = _source.ReserveMessage(messageHeader, target);
    return reserved;
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReleaseReservation"]/*' />
void ISourceBlock<T>.ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
{
    // Forwarded to the source core.
    _source.ReleaseReservation(messageHeader, target);
}
/// <summary>Notifies the block that one or more items was removed from the queue.</summary>
/// <param name="numItemsRemoved">The number of items removed.</param>
/// <remarks>Must be called without the incoming lock held; the lock is taken here when in bounded mode.</remarks>
private void OnItemsRemoved(int numItemsRemoved)
{
    Contract.Requires(numItemsRemoved > 0, "A positive number of items to remove is required.");
    Common.ContractAssertMonitorStatus(IncomingLock, held: false);

    // If we're bounding, we need to know when an item is removed so that we
    // can update the count that's mirroring the actual count in the source's queue,
    // and potentially kick off processing to start consuming postponed messages.
    if (_boundingState != null)
    {
        // The count and the calls below all require the incoming lock
        // (both callees assert that it is held).
        lock (IncomingLock)
        {
            // Decrement the count, which mirrors the count in the source half
            Debug.Assert(_boundingState.CurrentCount - numItemsRemoved >= 0,
                "It should be impossible to have a negative number of items.");
            _boundingState.CurrentCount -= numItemsRemoved;

            ConsumeAsyncIfNecessary();
            CompleteTargetIfPossible();
        }
    }
}
/// <summary>Called when postponed messages may need to be consumed.</summary>
/// <param name="isReplacementReplica">Whether this call is the continuation of a previous message loop.</param>
/// <remarks>Must be called with the incoming lock held and only in bounded mode.</remarks>
internal void ConsumeAsyncIfNecessary(bool isReplacementReplica = false)
{
    Common.ContractAssertMonitorStatus(IncomingLock, held: true);
    Debug.Assert(_boundingState != null, "Must be in bounded mode.");

    // Only launch a processing task if we are still accepting, none is in flight,
    // there is postponed work, and there is room for at least one more item.
    if (!_targetDecliningPermanently &&
        _boundingState.TaskForInputProcessing == null &&
        _boundingState.PostponedMessages.Count > 0 &&
        _boundingState.CountIsLessThanBound)
    {
        // Create the task and store it into TaskForInputProcessing prior to scheduling it,
        // so that TaskForInputProcessing will be visibly set in the task loop.
        _boundingState.TaskForInputProcessing =
            new Task(state => ((BufferBlock<T>)state).ConsumeMessagesLoopCore(), this,
                Common.GetCreationOptionsForTask(isReplacementReplica));

        // NOTE(review): the conditional-compilation guard below is reconstructed; confirm the symbol name
        // against the project's build configuration.
#if FEATURE_TRACING
        DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
        if (etwLog.IsEnabled())
        {
            etwLog.TaskLaunchedForMessageHandling(
                this, _boundingState.TaskForInputProcessing, DataflowEtwProvider.TaskLaunchedReason.ProcessingInputMessages,
                _boundingState.PostponedMessages.Count);
        }
#endif

        // Start the task handling scheduling exceptions
        Exception exception = Common.StartTaskSafe(_boundingState.TaskForInputProcessing, _source.DataflowBlockOptions.TaskScheduler);
        if (exception != null)
        {
            // Get out from under currently held locks. CompleteCore re-acquires the locks it needs.
            Task.Factory.StartNew(exc => CompleteCore(exception: (Exception)exc, storeExceptionEvenIfAlreadyCompleting: true, revertProcessingState: true),
                                exception, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
        }
    }
}
/// <summary>Task body used to consume postponed messages.</summary>
/// <remarks>
/// Consumes up to the configured maximum number of messages per task, then clears the
/// in-flight task under the incoming lock and re-evaluates whether more processing or
/// completion is needed.
/// </remarks>
[SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
private void ConsumeMessagesLoopCore()
{
    Contract.Requires(_boundingState != null && _boundingState.TaskForInputProcessing != null,
        "May only be called in bounded mode and when a task is in flight.");
    Debug.Assert(_boundingState.TaskForInputProcessing.Id == Task.CurrentId,
        "This must only be called from the in-flight processing task.");
    Common.ContractAssertMonitorStatus(IncomingLock, held: false);

    try
    {
        // Keep consuming until we hit the per-task limit or nothing more can be consumed.
        int maxMessagesPerTask = _source.DataflowBlockOptions.ActualMaxMessagesPerTask;
        for (int i = 0;
            i < maxMessagesPerTask && ConsumeAndStoreOneMessageIfAvailable();
            i++)
        {
            // All of the work happens in the loop condition.
        }
    }
    catch (Exception exc)
    {
        // Prevent the creation of new processing tasks
        CompleteCore(exc, storeExceptionEvenIfAlreadyCompleting: true);
    }
    finally
    {
        lock (IncomingLock)
        {
            // We're no longer processing, so null out the processing task
            _boundingState.TaskForInputProcessing = null;

            // However, we may have given up early because we hit our own configured
            // processing limits rather than because we ran out of work to do.  If that's
            // the case, make sure we spin up another task to keep going.
            ConsumeAsyncIfNecessary(isReplacementReplica: true);

            // If, however, we stopped because we ran out of work to do and we
            // know we'll never get more, then complete.
            CompleteTargetIfPossible();
        }
    }
}
/// <summary>
/// Retrieves one postponed message if there's room and if we can consume a postponed message.
/// Stores any consumed message into the source half.
/// </summary>
/// <returns>true if a message could be consumed and stored; otherwise, false.</returns>
/// <remarks>This must only be called from the asynchronous processing loop.</remarks>
private bool ConsumeAndStoreOneMessageIfAvailable()
{
    Contract.Requires(_boundingState != null && _boundingState.TaskForInputProcessing != null,
        "May only be called in bounded mode and when a task is in flight.");
    Debug.Assert(_boundingState.TaskForInputProcessing.Id == Task.CurrentId,
        "This must only be called from the in-flight processing task.");
    Common.ContractAssertMonitorStatus(IncomingLock, held: false);

    // Loop through the postponed messages until we get one.
    while (true)
    {
        // Get the next item to retrieve.  If there are no more, bail.
        KeyValuePair<ISourceBlock<T>, DataflowMessageHeader> sourceAndMessage;
        lock (IncomingLock)
        {
            if (!_boundingState.CountIsLessThanBound) return false;
            if (!_boundingState.PostponedMessages.TryPop(out sourceAndMessage)) return false;

            // Optimistically assume we're going to get the item. This avoids taking the lock
            // again if we're right.  If we're wrong, we decrement it later under lock.
            _boundingState.CurrentCount++;
        }

        // Consume the item. This calls back into the offering source, so it is done outside the lock.
        bool consumed = false;
        try
        {
            T consumedValue = sourceAndMessage.Key.ConsumeMessage(sourceAndMessage.Value, this, out consumed);
            if (consumed)
            {
                _source.AddMessage(consumedValue);
                return true;
            }
        }
        finally
        {
            // We didn't get the item, so decrement the count to counteract our optimistic assumption.
            // This also runs if ConsumeMessage throws, because 'consumed' is still false in that case.
            if (!consumed)
            {
                lock (IncomingLock) _boundingState.CurrentCount--;
            }
        }
    }
}
/// <summary>Completes the target, notifying the source, once all completion conditions are met.</summary>
/// <remarks>
/// Must be called with the incoming lock held. Completion proceeds only when the block is declining
/// permanently, completion has not already been reserved, and no input-processing task is in flight.
/// </remarks>
private void CompleteTargetIfPossible()
{
    Common.ContractAssertMonitorStatus(IncomingLock, held: true);
    if (_targetDecliningPermanently &&
        !_targetCompletionReserved &&
        (_boundingState == null || _boundingState.TaskForInputProcessing == null))
    {
        // Claim the completion routine so that it is started only once.
        _targetCompletionReserved = true;

        // If we're in bounding mode and we have any postponed messages, we need to clear them,
        // which means calling back to the source, which means we need to escape the incoming lock.
        if (_boundingState != null && _boundingState.PostponedMessages.Count > 0)
        {
            Task.Factory.StartNew(state =>
            {
                var thisBufferBlock = (BufferBlock<T>)state;

                // Release any postponed messages
                List<Exception> exceptions = null;
                if (thisBufferBlock._boundingState != null)
                {
                    // Note: No locks should be held at this point
                    Common.ReleaseAllPostponedMessages(thisBufferBlock,
                                                       thisBufferBlock._boundingState.PostponedMessages,
                                                       ref exceptions);
                }

                if (exceptions != null)
                {
                    // It is important to migrate these exceptions to the source half of this block,
                    // because that is the completion task that is publicly exposed.
                    thisBufferBlock._source.AddExceptions(exceptions);
                }

                thisBufferBlock._source.Complete();
            }, this, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
        }
        // Otherwise, we can just decline the source directly.
        else
        {
            _source.Complete();
        }
    }
}
/// <summary>Gets the number of messages in the buffer.  This must only be used from the debugger as it avoids taking necessary locks.</summary>
private int CountForDebugger
{
    get { return _source.GetDebuggingInformation().OutputCount; }
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="ToString"]/*' />
public override string ToString()
{
    // Uses the same name helper as the debugger display.
    return Common.GetNameForDebugger(this, _source.DataflowBlockOptions);
}
/// <summary>The data to display in the debugger display attribute: the block's debugger name followed by its item count.</summary>
[SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
private object DebuggerDisplayContent
{
    get
    {
        // The format string has two placeholders, so both the name and the count must be supplied.
        return string.Format("{0}, Count={1}",
            Common.GetNameForDebugger(this, _source.DataflowBlockOptions),
            CountForDebugger);
    }
}
/// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
object IDebuggerDisplay.Content
{
    get { return DebuggerDisplayContent; }
}
/// <summary>Provides a debugger type proxy for the BufferBlock.</summary>
private sealed class DebugView
{
    /// <summary>The buffer block.</summary>
    private readonly BufferBlock<T> _bufferBlock;
    /// <summary>Debugging information captured from the buffer's source half at construction time.</summary>
    private readonly SourceCore<T>.DebuggingInformation _sourceDebuggingInformation;

    /// <summary>Initializes the debug view.</summary>
    /// <param name="bufferBlock">The BufferBlock being viewed.</param>
    public DebugView(BufferBlock<T> bufferBlock)
    {
        Contract.Requires(bufferBlock != null, "Need a block with which to construct the debug view.");
        _bufferBlock = bufferBlock;
        _sourceDebuggingInformation = bufferBlock._source.GetDebuggingInformation();
    }

    /// <summary>Gets the collection of postponed message headers, or null when the block is not bounded.</summary>
    public QueuedMap<ISourceBlock<T>, DataflowMessageHeader> PostponedMessages
    {
        get { return _bufferBlock._boundingState != null ? _bufferBlock._boundingState.PostponedMessages : null; }
    }

    /// <summary>Gets the messages in the buffer.</summary>
    public IEnumerable<T> Queue { get { return _sourceDebuggingInformation.OutputQueue; } }

    /// <summary>Gets the task used to process postponed input messages, or null when the block is not bounded.</summary>
    public Task TaskForInputProcessing { get { return _bufferBlock._boundingState != null ? _bufferBlock._boundingState.TaskForInputProcessing : null; } }
    /// <summary>Gets the task being used for output processing.</summary>
    public Task TaskForOutputProcessing { get { return _sourceDebuggingInformation.TaskForOutputProcessing; } }

    /// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
    public DataflowBlockOptions DataflowBlockOptions { get { return _sourceDebuggingInformation.DataflowBlockOptions; } }

    /// <summary>Gets whether the block is declining further messages.</summary>
    public bool IsDecliningPermanently { get { return _bufferBlock._targetDecliningPermanently; } }
    /// <summary>Gets whether the block is completed.</summary>
    public bool IsCompleted { get { return _sourceDebuggingInformation.IsCompleted; } }
    /// <summary>Gets the block's Id.</summary>
    public int Id { get { return Common.GetBlockId(_bufferBlock); } }

    /// <summary>Gets the set of all targets linked from this block.</summary>
    public TargetRegistry<T> LinkedTargets { get { return _sourceDebuggingInformation.LinkedTargets; } }
    /// <summary>Gets the target, if any, for which the source half reports the next message as reserved.</summary>
    public ITargetBlock<T> NextMessageReservedFor { get { return _sourceDebuggingInformation.NextMessageReservedFor; } }
}