Merge pull request #2236 from akoeplinger/add-dataflow
[mono.git] / mcs / class / System.Threading.Tasks.Dataflow / CoreFxSources / Blocks / BufferBlock.cs
1 // Copyright (c) Microsoft. All rights reserved.
2 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
3
4 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
5 //
6 // BufferBlock.cs
7 //
8 //
9 // A propagator block that provides support for unbounded and bounded FIFO buffers.
10 //
11 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
12
13 using System.Collections.Generic;
14 using System.Diagnostics;
15 using System.Diagnostics.Contracts;
16 using System.Security;
17 using System.Threading.Tasks.Dataflow.Internal;
18 using System.Diagnostics.CodeAnalysis;
19
20 namespace System.Threading.Tasks.Dataflow
21 {
22     /// <summary>Provides a buffer for storing data.</summary>
23     /// <typeparam name="T">Specifies the type of the data buffered by this dataflow block.</typeparam>
24     [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
25     [DebuggerTypeProxy(typeof(BufferBlock<>.DebugView))]
26     public sealed class BufferBlock<T> : IPropagatorBlock<T, T>, IReceivableSourceBlock<T>, IDebuggerDisplay
27     {
28         /// <summary>The core logic for the buffer block.</summary>
29         private readonly SourceCore<T> _source;
30         /// <summary>The bounding state for when in bounding mode; null if not bounding.</summary>
31         private readonly BoundingStateWithPostponedAndTask<T> _boundingState;
32         /// <summary>Whether all future messages should be declined on the target.</summary>
33         private bool _targetDecliningPermanently;
34         /// <summary>A task has reserved the right to run the target's completion routine.</summary>
35         private bool _targetCompletionReserved;
36         /// <summary>Gets the lock object used to synchronize incoming requests.</summary>
37         private object IncomingLock { get { return _source; } }
38
39         /// <summary>Initializes the <see cref="BufferBlock{T}"/>.</summary>
40         public BufferBlock() :
41             this(DataflowBlockOptions.Default)
42         { }
43
44         /// <summary>Initializes the <see cref="BufferBlock{T}"/> with the specified <see cref="DataflowBlockOptions"/>.</summary>
45         /// <param name="dataflowBlockOptions">The options with which to configure this <see cref="BufferBlock{T}"/>.</param>
46         /// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
47         public BufferBlock(DataflowBlockOptions dataflowBlockOptions)
48         {
49             if (dataflowBlockOptions == null) throw new ArgumentNullException("dataflowBlockOptions");
50             Contract.EndContractBlock();
51
52             // Ensure we have options that can't be changed by the caller
53             dataflowBlockOptions = dataflowBlockOptions.DefaultOrClone();
54
55             // Initialize bounding state if necessary
56             Action<ISourceBlock<T>, int> onItemsRemoved = null;
57             if (dataflowBlockOptions.BoundedCapacity > 0)
58             {
59                 onItemsRemoved = (owningSource, count) => ((BufferBlock<T>)owningSource).OnItemsRemoved(count);
60                 _boundingState = new BoundingStateWithPostponedAndTask<T>(dataflowBlockOptions.BoundedCapacity);
61             }
62
63             // Initialize the source state
64             _source = new SourceCore<T>(this, dataflowBlockOptions,
65                 owningSource => ((BufferBlock<T>)owningSource).Complete(),
66                 onItemsRemoved);
67
68             // It is possible that the source half may fault on its own, e.g. due to a task scheduler exception.
69             // In those cases we need to fault the target half to drop its buffered messages and to release its 
70             // reservations. This should not create an infinite loop, because all our implementations are designed
71             // to handle multiple completion requests and to carry over only one.
72             _source.Completion.ContinueWith((completed, state) =>
73             {
74                 var thisBlock = ((BufferBlock<T>)state) as IDataflowBlock;
75                 Debug.Assert(completed.IsFaulted, "The source must be faulted in order to trigger a target completion.");
76                 thisBlock.Fault(completed.Exception);
77             }, this, CancellationToken.None, Common.GetContinuationOptions() | TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default);
78
79             // Handle async cancellation requests by declining on the target
80             Common.WireCancellationToComplete(
81                 dataflowBlockOptions.CancellationToken, _source.Completion, owningSource => ((BufferBlock<T>)owningSource).Complete(), this);
82 #if FEATURE_TRACING
83             DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
84             if (etwLog.IsEnabled())
85             {
86                 etwLog.DataflowBlockCreated(this, dataflowBlockOptions);
87             }
88 #endif
89         }
90
91         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="OfferMessage"]/*' />
92         DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, Boolean consumeToAccept)
93         {
94             // Validate arguments
95             if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, "messageHeader");
96             if (source == null && consumeToAccept) throw new ArgumentException(SR.Argument_CantConsumeFromANullSource, "consumeToAccept");
97             Contract.EndContractBlock();
98
99             lock (IncomingLock)
100             {
101                 // If we've already stopped accepting messages, decline permanently
102                 if (_targetDecliningPermanently)
103                 {
104                     CompleteTargetIfPossible();
105                     return DataflowMessageStatus.DecliningPermanently;
106                 }
107
108                 // We can directly accept the message if:
109                 //      1) we are not bounding, OR 
110                 //      2) we are bounding AND there is room available AND there are no postponed messages AND we are not currently processing. 
111                 // (If there were any postponed messages, we would need to postpone so that ordering would be maintained.)
112                 // (We should also postpone if we are currently processing, because there may be a race between consuming postponed messages and
113                 // accepting new ones directly into the queue.)
114                 if (_boundingState == null
115                         ||
116                     (_boundingState.CountIsLessThanBound && _boundingState.PostponedMessages.Count == 0 && _boundingState.TaskForInputProcessing == null))
117                 {
118                     // Consume the message from the source if necessary
119                     if (consumeToAccept)
120                     {
121                         Debug.Assert(source != null, "We must have thrown if source == null && consumeToAccept == true.");
122
123                         bool consumed;
124                         messageValue = source.ConsumeMessage(messageHeader, this, out consumed);
125                         if (!consumed) return DataflowMessageStatus.NotAvailable;
126                     }
127
128                     // Once consumed, pass it to the source
129                     _source.AddMessage(messageValue);
130                     if (_boundingState != null) _boundingState.CurrentCount++;
131
132                     return DataflowMessageStatus.Accepted;
133                 }
134                 // Otherwise, we try to postpone if a source was provided
135                 else if (source != null)
136                 {
137                     Debug.Assert(_boundingState != null && _boundingState.PostponedMessages != null,
138                         "PostponedMessages must have been initialized during construction in bounding mode.");
139
140                     _boundingState.PostponedMessages.Push(source, messageHeader);
141                     return DataflowMessageStatus.Postponed;
142                 }
143                 // We can't do anything else about this message
144                 return DataflowMessageStatus.Declined;
145             }
146         }
147
148         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
149         public void Complete() { CompleteCore(exception: null, storeExceptionEvenIfAlreadyCompleting: false); }
150
151         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
152         void IDataflowBlock.Fault(Exception exception)
153         {
154             if (exception == null) throw new ArgumentNullException("exception");
155             Contract.EndContractBlock();
156
157             CompleteCore(exception, storeExceptionEvenIfAlreadyCompleting: false);
158         }
159
160         private void CompleteCore(Exception exception, bool storeExceptionEvenIfAlreadyCompleting, bool revertProcessingState = false)
161         {
162             Contract.Requires(storeExceptionEvenIfAlreadyCompleting || !revertProcessingState,
163                             "Indicating dirty processing state may only come with storeExceptionEvenIfAlreadyCompleting==true.");
164             Contract.EndContractBlock();
165
166             lock (IncomingLock)
167             {
168                 // Faulting from outside is allowed until we start declining permanently.
169                 // Faulting from inside is allowed at any time.
170                 if (exception != null && (!_targetDecliningPermanently || storeExceptionEvenIfAlreadyCompleting))
171                 {
172                     _source.AddException(exception);
173                 }
174
175                 // Revert the dirty processing state if requested
176                 if (revertProcessingState)
177                 {
178                     Debug.Assert(_boundingState != null && _boundingState.TaskForInputProcessing != null,
179                                     "The processing state must be dirty when revertProcessingState==true.");
180                     _boundingState.TaskForInputProcessing = null;
181                 }
182
183                 // Trigger completion
184                 _targetDecliningPermanently = true;
185                 CompleteTargetIfPossible();
186             }
187         }
188
189         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="LinkTo"]/*' />
190         public IDisposable LinkTo(ITargetBlock<T> target, DataflowLinkOptions linkOptions) { return _source.LinkTo(target, linkOptions); }
191
192         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceive"]/*' />
193         public Boolean TryReceive(Predicate<T> filter, out T item) { return _source.TryReceive(filter, out item); }
194
195         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceiveAll"]/*' />
196         public Boolean TryReceiveAll(out IList<T> items) { return _source.TryReceiveAll(out items); }
197
198         /// <summary>Gets the number of items currently stored in the buffer.</summary>
199         public Int32 Count { get { return _source.OutputCount; } }
200
201         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
202         public Task Completion { get { return _source.Completion; } }
203
204         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ConsumeMessage"]/*' />
205         T ISourceBlock<T>.ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target, out Boolean messageConsumed)
206         {
207             return _source.ConsumeMessage(messageHeader, target, out messageConsumed);
208         }
209
210         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReserveMessage"]/*' />
211         bool ISourceBlock<T>.ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
212         {
213             return _source.ReserveMessage(messageHeader, target);
214         }
215
216         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReleaseReservation"]/*' />
217         void ISourceBlock<T>.ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
218         {
219             _source.ReleaseReservation(messageHeader, target);
220         }
221
222         /// <summary>Notifies the block that one or more items was removed from the queue.</summary>
223         /// <param name="numItemsRemoved">The number of items removed.</param>
224         private void OnItemsRemoved(int numItemsRemoved)
225         {
226             Contract.Requires(numItemsRemoved > 0, "A positive number of items to remove is required.");
227             Common.ContractAssertMonitorStatus(IncomingLock, held: false);
228
229             // If we're bounding, we need to know when an item is removed so that we
230             // can update the count that's mirroring the actual count in the source's queue,
231             // and potentially kick off processing to start consuming postponed messages.
232             if (_boundingState != null)
233             {
234                 lock (IncomingLock)
235                 {
236                     // Decrement the count, which mirrors the count in the source half
237                     Debug.Assert(_boundingState.CurrentCount - numItemsRemoved >= 0,
238                         "It should be impossible to have a negative number of items.");
239                     _boundingState.CurrentCount -= numItemsRemoved;
240
241                     ConsumeAsyncIfNecessary();
242                     CompleteTargetIfPossible();
243                 }
244             }
245         }
246
247         /// <summary>Called when postponed messages may need to be consumed.</summary>
248         /// <param name="isReplacementReplica">Whether this call is the continuation of a previous message loop.</param>
249         internal void ConsumeAsyncIfNecessary(bool isReplacementReplica = false)
250         {
251             Common.ContractAssertMonitorStatus(IncomingLock, held: true);
252             Debug.Assert(_boundingState != null, "Must be in bounded mode.");
253
254             if (!_targetDecliningPermanently &&
255                 _boundingState.TaskForInputProcessing == null &&
256                 _boundingState.PostponedMessages.Count > 0 &&
257                 _boundingState.CountIsLessThanBound)
258             {
259                 // Create task and store into _taskForInputProcessing prior to scheduling the task
260                 // so that _taskForInputProcessing will be visibly set in the task loop.
261                 _boundingState.TaskForInputProcessing =
262                     new Task(state => ((BufferBlock<T>)state).ConsumeMessagesLoopCore(), this,
263                         Common.GetCreationOptionsForTask(isReplacementReplica));
264
265 #if FEATURE_TRACING
266                 DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
267                 if (etwLog.IsEnabled())
268                 {
269                     etwLog.TaskLaunchedForMessageHandling(
270                         this, _boundingState.TaskForInputProcessing, DataflowEtwProvider.TaskLaunchedReason.ProcessingInputMessages,
271                         _boundingState.PostponedMessages.Count);
272                 }
273 #endif
274
275                 // Start the task handling scheduling exceptions
276                 Exception exception = Common.StartTaskSafe(_boundingState.TaskForInputProcessing, _source.DataflowBlockOptions.TaskScheduler);
277                 if (exception != null)
278                 {
279                     // Get out from under currently held locks. CompleteCore re-acquires the locks it needs.
280                     Task.Factory.StartNew(exc => CompleteCore(exception: (Exception)exc, storeExceptionEvenIfAlreadyCompleting: true, revertProcessingState: true),
281                                         exception, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
282                 }
283             }
284         }
285
286
287         /// <summary>Task body used to consume postponed messages.</summary>
288         [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
289         private void ConsumeMessagesLoopCore()
290         {
291             Contract.Requires(_boundingState != null && _boundingState.TaskForInputProcessing != null,
292                 "May only be called in bounded mode and when a task is in flight.");
293             Debug.Assert(_boundingState.TaskForInputProcessing.Id == Task.CurrentId,
294                 "This must only be called from the in-flight processing task.");
295             Common.ContractAssertMonitorStatus(IncomingLock, held: false);
296
297             try
298             {
299                 int maxMessagesPerTask = _source.DataflowBlockOptions.ActualMaxMessagesPerTask;
300                 for (int i = 0;
301                     i < maxMessagesPerTask && ConsumeAndStoreOneMessageIfAvailable();
302                     i++)
303                     ;
304             }
305             catch (Exception exc)
306             {
307                 // Prevent the creation of new processing tasks
308                 CompleteCore(exc, storeExceptionEvenIfAlreadyCompleting: true);
309             }
310             finally
311             {
312                 lock (IncomingLock)
313                 {
314                     // We're no longer processing, so null out the processing task
315                     _boundingState.TaskForInputProcessing = null;
316
317                     // However, we may have given up early because we hit our own configured
318                     // processing limits rather than because we ran out of work to do.  If that's
319                     // the case, make sure we spin up another task to keep going.
320                     ConsumeAsyncIfNecessary(isReplacementReplica: true);
321
322                     // If, however, we stopped because we ran out of work to do and we
323                     // know we'll never get more, then complete.
324                     CompleteTargetIfPossible();
325                 }
326             }
327         }
328
329         /// <summary>
330         /// Retrieves one postponed message if there's room and if we can consume a postponed message.
331         /// Stores any consumed message into the source half.
332         /// </summary>
333         /// <returns>true if a message could be consumed and stored; otherwise, false.</returns>
334         /// <remarks>This must only be called from the asynchronous processing loop.</remarks>
335         private bool ConsumeAndStoreOneMessageIfAvailable()
336         {
337             Contract.Requires(_boundingState != null && _boundingState.TaskForInputProcessing != null,
338                 "May only be called in bounded mode and when a task is in flight.");
339             Debug.Assert(_boundingState.TaskForInputProcessing.Id == Task.CurrentId,
340                 "This must only be called from the in-flight processing task.");
341             Common.ContractAssertMonitorStatus(IncomingLock, held: false);
342
343             // Loop through the postponed messages until we get one.
344             while (true)
345             {
346                 // Get the next item to retrieve.  If there are no more, bail.
347                 KeyValuePair<ISourceBlock<T>, DataflowMessageHeader> sourceAndMessage;
348                 lock (IncomingLock)
349                 {
350                     if (!_boundingState.CountIsLessThanBound) return false;
351                     if (!_boundingState.PostponedMessages.TryPop(out sourceAndMessage)) return false;
352
353                     // Optimistically assume we're going to get the item. This avoids taking the lock
354                     // again if we're right.  If we're wrong, we decrement it later under lock.
355                     _boundingState.CurrentCount++;
356                 }
357
358                 // Consume the item
359                 bool consumed = false;
360                 try
361                 {
362                     T consumedValue = sourceAndMessage.Key.ConsumeMessage(sourceAndMessage.Value, this, out consumed);
363                     if (consumed)
364                     {
365                         _source.AddMessage(consumedValue);
366                         return true;
367                     }
368                 }
369                 finally
370                 {
371                     // We didn't get the item, so decrement the count to counteract our optimistic assumption.
372                     if (!consumed)
373                     {
374                         lock (IncomingLock) _boundingState.CurrentCount--;
375                     }
376                 }
377             }
378         }
379
380         /// <summary>Completes the target, notifying the source, once all completion conditions are met.</summary>
381         private void CompleteTargetIfPossible()
382         {
383             Common.ContractAssertMonitorStatus(IncomingLock, held: true);
384             if (_targetDecliningPermanently &&
385                 !_targetCompletionReserved &&
386                 (_boundingState == null || _boundingState.TaskForInputProcessing == null))
387             {
388                 _targetCompletionReserved = true;
389
390                 // If we're in bounding mode and we have any postponed messages, we need to clear them,
391                 // which means calling back to the source, which means we need to escape the incoming lock.
392                 if (_boundingState != null && _boundingState.PostponedMessages.Count > 0)
393                 {
394                     Task.Factory.StartNew(state =>
395                     {
396                         var thisBufferBlock = (BufferBlock<T>)state;
397
398                         // Release any postponed messages
399                         List<Exception> exceptions = null;
400                         if (thisBufferBlock._boundingState != null)
401                         {
402                             // Note: No locks should be held at this point
403                             Common.ReleaseAllPostponedMessages(thisBufferBlock,
404                                                                thisBufferBlock._boundingState.PostponedMessages,
405                                                                ref exceptions);
406                         }
407
408                         if (exceptions != null)
409                         {
410                             // It is important to migrate these exceptions to the source part of the owning batch,
411                             // because that is the completion task that is publically exposed.
412                             thisBufferBlock._source.AddExceptions(exceptions);
413                         }
414
415                         thisBufferBlock._source.Complete();
416                     }, this, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
417                 }
418                 // Otherwise, we can just decline the source directly.
419                 else
420                 {
421                     _source.Complete();
422                 }
423             }
424         }
425
426         /// <summary>Gets the number of messages in the buffer.  This must only be used from the debugger as it avoids taking necessary locks.</summary>
427         private int CountForDebugger { get { return _source.GetDebuggingInformation().OutputCount; } }
428
429         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="ToString"]/*' />
430         public override string ToString() { return Common.GetNameForDebugger(this, _source.DataflowBlockOptions); }
431
432         /// <summary>The data to display in the debugger display attribute.</summary>
433         [SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
434         private object DebuggerDisplayContent
435         {
436             get
437             {
438                 return string.Format("{0}, Count={1}",
439                     Common.GetNameForDebugger(this, _source.DataflowBlockOptions),
440                     CountForDebugger);
441             }
442         }
443         /// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
444         object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } }
445
446         /// <summary>Provides a debugger type proxy for the BufferBlock.</summary>
447         private sealed class DebugView
448         {
449             /// <summary>The buffer block.</summary>
450             private readonly BufferBlock<T> _bufferBlock;
451             /// <summary>The buffer's source half.</summary>
452             private readonly SourceCore<T>.DebuggingInformation _sourceDebuggingInformation;
453
454             /// <summary>Initializes the debug view.</summary>
455             /// <param name="bufferBlock">The BufferBlock being viewed.</param>
456             public DebugView(BufferBlock<T> bufferBlock)
457             {
458                 Contract.Requires(bufferBlock != null, "Need a block with which to construct the debug view.");
459                 _bufferBlock = bufferBlock;
460                 _sourceDebuggingInformation = bufferBlock._source.GetDebuggingInformation();
461             }
462
463             /// <summary>Gets the collection of postponed message headers.</summary>
464             public QueuedMap<ISourceBlock<T>, DataflowMessageHeader> PostponedMessages
465             {
466                 get { return _bufferBlock._boundingState != null ? _bufferBlock._boundingState.PostponedMessages : null; }
467             }
468             /// <summary>Gets the messages in the buffer.</summary>
469             public IEnumerable<T> Queue { get { return _sourceDebuggingInformation.OutputQueue; } }
470
471             /// <summary>The task used to process messages.</summary>
472             public Task TaskForInputProcessing { get { return _bufferBlock._boundingState != null ? _bufferBlock._boundingState.TaskForInputProcessing : null; } }
473             /// <summary>Gets the task being used for output processing.</summary>
474             public Task TaskForOutputProcessing { get { return _sourceDebuggingInformation.TaskForOutputProcessing; } }
475
476             /// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
477             public DataflowBlockOptions DataflowBlockOptions { get { return _sourceDebuggingInformation.DataflowBlockOptions; } }
478
479             /// <summary>Gets whether the block is declining further messages.</summary>
480             public bool IsDecliningPermanently { get { return _bufferBlock._targetDecliningPermanently; } }
481             /// <summary>Gets whether the block is completed.</summary>
482             public bool IsCompleted { get { return _sourceDebuggingInformation.IsCompleted; } }
483             /// <summary>Gets the block's Id.</summary>
484             public int Id { get { return Common.GetBlockId(_bufferBlock); } }
485
486             /// <summary>Gets the set of all targets linked from this block.</summary>
487             public TargetRegistry<T> LinkedTargets { get { return _sourceDebuggingInformation.LinkedTargets; } }
488             /// <summary>Gets the set of all targets linked from this block.</summary>
489             public ITargetBlock<T> NextMessageReservedFor { get { return _sourceDebuggingInformation.NextMessageReservedFor; } }
490         }
491     }
492 }