// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// TargetCore.cs
//
//
// The core implementation of a standard ITargetBlock<TInput>.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Diagnostics.Contracts;
using System.Linq;
using System.Security;

namespace System.Threading.Tasks.Dataflow.Internal
{
    // LOCK-LEVELING SCHEME
    // --------------------
    // TargetCore employs a single lock: IncomingLock.  This lock must not be used when calling out to any targets,
    // which TargetCore should not have, anyway.  It also must not be held when calling back to any sources, except
    // during calls to OfferMessage from that same source.
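
    // Illustrative pattern (a sketch of how the rule plays out in TryConsumePostponedMessage below):
    //
    //     lock (IncomingLock) { /* mutate _messages / bounding state; no calls out to sources here */ }
    //     element.Key.ConsumeMessage(element.Value, _owningTarget, out consumed); // only once the lock is released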

    /// <summary>Options used to configure a target core.</summary>
    [Flags]
    internal enum TargetCoreOptions : byte
    {
        /// <summary>No options: completion is synchronous, and the block is both a target and a source.</summary>
        None = 0x0,
        /// <summary>Whether the block relies on the delegate to signal when an async operation has completed.</summary>
        UsesAsyncCompletion = 0x1,
        /// <summary>
        /// Whether the block containing this target core is just a target or also has a source side.
        /// If it's just a target, then this target core's completion represents the entire block's completion.
        /// </summary>
        RepresentsBlockCompletion = 0x2
    }
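
    // For example (illustrative only): a purely target-side block whose user delegate returns a Task
    // would typically combine the flags as
    //     TargetCoreOptions.UsesAsyncCompletion | TargetCoreOptions.RepresentsBlockCompletion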

    /// <summary>
    /// Provides a core implementation of <see cref="ITargetBlock{TInput}"/>.</summary>
    /// <typeparam name="TInput">Specifies the type of data accepted by the <see cref="TargetCore{TInput}"/>.</typeparam>
    [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")]
    [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
    internal sealed class TargetCore<TInput>
    {
        // *** These fields are readonly and are initialized at AppDomain startup.

        /// <summary>Caching the keep alive predicate.</summary>
        private static readonly Common.KeepAlivePredicate<TargetCore<TInput>, KeyValuePair<TInput, long>> _keepAlivePredicate =
                (TargetCore<TInput> thisTargetCore, out KeyValuePair<TInput, long> messageWithId) =>
                    thisTargetCore.TryGetNextAvailableOrPostponedMessage(out messageWithId);

        // *** These fields are readonly and are initialized to new instances at construction.

        /// <summary>A task representing the completion of the block.</summary>
        private readonly TaskCompletionSource<VoidResult> _completionSource = new TaskCompletionSource<VoidResult>();

        // *** These fields are readonly and are initialized by arguments to the constructor.

        /// <summary>The target block using this helper.</summary>
        private readonly ITargetBlock<TInput> _owningTarget;
        /// <summary>The messages in this target.</summary>
        /// <remarks>This field doubles as the IncomingLock.</remarks>
        private readonly IProducerConsumerQueue<KeyValuePair<TInput, long>> _messages;
        /// <summary>The options associated with this block.</summary>
        private readonly ExecutionDataflowBlockOptions _dataflowBlockOptions;
        /// <summary>An action to invoke for every accepted message.</summary>
        private readonly Action<KeyValuePair<TInput, long>> _callAction;
        /// <summary>The options governing this target core's behavior, including whether the block relies on the delegate to signal when an async operation has completed.</summary>
        private readonly TargetCoreOptions _targetCoreOptions;
        /// <summary>Bounding state for when the block is executing in bounded mode.</summary>
        private readonly BoundingStateWithPostponed<TInput> _boundingState;
        /// <summary>The reordering buffer used by the owner.  May be null.</summary>
        private readonly IReorderingBuffer _reorderingBuffer;

        /// <summary>Gets the object used as the incoming lock.</summary>
        private object IncomingLock { get { return _messages; } }

        // *** These fields are mutated during execution.

        /// <summary>Exceptions that may have occurred and gone unhandled during processing.</summary>
        private List<Exception> _exceptions;
        /// <summary>Whether to stop accepting new messages.</summary>
        private bool _decliningPermanently;
        /// <summary>The number of operations (including service tasks) currently running asynchronously.</summary>
        /// <remarks>Must always be accessed from inside a lock.</remarks>
        private int _numberOfOutstandingOperations;
        /// <summary>The number of service tasks in async mode currently running.</summary>
        /// <remarks>Must always be accessed from inside a lock.</remarks>
        private int _numberOfOutstandingServiceTasks;
        /// <summary>The next available ID we can assign to a message about to be processed.</summary>
        private PaddedInt64 _nextAvailableInputMessageId; // initialized to 0... very important for a reordering buffer
        /// <summary>A task has reserved the right to run the completion routine.</summary>
        private bool _completionReserved;
        /// <summary>This counter is set by the processing loop to prevent itself from trying to keep alive.</summary>
        private int _keepAliveBanCounter;

        /// <summary>Initializes the target core.</summary>
        /// <param name="owningTarget">The target using this helper.</param>
        /// <param name="callAction">An action to invoke for all accepted items.</param>
        /// <param name="reorderingBuffer">The reordering buffer used by the owner; may be null.</param>
        /// <param name="dataflowBlockOptions">The options to use to configure this block. The target core assumes these options are immutable.</param>
        /// <param name="targetCoreOptions">Options for how the target core should behave.</param>
        internal TargetCore(
            ITargetBlock<TInput> owningTarget,
            Action<KeyValuePair<TInput, long>> callAction,
            IReorderingBuffer reorderingBuffer,
            ExecutionDataflowBlockOptions dataflowBlockOptions,
            TargetCoreOptions targetCoreOptions)
        {
            // Validate internal arguments
            Contract.Requires(owningTarget != null, "Core must be associated with a target block.");
            Contract.Requires(dataflowBlockOptions != null, "Options must be provided to configure the core.");
            Contract.Requires(callAction != null, "Action to invoke for each item is required.");

            // Store arguments and do additional initialization
            _owningTarget = owningTarget;
            _callAction = callAction;
            _reorderingBuffer = reorderingBuffer;
            _dataflowBlockOptions = dataflowBlockOptions;
            _targetCoreOptions = targetCoreOptions;
            _messages = (dataflowBlockOptions.MaxDegreeOfParallelism == 1) ?
                (IProducerConsumerQueue<KeyValuePair<TInput, long>>)new SingleProducerSingleConsumerQueue<KeyValuePair<TInput, long>>() :
                (IProducerConsumerQueue<KeyValuePair<TInput, long>>)new MultiProducerMultiConsumerQueue<KeyValuePair<TInput, long>>();
            if (_dataflowBlockOptions.BoundedCapacity != System.Threading.Tasks.Dataflow.DataflowBlockOptions.Unbounded)
            {
                Debug.Assert(_dataflowBlockOptions.BoundedCapacity > 0, "Positive bounding count expected; should have been verified by options ctor");
                _boundingState = new BoundingStateWithPostponed<TInput>(_dataflowBlockOptions.BoundedCapacity);
            }
        }
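
        // How an owning block is expected to drive the core (an illustrative sketch only; "thisBlock",
        // "Process", and "options" are placeholders, not members of this file):
        //
        //     var core = new TargetCore<int>(
        //         thisBlock,                                   // the ITargetBlock<int> that owns the core
        //         messageWithId => Process(messageWithId.Key), // invoked for every accepted message
        //         reorderingBuffer: null,
        //         options,                                     // ExecutionDataflowBlockOptions
        //         TargetCoreOptions.RepresentsBlockCompletion);
        //
        //     // Offer items via OfferMessage, then signal completion and await the core:
        //     core.Complete(exception: null, dropPendingMessages: false);
        //     await core.Completion;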

        /// <summary>Internal Complete entry point with extra parameters for different contexts.</summary>
        /// <param name="exception">If not null, the block will be faulted.</param>
        /// <param name="dropPendingMessages">If true, any unprocessed input messages will be dropped.</param>
        /// <param name="storeExceptionEvenIfAlreadyCompleting">If true, an exception will be stored after _decliningPermanently has been set to true.</param>
        /// <param name="unwrapInnerExceptions">If true, the exception will be treated as an AggregateException.</param>
        /// <param name="revertProcessingState">Indicates whether the processing state is dirty and has to be reverted.</param>
        internal void Complete(Exception exception, bool dropPendingMessages, bool storeExceptionEvenIfAlreadyCompleting = false,
            bool unwrapInnerExceptions = false, bool revertProcessingState = false)
        {
            Contract.Requires(storeExceptionEvenIfAlreadyCompleting || !revertProcessingState,
                            "Indicating dirty processing state may only come with storeExceptionEvenIfAlreadyCompleting==true.");
            Contract.EndContractBlock();

            // Ensure that no new messages may be added
            lock (IncomingLock)
            {
                // Faulting from outside is allowed until we start declining permanently.
                // Faulting from inside is allowed at any time.
                if (exception != null && (!_decliningPermanently || storeExceptionEvenIfAlreadyCompleting))
                {
                    Debug.Assert(_numberOfOutstandingOperations > 0 || !storeExceptionEvenIfAlreadyCompleting,
                                "Calls with storeExceptionEvenIfAlreadyCompleting==true may only be coming from processing task.");

#pragma warning disable 0420
                    Common.AddException(ref _exceptions, exception, unwrapInnerExceptions);
                }

                // Clear the messages queue if requested
                if (dropPendingMessages)
                {
                    KeyValuePair<TInput, long> dummy;
                    while (_messages.TryDequeue(out dummy)) ;
                }

                // Revert the dirty processing state if requested
                if (revertProcessingState)
                {
                    Debug.Assert(_numberOfOutstandingOperations > 0 && (!UsesAsyncCompletion || _numberOfOutstandingServiceTasks > 0),
                                    "The processing state must be dirty when revertProcessingState==true.");
                    _numberOfOutstandingOperations--;
                    if (UsesAsyncCompletion) _numberOfOutstandingServiceTasks--;
                }

                // Trigger completion
                _decliningPermanently = true;
                CompleteBlockIfPossible();
            }
        }

        /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="OfferMessage"]/*' />
        internal DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, TInput messageValue, ISourceBlock<TInput> source, Boolean consumeToAccept)
        {
            // Validate arguments
            if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, "messageHeader");
            if (source == null && consumeToAccept) throw new ArgumentException(SR.Argument_CantConsumeFromANullSource, "consumeToAccept");
            Contract.EndContractBlock();

            lock (IncomingLock)
            {
                // If we shouldn't be accepting more messages, don't.
                if (_decliningPermanently)
                {
                    CompleteBlockIfPossible();
                    return DataflowMessageStatus.DecliningPermanently;
                }

                // We can directly accept the message if:
                //      1) we are not bounding, OR
                //      2) we are bounding AND there is room available AND there are no postponed messages AND no messages are currently being transferred to the input queue.
                // (If there were any postponed messages, we would need to postpone so that ordering would be maintained.)
                // (Unlike all other blocks, TargetCore can accept messages while processing, because
                // input message IDs are properly assigned and the correct order is preserved.)
                if (_boundingState == null ||
                    (_boundingState.OutstandingTransfers == 0 && _boundingState.CountIsLessThanBound && _boundingState.PostponedMessages.Count == 0))
                {
                    // Consume the message from the source if necessary
                    if (consumeToAccept)
                    {
                        Debug.Assert(source != null, "We must have thrown if source == null && consumeToAccept == true.");

                        bool consumed;
                        messageValue = source.ConsumeMessage(messageHeader, _owningTarget, out consumed);
                        if (!consumed) return DataflowMessageStatus.NotAvailable;
                    }

                    // Assign a message ID - strictly sequential, no gaps.
                    // Once consumed, enqueue the message with its ID and kick off asynchronous processing.
                    long messageId = _nextAvailableInputMessageId.Value++;
                    Debug.Assert(messageId != Common.INVALID_REORDERING_ID, "The assigned message ID is invalid.");
                    if (_boundingState != null) _boundingState.CurrentCount += 1; // track this new item against our bound
                    _messages.Enqueue(new KeyValuePair<TInput, long>(messageValue, messageId));
                    ProcessAsyncIfNecessary();
                    return DataflowMessageStatus.Accepted;
                }
                // Otherwise, we try to postpone if a source was provided
                else if (source != null)
                {
                    Debug.Assert(_boundingState != null && _boundingState.PostponedMessages != null,
                        "PostponedMessages must have been initialized during construction in non-greedy mode.");

                    // Store the message's info and kick off asynchronous processing
                    _boundingState.PostponedMessages.Push(source, messageHeader);
                    ProcessAsyncIfNecessary();
                    return DataflowMessageStatus.Postponed;
                }
                // We can't do anything else about this message
                return DataflowMessageStatus.Declined;
            }
        }
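
        // Illustrative call (a sketch of roughly what a Post-style helper does on behalf of the owning block;
        // the header value shown is an assumption):
        //
        //     DataflowMessageStatus status = core.OfferMessage(
        //         new DataflowMessageHeader(1), item, source: null, consumeToAccept: false);
        //     bool accepted = status == DataflowMessageStatus.Accepted;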

        /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
        internal Task Completion { get { return _completionSource.Task; } }

        /// <summary>Gets the number of items waiting to be processed by this target.</summary>
        internal int InputCount { get { return _messages.GetCountSafe(IncomingLock); } }

        /// <summary>Signals to the target core that a previously launched asynchronous operation has now completed.</summary>
        internal void SignalOneAsyncMessageCompleted()
        {
            SignalOneAsyncMessageCompleted(boundingCountChange: 0);
        }

        /// <summary>Signals to the target core that a previously launched asynchronous operation has now completed.</summary>
        /// <param name="boundingCountChange">The number of elements by which to change the bounding count, if bounding is occurring.</param>
        internal void SignalOneAsyncMessageCompleted(int boundingCountChange)
        {
            lock (IncomingLock)
            {
                // We're no longer processing, so decrement the DOP counter
                Debug.Assert(_numberOfOutstandingOperations > 0, "Operations may only be completed if any are outstanding.");
                if (_numberOfOutstandingOperations > 0) _numberOfOutstandingOperations--;

                // Fix up the bounding count if necessary
                if (_boundingState != null && boundingCountChange != 0)
                {
                    Debug.Assert(boundingCountChange <= 0 && _boundingState.CurrentCount + boundingCountChange >= 0,
                        "Expected a negative bounding change and not to drop below zero.");
                    _boundingState.CurrentCount += boundingCountChange;
                }

                // However, we may have given up early because we hit our own configured
                // processing limits rather than because we ran out of work to do.  If that's
                // the case, make sure we spin up another task to keep going.
                ProcessAsyncIfNecessary(repeat: true);

                // If, however, we stopped because we ran out of work to do and we
                // know we'll never get more, then complete.
                CompleteBlockIfPossible();
            }
        }
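
        // Illustrative use (a sketch; the owning block's async continuation, not this file, is responsible for this):
        //
        //     userTask.ContinueWith(completed =>
        //     {
        //         // the message has now fully finished processing, so also release its bounding slot
        //         core.SignalOneAsyncMessageCompleted(boundingCountChange: -1);
        //     });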

        /// <summary>Gets whether this instance has been constructed for async processing.</summary>
        private bool UsesAsyncCompletion
        {
            get
            {
                return (_targetCoreOptions & TargetCoreOptions.UsesAsyncCompletion) != 0;
            }
        }

        /// <summary>Gets whether there's room to launch more processing operations.</summary>
        private bool HasRoomForMoreOperations
        {
            get
            {
                Contract.Requires(_numberOfOutstandingOperations >= 0, "Number of outstanding operations should never be negative.");
                Contract.Requires(_numberOfOutstandingServiceTasks >= 0, "Number of outstanding service tasks should never be negative.");
                Contract.Requires(_numberOfOutstandingOperations >= _numberOfOutstandingServiceTasks, "Number of outstanding service tasks should never exceed the number of outstanding operations.");
                Common.ContractAssertMonitorStatus(IncomingLock, held: true);

                // In async mode, we increment _numberOfOutstandingOperations before we start
                // our own processing loop which should not count towards the MaxDOP.
                return (_numberOfOutstandingOperations - _numberOfOutstandingServiceTasks) < _dataflowBlockOptions.ActualMaxDegreeOfParallelism;
            }
        }
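
        // Worked example (illustrative): with ActualMaxDegreeOfParallelism == 2 in async mode, two in-flight
        // async operations plus one service task give _numberOfOutstandingOperations == 3 and
        // _numberOfOutstandingServiceTasks == 1, so (3 - 1) is not less than 2 and no further operation may start.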

        /// <summary>Gets whether there's room to launch more service tasks for doing/launching processing operations.</summary>
        private bool HasRoomForMoreServiceTasks
        {
            get
            {
                Contract.Requires(_numberOfOutstandingOperations >= 0, "Number of outstanding operations should never be negative.");
                Contract.Requires(_numberOfOutstandingServiceTasks >= 0, "Number of outstanding service tasks should never be negative.");
                Contract.Requires(_numberOfOutstandingOperations >= _numberOfOutstandingServiceTasks, "Number of outstanding service tasks should never exceed the number of outstanding operations.");
                Common.ContractAssertMonitorStatus(IncomingLock, held: true);

                if (!UsesAsyncCompletion)
                {
                    // Sync mode:
                    // We don't count service tasks, because our tasks are counted as operations.
                    // Therefore, return HasRoomForMoreOperations.
                    return HasRoomForMoreOperations;
                }
                else
                {
                    // Async mode:
                    // We allow up to MaxDOP true service tasks.
                    // Checking whether there is room for more processing operations is not necessary,
                    // but doing so will help us avoid spinning up a task that will go away without
                    // launching any processing operation.
                    return HasRoomForMoreOperations &&
                           _numberOfOutstandingServiceTasks < _dataflowBlockOptions.ActualMaxDegreeOfParallelism;
                }
            }
        }

        /// <summary>Called when new messages are available to be processed.</summary>
        /// <param name="repeat">Whether this call is the continuation of a previous message loop.</param>
        private void ProcessAsyncIfNecessary(bool repeat = false)
        {
            Common.ContractAssertMonitorStatus(IncomingLock, held: true);

            if (HasRoomForMoreServiceTasks)
            {
                ProcessAsyncIfNecessary_Slow(repeat);
            }
        }

        /// <summary>
        /// Slow path for ProcessAsyncIfNecessary.
        /// Separating out the slow path into its own method makes it more likely that the fast path method will get inlined.
        /// </summary>
        [SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")]
        private void ProcessAsyncIfNecessary_Slow(bool repeat)
        {
            Contract.Requires(HasRoomForMoreServiceTasks, "There must be room to process asynchronously.");
            Common.ContractAssertMonitorStatus(IncomingLock, held: true);

            // Determine preconditions to launching a processing task
            bool messagesAvailableOrPostponed =
                !_messages.IsEmpty ||
                (!_decliningPermanently && _boundingState != null && _boundingState.CountIsLessThanBound && _boundingState.PostponedMessages.Count > 0);

            // If all conditions are met, launch away
            if (messagesAvailableOrPostponed && !CanceledOrFaulted)
            {
                // Any bookkeeping related to the processing task, like incrementing the
                // DOP counter or eventually recording the task's reference, must be done
                // before the task starts. That is because the task itself will do the
                // reverse operation upon its completion.
                _numberOfOutstandingOperations++;
                if (UsesAsyncCompletion) _numberOfOutstandingServiceTasks++;

                var taskForInputProcessing = new Task(thisTargetCore => ((TargetCore<TInput>)thisTargetCore).ProcessMessagesLoopCore(), this,
                                                      Common.GetCreationOptionsForTask(repeat));

#if FEATURE_TRACING
                DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
                if (etwLog.IsEnabled())
                {
                    etwLog.TaskLaunchedForMessageHandling(
                        _owningTarget, taskForInputProcessing, DataflowEtwProvider.TaskLaunchedReason.ProcessingInputMessages,
                        _messages.Count + (_boundingState != null ? _boundingState.PostponedMessages.Count : 0));
                }
#endif

                // Start the task handling scheduling exceptions
                Exception exception = Common.StartTaskSafe(taskForInputProcessing, _dataflowBlockOptions.TaskScheduler);
                if (exception != null)
                {
                    // Get out from under currently held locks. Complete re-acquires the locks it needs.
                    Task.Factory.StartNew(exc => Complete(exception: (Exception)exc, dropPendingMessages: true, storeExceptionEvenIfAlreadyCompleting: true,
                                                        unwrapInnerExceptions: false, revertProcessingState: true),
                                        exception, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
                }
            }
        }

        /// <summary>Task body used to process messages.</summary>
        [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
        private void ProcessMessagesLoopCore()
        {
            Common.ContractAssertMonitorStatus(IncomingLock, held: false);

            KeyValuePair<TInput, long> messageWithId = default(KeyValuePair<TInput, long>);
            try
            {
                bool useAsyncCompletion = UsesAsyncCompletion;
                bool shouldAttemptPostponedTransfer = _boundingState != null && _boundingState.BoundedCapacity > 1;
                int numberOfMessagesProcessedByThisTask = 0;
                int numberOfMessagesProcessedSinceTheLastKeepAlive = 0;
                int maxMessagesPerTask = _dataflowBlockOptions.ActualMaxMessagesPerTask;

                while (numberOfMessagesProcessedByThisTask < maxMessagesPerTask && !CanceledOrFaulted)
                {
                    // If we're bounding, try to transfer a message from the postponed queue
                    // to the input queue.  This enables us to more quickly unblock sources
                    // sending data to the block (otherwise, no postponed messages will be consumed
                    // until the input queue is entirely empty).  If the bounded size is 1,
                    // there's no need to transfer, as attempting to get the next message will
                    // just go and consume the postponed message anyway, and we'll save
                    // the extra trip through the _messages queue.
                    KeyValuePair<TInput, long> transferMessageWithId;
                    if (shouldAttemptPostponedTransfer &&
                        TryConsumePostponedMessage(forPostponementTransfer: true, result: out transferMessageWithId))
                    {
                        lock (IncomingLock)
                        {
                            Debug.Assert(
                                _boundingState.OutstandingTransfers > 0
                                && _boundingState.OutstandingTransfers <= _dataflowBlockOptions.ActualMaxDegreeOfParallelism,
                                "Expected TryConsumePostponedMessage to have incremented the count and for the count to not exceed the DOP.");
                            _boundingState.OutstandingTransfers--; // was incremented in TryConsumePostponedMessage
                            _messages.Enqueue(transferMessageWithId);
                            ProcessAsyncIfNecessary();
                        }
                    }

                    if (useAsyncCompletion)
                    {
                        // Get the next message if DOP is available.
                        // If we can't get a message or DOP is not available, bail out.
                        if (!TryGetNextMessageForNewAsyncOperation(out messageWithId)) break;
                    }
                    else
                    {
                        // Try to get a message for sequential execution, i.e. without checking DOP availability
                        if (!TryGetNextAvailableOrPostponedMessage(out messageWithId))
                        {
                            // Try to keep the task alive only if MaxDOP=1
                            if (_dataflowBlockOptions.MaxDegreeOfParallelism != 1) break;

                            // If this task has processed enough messages without being kept alive,
                            // it has served its purpose. Don't keep it alive.
                            if (numberOfMessagesProcessedSinceTheLastKeepAlive > Common.KEEP_ALIVE_NUMBER_OF_MESSAGES_THRESHOLD) break;

                            // If keep alive is banned, don't attempt it
                            if (_keepAliveBanCounter > 0)
                            {
                                _keepAliveBanCounter--;
                                break;
                            }

                            // Reset the keep alive counter. (Keep this line together with TryKeepAliveUntil.)
                            numberOfMessagesProcessedSinceTheLastKeepAlive = 0;

                            // Try to keep the task alive briefly until a new message arrives
                            if (!Common.TryKeepAliveUntil(_keepAlivePredicate, this, out messageWithId))
                            {
                                // Keep alive was unsuccessful.
                                // Therefore ban further attempts temporarily.
                                _keepAliveBanCounter = Common.KEEP_ALIVE_BAN_COUNT;
                                break;
                            }
                        }
                    }

                    // We have popped a message from the queue.
                    // So increment the counter of processed messages.
                    numberOfMessagesProcessedByThisTask++;
                    numberOfMessagesProcessedSinceTheLastKeepAlive++;

                    // Invoke the user action
                    _callAction(messageWithId);
                }
            }
            catch (Exception exc)
            {
                Common.StoreDataflowMessageValueIntoExceptionData(exc, messageWithId.Key);
                Complete(exc, dropPendingMessages: true, storeExceptionEvenIfAlreadyCompleting: true, unwrapInnerExceptions: false);
            }
            finally
            {
                lock (IncomingLock)
                {
                    // We incremented _numberOfOutstandingOperations before we launched this task.
                    // So we must decrement it before exiting.
                    // Note that each async task additionally incremented it before starting and
                    // is responsible for decrementing it prior to exiting.
                    Debug.Assert(_numberOfOutstandingOperations > 0, "Expected a positive number of outstanding operations, since we're completing one here.");
                    _numberOfOutstandingOperations--;

                    // If we are in async mode, we've also incremented _numberOfOutstandingServiceTasks.
                    // Now it's time to decrement it.
                    if (UsesAsyncCompletion)
                    {
                        Debug.Assert(_numberOfOutstandingServiceTasks > 0, "Expected a positive number of outstanding service tasks, since we're completing one here.");
                        _numberOfOutstandingServiceTasks--;
                    }

                    // However, we may have given up early because we hit our own configured
                    // processing limits rather than because we ran out of work to do.  If that's
                    // the case, make sure we spin up another task to keep going.
                    ProcessAsyncIfNecessary(repeat: true);

                    // If, however, we stopped because we ran out of work to do and we
                    // know we'll never get more, then complete.
                    CompleteBlockIfPossible();
                }
            }
        }

        /// <summary>Retrieves the next message from the input queue for the useAsyncCompletion mode.</summary>
        /// <param name="messageWithId">The next message retrieved.</param>
        /// <returns>true if a message was found and removed; otherwise, false.</returns>
        [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
        private bool TryGetNextMessageForNewAsyncOperation(out KeyValuePair<TInput, long> messageWithId)
        {
            Contract.Requires(UsesAsyncCompletion, "Only valid to use when in async mode.");
            Common.ContractAssertMonitorStatus(IncomingLock, held: false);

            bool parallelismAvailable;

            lock (IncomingLock)
            {
                // If we have room for another asynchronous operation, reserve it.
                // If later it turns out that we had no work to fill the slot, we'll undo the addition.
                parallelismAvailable = HasRoomForMoreOperations;
                if (parallelismAvailable) ++_numberOfOutstandingOperations;
            }

            messageWithId = default(KeyValuePair<TInput, long>);
            if (parallelismAvailable)
            {
                // If a parallelism slot was available, try to get an item.
                // Be careful, because an exception may be thrown from ConsumeMessage
                // and we have already incremented _numberOfOutstandingOperations.
                bool gotMessage = false;
                try
                {
                    gotMessage = TryGetNextAvailableOrPostponedMessage(out messageWithId);
                }
                catch
                {
                    // We have incremented the counter, but we didn't get a message.
                    // So we must undo the increment and eventually complete the block.
                    SignalOneAsyncMessageCompleted();

                    // Re-throw the exception. The processing loop will catch it.
                    throw;
                }

                // There may not have been an error, but we may still have failed to get a message.
                // If so, we must undo the increment and eventually complete the block.
                if (!gotMessage) SignalOneAsyncMessageCompleted();

                return gotMessage;
            }

            // If there was no parallelism available, we didn't increment _numberOfOutstandingOperations.
            // So there is nothing to do except to return false.
            return false;
        }

        /// <summary>
        /// Either takes the next available message from the input queue or retrieves a postponed
        /// message from a source, based on whether we're in greedy or non-greedy mode.
        /// </summary>
        /// <param name="messageWithId">The retrieved item with its Id.</param>
        /// <returns>true if a message could be removed and returned; otherwise, false.</returns>
        private bool TryGetNextAvailableOrPostponedMessage(out KeyValuePair<TInput, long> messageWithId)
        {
            Common.ContractAssertMonitorStatus(IncomingLock, held: false);

            // First try to get a message from our input buffer.
            if (_messages.TryDequeue(out messageWithId))
            {
                return true;
            }
            // If we can't, but if we have any postponed messages due to bounding, then
            // try to consume one of these postponed messages.
            // Since we are not currently holding the lock, it is possible that new messages get queued up
            // by the time we take the lock to manipulate _boundingState. So we have to double-check the
            // input queue once we take the lock before we consider postponed messages.
            else if (_boundingState != null && TryConsumePostponedMessage(forPostponementTransfer: false, result: out messageWithId))
            {
                return true;
            }
            // Otherwise, there's no message available.
            else
            {
                messageWithId = default(KeyValuePair<TInput, long>);
                return false;
            }
        }

        /// <summary>Consumes a single postponed message.</summary>
        /// <param name="forPostponementTransfer">
        /// true if the method is being called to consume a message that'll then be stored into the input queue;
        /// false if the method is being called to consume a message that'll be processed immediately.
        /// If true, the bounding state's ForcePostponement will be updated.
        /// If false, the method will first try (while holding the lock) to consume from the input queue before
        /// consuming a postponed message.
        /// </param>
        /// <param name="result">The consumed message.</param>
        /// <returns>true if a message was consumed; otherwise, false.</returns>
        private bool TryConsumePostponedMessage(
            bool forPostponementTransfer,
            out KeyValuePair<TInput, long> result)
        {
            Contract.Requires(
                _dataflowBlockOptions.BoundedCapacity !=
                System.Threading.Tasks.Dataflow.DataflowBlockOptions.Unbounded, "Only valid to use when in bounded mode.");
            Common.ContractAssertMonitorStatus(IncomingLock, held: false);

            // Iterate until we either consume a message successfully or there are no more postponed messages.
            bool countIncrementedExpectingToGetItem = false;
            long messageId = Common.INVALID_REORDERING_ID;
            while (true)
            {
                KeyValuePair<ISourceBlock<TInput>, DataflowMessageHeader> element;
                lock (IncomingLock)
                {
                    // If we are declining permanently, don't consume postponed messages.
                    if (_decliningPermanently) break;

                    // New messages may have been queued up while we weren't holding the lock.
                    // In particular, the input queue may have been filled up and messages may have
                    // gotten postponed. If we process such a postponed message, we would mess up the
                    // order. Therefore, we have to double-check the input queue first.
                    if (!forPostponementTransfer && _messages.TryDequeue(out result)) return true;

                    // We can consume a message to process if there's one to process and also
                    // if we have logical room within our bound for the message.
                    if (!_boundingState.CountIsLessThanBound || !_boundingState.PostponedMessages.TryPop(out element))
                    {
                        if (countIncrementedExpectingToGetItem)
                        {
                            countIncrementedExpectingToGetItem = false;
                            _boundingState.CurrentCount -= 1;
                        }
                        break;
                    }
                    if (!countIncrementedExpectingToGetItem)
                    {
                        countIncrementedExpectingToGetItem = true;
                        messageId = _nextAvailableInputMessageId.Value++; // optimistically assign an ID
                        Debug.Assert(messageId != Common.INVALID_REORDERING_ID, "The assigned message ID is invalid.");
                        _boundingState.CurrentCount += 1; // optimistically take bounding space
                        if (forPostponementTransfer)
                        {
                            Debug.Assert(_boundingState.OutstandingTransfers >= 0, "Expected OutstandingTransfers to be non-negative.");
                            _boundingState.OutstandingTransfers++; // temporarily force postponement until we've successfully consumed the element
                        }
                    }
                } // Must not call to source while holding lock

                bool consumed;
                TInput consumedValue = element.Key.ConsumeMessage(element.Value, _owningTarget, out consumed);
                if (consumed)
                {
                    result = new KeyValuePair<TInput, long>(consumedValue, messageId);
                    return true;
                }
                else
                {
                    if (forPostponementTransfer)
                    {
                        // We didn't consume the message, so undo the OutstandingTransfers increment made when we reserved the element.
                        _boundingState.OutstandingTransfers--;
                    }
                }
            }

            // We optimistically acquired a message ID for a message that, in the end, we never got.
            // So, we need to let the reordering buffer (if one exists) know that it should not
            // expect an item with this ID.  Otherwise, it would stall forever.
            if (_reorderingBuffer != null && messageId != Common.INVALID_REORDERING_ID) _reorderingBuffer.IgnoreItem(messageId);

            // Similarly, we optimistically increased the bounding count, expecting to get another message in.
            // Since we didn't, we need to fix the bounding count back to what it should have been.
            if (countIncrementedExpectingToGetItem) ChangeBoundingCount(-1);

            // Inform the caller that no message could be consumed.
            result = default(KeyValuePair<TInput, long>);
            return false;
        }

        /// <summary>Gets whether the target has had cancellation requested or an exception has occurred.</summary>
        private bool CanceledOrFaulted
        {
            get
            {
                return _dataflowBlockOptions.CancellationToken.IsCancellationRequested || Volatile.Read(ref _exceptions) != null;
            }
        }

        /// <summary>Completes the block once all completion conditions are met.</summary>
        private void CompleteBlockIfPossible()
        {
            Common.ContractAssertMonitorStatus(IncomingLock, held: true);

            bool noMoreMessages = _decliningPermanently && _messages.IsEmpty;
            if (noMoreMessages || CanceledOrFaulted)
            {
                CompleteBlockIfPossible_Slow();
            }
        }

        /// <summary>
        /// Slow path for CompleteBlockIfPossible.
        /// Separating out the slow path into its own method makes it more likely that the fast path method will get inlined.
        /// </summary>
        private void CompleteBlockIfPossible_Slow()
        {
            Contract.Requires((_decliningPermanently && _messages.IsEmpty) || CanceledOrFaulted, "There must be no more messages.");
            Common.ContractAssertMonitorStatus(IncomingLock, held: true);

            bool notCurrentlyProcessing = _numberOfOutstandingOperations == 0;
            if (notCurrentlyProcessing && !_completionReserved)
            {
                // Make sure no one else tries to call CompleteBlockOncePossible
                _completionReserved = true;

                // Make sure the target is declining
                _decliningPermanently = true;

                // Get out from under currently held locks.  This is to avoid
                // invoking synchronous continuations off of _completionSource.Task
                // while holding a lock.
                Task.Factory.StartNew(state => ((TargetCore<TInput>)state).CompleteBlockOncePossible(),
                    this, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
            }
        }

        /// <summary>
        /// Completes the block.  This must only be called once, and only once all of the completion conditions are met.
        /// As such, it must only be called from CompleteBlockIfPossible.
        /// </summary>
        private void CompleteBlockOncePossible()
        {
            // Since the lock is needed only for the Assert, we do this only in DEBUG mode
#if DEBUG
            lock (IncomingLock) Debug.Assert(_numberOfOutstandingOperations == 0, "Everything must be done by now.");
#endif

            // Release any postponed messages
            if (_boundingState != null)
            {
                // Note: No locks should be held at this point.
                Common.ReleaseAllPostponedMessages(_owningTarget, _boundingState.PostponedMessages, ref _exceptions);
            }

            // For good measure and help in preventing leaks, clear out the incoming message queue,
            // which may still contain orphaned data if we were canceled or faulted.  However,
            // we don't reset the bounding count here, as the block as a whole may still be active.
            KeyValuePair<TInput, long> ignored;
            IProducerConsumerQueue<KeyValuePair<TInput, long>> messages = _messages;
            while (messages.TryDequeue(out ignored)) ;

            // If we completed with any unhandled exception, finish in an error state
            if (Volatile.Read(ref _exceptions) != null)
            {
                // It's ok to read _exceptions' content here, because
                // at this point no more exceptions can be generated and thus no one will
                // be writing to it.
                _completionSource.TrySetException(Volatile.Read(ref _exceptions));
            }
            // If we completed with cancellation, finish in a canceled state
            else if (_dataflowBlockOptions.CancellationToken.IsCancellationRequested)
            {
                _completionSource.TrySetCanceled();
            }
            // Otherwise, finish in a successful state.
            else
            {
                _completionSource.TrySetResult(default(VoidResult));
            }
#if FEATURE_TRACING
            // We only want to do tracing for block completion if this target core represents the whole block.
            // If it only represents a part of the block (i.e. there's a source associated with it as well),
            // then we shouldn't log just for the first half of the block; the source half will handle logging.
            DataflowEtwProvider etwLog;
            if ((_targetCoreOptions & TargetCoreOptions.RepresentsBlockCompletion) != 0 &&
                (etwLog = DataflowEtwProvider.Log).IsEnabled())
            {
                etwLog.DataflowBlockCompleted(_owningTarget);
            }
#endif
        }

        /// <summary>Gets whether the target core is operating in a bounded mode.</summary>
        internal bool IsBounded { get { return _boundingState != null; } }

        /// <summary>Increases or decreases the bounding count.</summary>
        /// <param name="count">The incremental addition (positive to increase, negative to decrease).</param>
        internal void ChangeBoundingCount(int count)
        {
            Contract.Requires(count != 0, "Should only be called when the count is actually changing.");
            Common.ContractAssertMonitorStatus(IncomingLock, held: false);
            if (_boundingState != null)
            {
                lock (IncomingLock)
                {
                    Debug.Assert(count > 0 || (count < 0 && _boundingState.CurrentCount + count >= 0),
                        "If count is negative, it must not take the total count negative.");
                    _boundingState.CurrentCount += count;
                    ProcessAsyncIfNecessary();
                    CompleteBlockIfPossible();
                }
            }
        }

        /// <summary>Gets the object to display in the debugger display attribute.</summary>
        [SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
        private object DebuggerDisplayContent
        {
            get
            {
                var displayTarget = _owningTarget as IDebuggerDisplay;
                return string.Format("Block=\"{0}\"",
                    displayTarget != null ? displayTarget.Content : _owningTarget);
            }
        }

        /// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
        internal ExecutionDataflowBlockOptions DataflowBlockOptions { get { return _dataflowBlockOptions; } }

        /// <summary>Gets information about this helper to be used for display in a debugger.</summary>
        /// <returns>Debugging information about this target.</returns>
        internal DebuggingInformation GetDebuggingInformation() { return new DebuggingInformation(this); }

        /// <summary>Provides a wrapper for commonly needed debugging information.</summary>
        internal sealed class DebuggingInformation
        {
            /// <summary>The target being viewed.</summary>
            private readonly TargetCore<TInput> _target;

            /// <summary>Initializes the debugging helper.</summary>
            /// <param name="target">The target being viewed.</param>
            internal DebuggingInformation(TargetCore<TInput> target) { _target = target; }

            /// <summary>Gets the number of messages waiting to be processed.</summary>
            internal int InputCount { get { return _target._messages.Count; } }
            /// <summary>Gets the messages waiting to be processed.</summary>
            internal IEnumerable<TInput> InputQueue { get { return _target._messages.Select(kvp => kvp.Key).ToList(); } }

            /// <summary>Gets any postponed messages.</summary>
            internal QueuedMap<ISourceBlock<TInput>, DataflowMessageHeader> PostponedMessages
            {
                get { return _target._boundingState != null ? _target._boundingState.PostponedMessages : null; }
            }

            /// <summary>Gets the current number of outstanding input processing operations.</summary>
            internal Int32 CurrentDegreeOfParallelism { get { return _target._numberOfOutstandingOperations - _target._numberOfOutstandingServiceTasks; } }

            /// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
            internal ExecutionDataflowBlockOptions DataflowBlockOptions { get { return _target._dataflowBlockOptions; } }
            /// <summary>Gets whether the block is declining further messages.</summary>
            internal bool IsDecliningPermanently { get { return _target._decliningPermanently; } }
            /// <summary>Gets whether the block is completed.</summary>
            internal bool IsCompleted { get { return _target.Completion.IsCompleted; } }
        }
    }
}