Merge pull request #2247 from ivmai/match-ext-libgc-api
[mono.git] / mcs / class / System.Threading.Tasks.Dataflow / CoreFxSources / Internal / SourceCore.cs
1 // Copyright (c) Microsoft. All rights reserved.
2 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
3
4 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
5 //
6 // SourceCore.cs
7 //
8 //
9 // The core implementation of a standard ISourceBlock<TOutput>.
10 //
11 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
12
13 using System.Collections.Generic;
14 using System.Diagnostics;
15 using System.Diagnostics.CodeAnalysis;
16 using System.Diagnostics.Contracts;
17 using System.Linq;
18 using System.Security;
19
20 namespace System.Threading.Tasks.Dataflow.Internal
21 {
22     // LOCK-LEVELING SCHEME
23     // --------------------
24     // SourceCore employs two locks: OutgoingLock and ValueLock.  Additionally, targets we call out to
25     // likely utilize their own IncomingLock.  We can hold OutgoingLock while acquiring ValueLock or IncomingLock.
26     // However, we cannot hold ValueLock while calling out to external code or while acquiring OutgoingLock, and 
27     // we cannot hold IncomingLock when acquiring OutgoingLock. Additionally, the locks employed must be reentrant.
28
29     /// <summary>Provides a core implementation for blocks that implement <see cref="ISourceBlock{TOutput}"/>.</summary>
30     /// <typeparam name="TOutput">Specifies the type of data supplied by the <see cref="SourceCore{TOutput}"/>.</typeparam>
31     [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")]
32     [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
33     internal sealed class SourceCore<TOutput>
34     {
35         // *** These fields are readonly and are initialized to new instances at construction.
36
37         /// <summary>A TaskCompletionSource that represents the completion of this block.</summary>
38         private readonly TaskCompletionSource<VoidResult> _completionTask = new TaskCompletionSource<VoidResult>();
39         /// <summary>A registry used to store all linked targets and information about them.</summary>
40         private readonly TargetRegistry<TOutput> _targetRegistry;
41         /// <summary>The output messages queued up to be received by consumers/targets.</summary>
42         /// <remarks>
43         /// The queue is only ever accessed by a single producer and single consumer at a time.  On the producer side,
44         /// we require that AddMessage/AddMessages are the only places the queue is added to, and we require that those
45         /// methods not be used concurrently with anything else.  All of our target halves today follow that restriction;
46         /// for example, TransformBlock with DOP==1 will have at most a single task processing the user provided delegate,
47         /// and thus at most one task calling AddMessage.  If it has a DOP > 1, it'll go through the ReorderingBuffer,
48         /// which will use a lock to synchronize the output of all of the processing tasks such that only one is using
49         /// AddMessage at a time.  On the consumer side of SourceCore, all consumption is protected by ValueLock, and thus
50         /// all consumption is serialized.
51         /// </remarks>
52         private readonly SingleProducerSingleConsumerQueue<TOutput> _messages = new SingleProducerSingleConsumerQueue<TOutput>(); // protected by AddMessage/ValueLock
53
54         /// <summary>Gets the object to use as the outgoing lock.</summary>
55         private object OutgoingLock { get { return _completionTask; } }
56         /// <summary>Gets the object to use as the value lock.</summary>
57         private object ValueLock { get { return _targetRegistry; } }
58
59         // *** These fields are readonly and are initialized by arguments to the constructor.
60
61         /// <summary>The source utilizing this helper.</summary>
62         private readonly ISourceBlock<TOutput> _owningSource;
63         /// <summary>The options used to configure this block's execution.</summary>
64         private readonly DataflowBlockOptions _dataflowBlockOptions;
65         /// <summary>
66         /// An action to be invoked on the owner block to stop accepting messages.
67         /// This action is invoked when SourceCore encounters an exception.
68         /// </summary>
69         private readonly Action<ISourceBlock<TOutput>> _completeAction;
70         /// <summary>
71         /// An action to be invoked on the owner block when an item is removed.
72         /// This may be null if the owner block doesn't need to be notified.
73         /// </summary>
74         private readonly Action<ISourceBlock<TOutput>, int> _itemsRemovedAction;
75         /// <summary>Item counting function</summary>
76         private readonly Func<ISourceBlock<TOutput>, TOutput, IList<TOutput>, int> _itemCountingFunc;
77
78         // *** These fields are mutated during execution.
79
80         /// <summary>The task used to process the output and offer it to targets.</summary>
81         private Task _taskForOutputProcessing; // protected by ValueLock
82         /// <summary>Counter for message IDs unique within this source block.</summary>
83         private PaddedInt64 _nextMessageId = new PaddedInt64 { Value = 1 }; // We are going to use this value before incrementing.  Protected by ValueLock.
84         /// <summary>The target that the next message is reserved for, or null if nothing is reserved.</summary>
85         private ITargetBlock<TOutput> _nextMessageReservedFor; // protected by OutgoingLock
86         /// <summary>Whether all future messages should be declined.</summary>
87         private bool _decliningPermanently; // Protected by ValueLock
88         /// <summary>Whether this block should again attempt to offer messages to targets.</summary>
89         private bool _enableOffering = true; // Protected by ValueLock, sometimes read with volatile reads
90         /// <summary>Whether someone has reserved the right to call CompleteBlockOncePossible.</summary>
91         private bool _completionReserved; // Protected by OutgoingLock
92         /// <summary>Exceptions that may have occurred and gone unhandled during processing.</summary>
93         private List<Exception> _exceptions; // Protected by ValueLock, sometimes read with volatile reads
94
95         /// <summary>Initializes the source core.</summary>
96         /// <param name="owningSource">The source utilizing this core.</param>
97         /// <param name="dataflowBlockOptions">The options to use to configure the block.</param>
98         /// <param name="completeAction">Action to invoke in order to decline the associated target half, which will in turn decline this source core.</param>
99         /// <param name="itemsRemovedAction">Action to invoke when one or more items is removed.  This may be null.</param>
100         /// <param name="itemCountingFunc">
101         /// Action to invoke when the owner needs to be able to count the number of individual
102         /// items in an output or set of outputs.
103         /// </param>
104         internal SourceCore(
105             ISourceBlock<TOutput> owningSource, DataflowBlockOptions dataflowBlockOptions,
106             Action<ISourceBlock<TOutput>> completeAction,
107             Action<ISourceBlock<TOutput>, int> itemsRemovedAction = null,
108             Func<ISourceBlock<TOutput>, TOutput, IList<TOutput>, int> itemCountingFunc = null)
109         {
110             Contract.Requires(owningSource != null, "Core must be associated with a source.");
111             Contract.Requires(dataflowBlockOptions != null, "Options must be provided to configure the core.");
112             Contract.Requires(completeAction != null, "Action to invoke on completion is required.");
113
114             // Store the args
115             _owningSource = owningSource;
116             _dataflowBlockOptions = dataflowBlockOptions;
117             _itemsRemovedAction = itemsRemovedAction;
118             _itemCountingFunc = itemCountingFunc;
119             _completeAction = completeAction;
120
121             // Construct members that depend on the args
122             _targetRegistry = new TargetRegistry<TOutput>(_owningSource);
123         }
124
125         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="LinkTo"]/*' />
126         [SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")]
127         internal IDisposable LinkTo(ITargetBlock<TOutput> target, DataflowLinkOptions linkOptions)
128         {
129             // Validate arguments
130             if (target == null) throw new ArgumentNullException("target");
131             if (linkOptions == null) throw new ArgumentNullException("linkOptions");
132             Contract.EndContractBlock();
133
134             // If the block is already completed, there is not much to do -
135             // we have to propagate completion if that was requested, and
136             // then bail without taking the lock.
137             if (_completionTask.Task.IsCompleted)
138             {
139                 if (linkOptions.PropagateCompletion) Common.PropagateCompletion(_completionTask.Task, target, exceptionHandler: null);
140                 return Disposables.Nop;
141             }
142
143             lock (OutgoingLock)
144             {
145                 // If completion has been reserved, the target registry has either been cleared already
146                 // or is about to be cleared. So we can link and offer only if completion is not reserved. 
147                 if (!_completionReserved)
148                 {
149                     _targetRegistry.Add(ref target, linkOptions);
150                     OfferToTargets(linkToTarget: target);
151                     return Common.CreateUnlinker(OutgoingLock, _targetRegistry, target);
152                 }
153             }
154
155             // The block should not offer any messages when it is in this state, but
156             // it should still propagate completion if that has been requested.
157             if (linkOptions.PropagateCompletion) Common.PropagateCompletionOnceCompleted(_completionTask.Task, target);
158             return Disposables.Nop;
159         }
160
161         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ConsumeMessage"]/*' />
162         internal TOutput ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target, out Boolean messageConsumed)
163         {
164             // Validate arguments
165             if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, "messageHeader");
166             if (target == null) throw new ArgumentNullException("target");
167             Contract.EndContractBlock();
168
169             TOutput consumedMessageValue = default(TOutput);
170
171             lock (OutgoingLock)
172             {
173                 // If this target doesn't hold the reservation, then for this ConsumeMessage
174                 // to be valid, there must not be any reservation (since otherwise we can't 
175                 // consume a message destined for someone else).
176                 if (_nextMessageReservedFor != target &&
177                     _nextMessageReservedFor != null)
178                 {
179                     messageConsumed = false;
180                     return default(TOutput);
181                 }
182
183                 lock (ValueLock)
184                 {
185                     // If the requested message isn't the next message to be served up, bail.
186                     // Otherwise, we're good to go: dequeue the message as it will now be owned by the target,
187                     // signal that we can resume enabling offering as there's potentially a new "next message",
188                     // complete if necessary, and offer asynchronously all messages as is appropriate.
189
190                     if (messageHeader.Id != _nextMessageId.Value ||
191                         !_messages.TryDequeue(out consumedMessageValue))
192                     {
193                         messageConsumed = false;
194                         return default(TOutput);
195                     }
196
197                     _nextMessageReservedFor = null;
198                     _targetRegistry.Remove(target, onlyIfReachedMaxMessages: true);
199                     _enableOffering = true; // reenable offering if it was disabled
200                     _nextMessageId.Value++;
201                     CompleteBlockIfPossible();
202                     OfferAsyncIfNecessary(isReplacementReplica: false, outgoingLockKnownAcquired: true);
203                 }
204             }
205
206             // Notify the owner block that our count has decreased
207             if (_itemsRemovedAction != null)
208             {
209                 int count = _itemCountingFunc != null ? _itemCountingFunc(_owningSource, consumedMessageValue, null) : 1;
210                 _itemsRemovedAction(_owningSource, count);
211             }
212
213             // Return the consumed message value
214             messageConsumed = true;
215             return consumedMessageValue;
216         }
217
218         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReserveMessage"]/*' />
219         internal Boolean ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
220         {
221             // Validate arguments
222             if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, "messageHeader");
223             if (target == null) throw new ArgumentNullException("target");
224             Contract.EndContractBlock();
225
226             lock (OutgoingLock)
227             {
228                 // If no one currently holds a reservation...
229                 if (_nextMessageReservedFor == null)
230                 {
231                     lock (ValueLock)
232                     {
233                         // ...and if the requested message is next in the queue, allow it
234                         if (messageHeader.Id == _nextMessageId.Value && !_messages.IsEmpty)
235                         {
236                             _nextMessageReservedFor = target;
237                             _enableOffering = false;
238                             return true;
239                         }
240                     }
241                 }
242             }
243             return false;
244         }
245
246         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReleaseReservation"]/*' />
247         internal void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
248         {
249             // Validate arguments
250             if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, "messageHeader");
251             if (target == null) throw new ArgumentNullException("target");
252             Contract.EndContractBlock();
253
254             lock (OutgoingLock)
255             {
256                 // If someone else holds the reservation, bail.
257                 if (_nextMessageReservedFor != target) throw new InvalidOperationException(SR.InvalidOperation_MessageNotReservedByTarget);
258
259                 lock (ValueLock)
260                 {
261                     // If this is not the message at the head of the queue, bail
262                     if (messageHeader.Id != _nextMessageId.Value || _messages.IsEmpty) throw new InvalidOperationException(SR.InvalidOperation_MessageNotReservedByTarget);
263
264                     // Otherwise, release the reservation
265                     _nextMessageReservedFor = null;
266                     Debug.Assert(!_enableOffering, "Offering should have been disabled if there was a valid reservation");
267                     _enableOffering = true;
268
269                     // Now there is at least one message ready for offering. So offer it.
270                     // If a cancellation is pending, this method will bail out.
271                     OfferAsyncIfNecessary(isReplacementReplica: false, outgoingLockKnownAcquired: true);
272
273                     // This reservation may be holding the block's completion. So try to complete.
274                     CompleteBlockIfPossible();
275                 }
276             }
277         }
278
279         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
280         internal Task Completion { get { return _completionTask.Task; } }
281
282         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceive"]/*' />
283         internal Boolean TryReceive(Predicate<TOutput> filter, out TOutput item)
284         {
285             item = default(TOutput);
286             bool itemReceived = false;
287
288             lock (OutgoingLock)
289             {
290                 // If the next message is reserved for someone, we can't receive right now.  Otherwise...
291                 if (_nextMessageReservedFor == null)
292                 {
293                     lock (ValueLock)
294                     {
295                         // If there's at least one message, and there's no filter or the next item
296                         // passes the filter, dequeue it to be returned.
297                         if (_messages.TryDequeueIf(filter, out item))
298                         {
299                             _nextMessageId.Value++;
300
301                             // Now that the next message has changed, reenable offering if it was disabled
302                             _enableOffering = true;
303
304                             // If removing this item was the last thing this block will ever do, complete it,
305                             CompleteBlockIfPossible();
306
307                             // Now, try to offer up messages asynchronously, since we've
308                             // changed what's at the head of the queue
309                             OfferAsyncIfNecessary(isReplacementReplica: false, outgoingLockKnownAcquired: true);
310
311                             itemReceived = true;
312                         }
313                     }
314                 }
315             }
316
317             if (itemReceived)
318             {
319                 // Notify the owner block that our count has decreased
320                 if (_itemsRemovedAction != null)
321                 {
322                     int count = _itemCountingFunc != null ? _itemCountingFunc(_owningSource, item, null) : 1;
323                     _itemsRemovedAction(_owningSource, count);
324                 }
325             }
326             return itemReceived;
327         }
328
329         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceiveAll"]/*' />
330         internal bool TryReceiveAll(out IList<TOutput> items)
331         {
332             items = null;
333             int countReceived = 0;
334
335             lock (OutgoingLock)
336             {
337                 // If the next message is reserved for someone, we can't receive right now.  Otherwise...
338                 if (_nextMessageReservedFor == null)
339                 {
340                     lock (ValueLock)
341                     {
342                         if (!_messages.IsEmpty)
343                         {
344                             // Receive all of the data, clearing it out in the process.
345                             var tmpList = new List<TOutput>();
346                             TOutput item;
347                             while (_messages.TryDequeue(out item)) tmpList.Add(item);
348                             countReceived = tmpList.Count;
349                             items = tmpList;
350
351                             // Increment the next ID. Any new value is good.
352                             _nextMessageId.Value++;
353
354                             // Now that the next message has changed, reenable offering if it was disabled
355                             _enableOffering = true;
356
357                             // Now that the block is empty, check to see whether we should complete.
358                             CompleteBlockIfPossible();
359                         }
360                     }
361                 }
362             }
363
364             if (countReceived > 0)
365             {
366                 // Notify the owner block that our count has decreased
367                 if (_itemsRemovedAction != null)
368                 {
369                     int count = _itemCountingFunc != null ? _itemCountingFunc(_owningSource, default(TOutput), items) : countReceived;
370                     _itemsRemovedAction(_owningSource, count);
371                 }
372                 return true;
373             }
374             else return false;
375         }
376
377         /// <summary>Gets the number of items available to be received from this block.</summary>
378         internal int OutputCount { get { lock (OutgoingLock) lock (ValueLock) return _messages.Count; } }
379
380         /// <summary>
381         /// Adds a message to the source block for propagation. 
382         /// This method must only be used by one thread at a time, and must not be used concurrently
383         /// with any other producer side methods, e.g. AddMessages, Complete.
384         /// </summary>
385         /// <param name="item">The item to be wrapped in a message to be added.</param>
386         internal void AddMessage(TOutput item)
387         {
388             // This method must not take the OutgoingLock, as it will likely be called in situations
389             // where an IncomingLock is held.
390
391             if (_decliningPermanently) return;
392             _messages.Enqueue(item);
393
394             Interlocked.MemoryBarrier(); // ensure the read of _taskForOutputProcessing doesn't move up before the writes in Enqueue
395
396             if (_taskForOutputProcessing == null)
397             {
398                 // Separated out to enable inlining of AddMessage
399                 OfferAsyncIfNecessaryWithValueLock();
400             }
401         }
402
403         /// <summary>
404         /// Adds messages to the source block for propagation. 
405         /// This method must only be used by one thread at a time, and must not be used concurrently
406         /// with any other producer side methods, e.g. AddMessage, Complete.
407         /// </summary>
408         /// <param name="items">The list of items to be wrapped in messages to be added.</param>
409         internal void AddMessages(IEnumerable<TOutput> items)
410         {
411             Contract.Requires(items != null, "Items list must be valid.");
412
413             // This method must not take the OutgoingLock, as it will likely be called in situations
414             // where an IncomingLock is held.
415
416             if (_decliningPermanently) return;
417
418             // Special case arrays and lists, for which we can avoid the 
419             // enumerator allocation that'll result from using a foreach.
420             // This also avoids virtual method calls that we'd get if we
421             // didn't special case.
422             var itemsAsList = items as List<TOutput>;
423             if (itemsAsList != null)
424             {
425                 for (int i = 0; i < itemsAsList.Count; i++)
426                 {
427                     _messages.Enqueue(itemsAsList[i]);
428                 }
429             }
430             else
431             {
432                 TOutput[] itemsAsArray = items as TOutput[];
433                 if (itemsAsArray != null)
434                 {
435                     for (int i = 0; i < itemsAsArray.Length; i++)
436                     {
437                         _messages.Enqueue(itemsAsArray[i]);
438                     }
439                 }
440                 else
441                 {
442                     foreach (TOutput item in items)
443                     {
444                         _messages.Enqueue(item);
445                     }
446                 }
447             }
448
449             Interlocked.MemoryBarrier(); // ensure the read of _taskForOutputProcessing doesn't move up before the writes in Enqueue
450
451             if (_taskForOutputProcessing == null)
452             {
453                 OfferAsyncIfNecessaryWithValueLock();
454             }
455         }
456
457         /// <summary>Adds an individual exceptionto this source.</summary>
458         /// <param name="exception">The exception to add</param>
459         internal void AddException(Exception exception)
460         {
461             Contract.Requires(exception != null, "Valid exception must be provided to be added.");
462             Contract.Requires(!Completion.IsCompleted || Completion.IsFaulted, "The block must either not be completed or be faulted if we're still storing exceptions.");
463             lock (ValueLock)
464             {
465                 Common.AddException(ref _exceptions, exception);
466             }
467         }
468
469         /// <summary>Adds exceptions to this source.</summary>
470         /// <param name="exceptions">The exceptions to add</param>
471         internal void AddExceptions(List<Exception> exceptions)
472         {
473             Contract.Requires(exceptions != null, "Valid exceptions must be provided to be added.");
474             Contract.Requires(!Completion.IsCompleted || Completion.IsFaulted, "The block must either not be completed or be faulted if we're still storing exceptions.");
475             lock (ValueLock)
476             {
477                 foreach (Exception exception in exceptions)
478                 {
479                     Common.AddException(ref _exceptions, exception);
480                 }
481             }
482         }
483
484         /// <summary>Adds the exceptions contained in an AggregateException to this source.</summary>
485         /// <param name="aggregateException">The exception to add</param>
486         internal void AddAndUnwrapAggregateException(AggregateException aggregateException)
487         {
488             Contract.Requires(aggregateException != null && aggregateException.InnerExceptions.Count > 0, "Aggregate must be valid and contain inner exceptions to unwrap.");
489             Contract.Requires(!Completion.IsCompleted || Completion.IsFaulted, "The block must either not be completed or be faulted if we're still storing exceptions.");
490             lock (ValueLock)
491             {
492                 Common.AddException(ref _exceptions, aggregateException, unwrapInnerExceptions: true);
493             }
494         }
495
496         /// <summary>Gets whether the _exceptions list is non-null.</summary>
497         internal bool HasExceptions
498         {
499             get
500             {
501                 // We may check whether _exceptions is null without taking a lock because it is volatile
502                 return Volatile.Read(ref _exceptions) != null;
503             }
504         }
505
506         /// <summary>Informs the block that it will not be receiving additional messages.</summary>
507         internal void Complete()
508         {
509             lock (ValueLock)
510             {
511                 _decliningPermanently = true;
512
513                 // CompleteAdding may be called in a context where an incoming lock is held.  We need to 
514                 // call CompleteBlockIfPossible, but we can't do so if the incoming lock is held.
515                 // However, we know that _decliningPermanently has been set, and thus the timing of
516                 // CompleteBlockIfPossible doesn't matter, so we schedule it to run asynchronously
517                 // and take the necessary locks in a situation where we're sure it won't cause a problem.
518                 Task.Factory.StartNew(state =>
519                 {
520                     var thisSourceCore = (SourceCore<TOutput>)state;
521                     lock (thisSourceCore.OutgoingLock)
522                     {
523                         lock (thisSourceCore.ValueLock)
524                         {
525                             thisSourceCore.CompleteBlockIfPossible();
526                         }
527                     }
528                 }, this, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
529             }
530         }
531
532         /// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
533         internal DataflowBlockOptions DataflowBlockOptions { get { return _dataflowBlockOptions; } }
534
535         /// <summary>Offers messages to all targets.</summary>
536         /// <param name="linkToTarget">
537         /// The newly linked target, if OfferToTargets is being called to synchronously
538         /// propagate to a target during a LinkTo operation.
539         /// </param>
540         private bool OfferToTargets(ITargetBlock<TOutput> linkToTarget = null)
541         {
542             Common.ContractAssertMonitorStatus(OutgoingLock, held: true);
543             Common.ContractAssertMonitorStatus(ValueLock, held: false);
544
545             // If the next message is reserved, we can't offer anything
546             if (_nextMessageReservedFor != null)
547                 return false;
548
549             // Peek at the next message if there is one, so we can offer it.
550             DataflowMessageHeader header = default(DataflowMessageHeader);
551             TOutput message = default(TOutput);
552             bool offerJustToLinkToTarget = false;
553
554             // If offering isn't enabled and if we're not doing this as 
555             // a result of LinkTo, bail. Otherwise, with offering disabled, we must have 
556             // already offered this message to all existing targets, so we can just offer 
557             // it to the newly linked target.
558             if (!Volatile.Read(ref _enableOffering))
559             {
560                 if (linkToTarget == null) return false;
561                 else offerJustToLinkToTarget = true;
562             }
563
564             // Otherwise, peek at message to offer
565             if (_messages.TryPeek(out message))
566             {
567                 header = new DataflowMessageHeader(_nextMessageId.Value);
568             }
569
570             // If there is a message, offer it.
571             bool messageWasAccepted = false;
572             if (header.IsValid)
573             {
574                 if (offerJustToLinkToTarget)
575                 {
576                     // If we've already offered the message to everyone else,
577                     // we can just offer it to the newly linked target
578                     Debug.Assert(linkToTarget != null, "Must have a valid target to offer to.");
579                     OfferMessageToTarget(header, message, linkToTarget, out messageWasAccepted);
580                 }
581                 else
582                 {
583                     // Otherwise, we've not yet offered this message to anyone, so even 
584                     // if linkToTarget is non-null, we need to propagate the message in order
585                     // through all of the registered targets, the last of which will be the linkToTarget
586                     // if it's non-null (no need to special-case it, though).
587
588                     // Note that during OfferMessageToTarget, a target may call ConsumeMessage (taking advantage of the
589                     // reentrancy of OutgoingLock), which may unlink the target if the target is registered as "unlinkAfterOne".  
590                     // Doing so will remove the target from the targets list. As such, we maintain the next node
591                     // separately from cur.Next, in case cur.Next changes by cur being removed from the list.
592                     // No other node in the list should change, as we're protected by OutgoingLock.
593
594                     TargetRegistry<TOutput>.LinkedTargetInfo cur = _targetRegistry.FirstTargetNode;
595                     while (cur != null)
596                     {
597                         TargetRegistry<TOutput>.LinkedTargetInfo next = cur.Next;
598                         if (OfferMessageToTarget(header, message, cur.Target, out messageWasAccepted)) break;
599                         cur = next;
600                     }
601
602                     // If none of the targets accepted the message, disable offering.
603                     if (!messageWasAccepted)
604                     {
605                         lock (ValueLock)
606                         {
607                             _enableOffering = false;
608                         }
609                     }
610                 }
611             }
612
613             // If a message got accepted, consume it and reenable offering.
614             if (messageWasAccepted)
615             {
616                 lock (ValueLock)
617                 {
618                     // SourceCore set consumeToAccept to false.  However, it's possible
619                     // that an incorrectly written target may ignore that parameter and synchronously consume
620                     // even though they weren't supposed to.  To recover from that, 
621                     // we'll only dequeue if the correct message is still at the head of the queue.
622                     // However, we'll assert so that we can at least catch this in our own debug builds.
623                     TOutput dropped;
624                     if (_nextMessageId.Value != header.Id ||
625                         !_messages.TryDequeue(out dropped)) // remove the next message
626                     {
627                         Debug.Assert(false, "The target did not follow the protocol.");
628                     }
629                     _nextMessageId.Value++;
630
631                     // The message was accepted, so there's now going to be a new next message.
632                     // If offering had been disabled, reenable it.
633                     _enableOffering = true;
634
635                     // Now that a message has been removed, we need to complete if possible or
636                     // or asynchronously offer if necessary.  However, if we're calling this as part of our
637                     // offering loop, we won't be able to do either, since by definition there's already
638                     // a processing task spun up (us) that would prevent these things.  So we only
639                     // do the checks if we're being called to link a new target rather than as part
640                     // of normal processing.
641                     if (linkToTarget != null)
642                     {
643                         CompleteBlockIfPossible();
644                         OfferAsyncIfNecessary(isReplacementReplica: false, outgoingLockKnownAcquired: true);
645                     }
646                 }
647
648                 // Notify the owner block that our count has decreased
649                 if (_itemsRemovedAction != null)
650                 {
651                     int count = _itemCountingFunc != null ? _itemCountingFunc(_owningSource, message, null) : 1;
652                     _itemsRemovedAction(_owningSource, count);
653                 }
654             }
655
656             return messageWasAccepted;
657         }
658
659         /// <summary>Offers the message to the target.</summary>
660         /// <param name="header">The header of the message to offer.</param>
661         /// <param name="message">The message being offered.</param>
662         /// <param name="target">The single target to which the message should be offered.</param>
663         /// <param name="messageWasAccepted">true if the message was accepted by the target; otherwise, false.</param>
664         /// <returns>
665         /// true if the message should not be offered to additional targets; 
666         /// false if propagation should be allowed to continue.
667         /// </returns>
668         private bool OfferMessageToTarget(
669             DataflowMessageHeader header, TOutput message, ITargetBlock<TOutput> target,
670             out bool messageWasAccepted)
671         {
672             Contract.Requires(target != null, "Valid target to offer to is required.");
673             Common.ContractAssertMonitorStatus(OutgoingLock, held: true);
674             Common.ContractAssertMonitorStatus(ValueLock, held: false);
675
676             DataflowMessageStatus result = target.OfferMessage(header, message, _owningSource, consumeToAccept: false);
677             Debug.Assert(result != DataflowMessageStatus.NotAvailable, "Messages are not being offered concurrently, so nothing should be missed.");
678             messageWasAccepted = false;
679
680             // If accepted, note it, and if the target was linked as "once", remove it
681             if (result == DataflowMessageStatus.Accepted)
682             {
683                 _targetRegistry.Remove(target, onlyIfReachedMaxMessages: true);
684                 messageWasAccepted = true;
685                 return true; // the message should not be offered to anyone else
686             }
687             // If declined permanently, remove the target
688             else if (result == DataflowMessageStatus.DecliningPermanently)
689             {
690                 _targetRegistry.Remove(target);
691             }
692             // If the message was reserved by the target, stop propagating
693             else if (_nextMessageReservedFor != null)
694             {
695                 Debug.Assert(result == DataflowMessageStatus.Postponed,
696                     "If the message was reserved, it should also have been postponed.");
697                 return true; // the message should not be offered to anyone else
698             }
699             // If the result was Declined, there's nothing more to be done.
700             // This message will sit at the front of the queue until someone claims it.
701
702             return false; // allow the message to be offered to someone else
703         }
704
705         /// <summary>
706         /// Called when we want to enable asynchronously offering message to targets.
707         /// Takes the ValueLock before delegating to OfferAsyncIfNecessary.
708         /// </summary>
709         private void OfferAsyncIfNecessaryWithValueLock()
710         {
711             lock (ValueLock)
712             {
713                 OfferAsyncIfNecessary(isReplacementReplica: false, outgoingLockKnownAcquired: false);
714             }
715         }
716
717         /// <summary>Called when we want to enable asynchronously offering message to targets.</summary>
718         /// <param name="isReplacementReplica">Whether this call is the continuation of a previous message loop.</param>
719         /// <param name="outgoingLockKnownAcquired">Whether the caller is sure that the outgoing lock is currently held by this thread.</param>
720         private void OfferAsyncIfNecessary(bool isReplacementReplica, bool outgoingLockKnownAcquired)
721         {
722             Common.ContractAssertMonitorStatus(ValueLock, held: true);
723
724             // Fast path to enable OfferAsyncIfNecessary to be inlined.  We only need
725             // to proceed if there's no task processing, offering is enabled, and
726             // there are no messages to be processed.
727             if (_taskForOutputProcessing == null && _enableOffering && !_messages.IsEmpty)
728             {
729                 // Slow path: do additional checks and potentially launch new task
730                 OfferAsyncIfNecessary_Slow(isReplacementReplica, outgoingLockKnownAcquired);
731             }
732         }
733
734         /// <summary>Called when we want to enable asynchronously offering message to targets.</summary>
735         /// <param name="isReplacementReplica">Whether this call is the continuation of a previous message loop.</param>
736         /// <param name="outgoingLockKnownAcquired">Whether the caller is sure that the outgoing lock is currently held by this thread.</param>
737         private void OfferAsyncIfNecessary_Slow(bool isReplacementReplica, bool outgoingLockKnownAcquired)
738         {
739             Common.ContractAssertMonitorStatus(ValueLock, held: true);
740             Debug.Assert(_taskForOutputProcessing == null && _enableOffering && !_messages.IsEmpty,
741                 "The block must be enabled for offering, not currently be processing, and have messages available to process.");
742
743             // This method must not take the outgoing lock, as it will likely be called in situations
744             // where a derived type's incoming lock is held.
745
746             bool targetsAvailable = true;
747             if (outgoingLockKnownAcquired || Monitor.IsEntered(OutgoingLock))
748             {
749                 Common.ContractAssertMonitorStatus(OutgoingLock, held: true);
750                 targetsAvailable = _targetRegistry.FirstTargetNode != null;
751             }
752
753             // If there's any work to be done...
754             if (targetsAvailable && !CanceledOrFaulted)
755             {
756                 // Create task and store into _taskForOutputProcessing prior to scheduling the task
757                 // so that _taskForOutputProcessing will be visibly set in the task loop.
758                 _taskForOutputProcessing = new Task(thisSourceCore => ((SourceCore<TOutput>)thisSourceCore).OfferMessagesLoopCore(), this,
759                                                      Common.GetCreationOptionsForTask(isReplacementReplica));
760
761 #if FEATURE_TRACING
762                 DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
763                 if (etwLog.IsEnabled())
764                 {
765                     etwLog.TaskLaunchedForMessageHandling(
766                         _owningSource, _taskForOutputProcessing, DataflowEtwProvider.TaskLaunchedReason.OfferingOutputMessages, _messages.Count);
767                 }
768 #endif
769
770                 // Start the task handling scheduling exceptions
771 #pragma warning disable 0420
772                 Exception exception = Common.StartTaskSafe(_taskForOutputProcessing, _dataflowBlockOptions.TaskScheduler);
773 #pragma warning restore 0420
774                 if (exception != null)
775                 {
776                     // First, log the exception while the processing state is dirty which is preventing the block from completing.
777                     // Then revert the proactive processing state changes.
778                     // And last, try to complete the block.
779                     AddException(exception);
780                     _taskForOutputProcessing = null;
781                     _decliningPermanently = true;
782
783                     // Get out from under currently held locks - ValueLock is taken, but OutgoingLock may not be.
784                     // Re-take the locks on a separate thread.
785                     Task.Factory.StartNew(state =>
786                     {
787                         var thisSourceCore = (SourceCore<TOutput>)state;
788                         lock (thisSourceCore.OutgoingLock)
789                         {
790                             lock (thisSourceCore.ValueLock)
791                             {
792                                 thisSourceCore.CompleteBlockIfPossible();
793                             }
794                         }
795                     }, this, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
796                 }
797                 if (exception != null) AddException(exception);
798             }
799         }
800
801         /// <summary>Task body used to process messages.</summary>
802         [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
803         private void OfferMessagesLoopCore()
804         {
805             Debug.Assert(_taskForOutputProcessing != null && _taskForOutputProcessing.Id == Task.CurrentId,
806                 "Must be part of the current processing task.");
807             try
808             {
809                 int maxMessagesPerTask = _dataflowBlockOptions.ActualMaxMessagesPerTask;
810
811                 // We need to hold the outgoing lock while offering messages.  We can either
812                 // lock and unlock for each individual offering, or we can lock around multiple or all
813                 // possible offerings.  The former ensures that other operations don't get starved,
814                 // while the latter is much more efficient (not continually acquiring and releasing
815                 // the lock).  For blocks that aren't linked to any targets, this won't matter
816                 // (no offering is done), and for blocks that are only linked to targets, this shouldn't 
817                 // matter (no one is contending for the lock), thus
818                 // the only case it would matter is when a block both has targets and is being
819                 // explicitly received from, which is an uncommon scenario.  Thus, we want to lock
820                 // around the whole thing to improve performance, but just in case we do hit
821                 // an uncommon scenario, in the default case we release the lock every now and again.  
822                 // If a developer wants to control this, they can limit the duration of the 
823                 // lock by using MaxMessagesPerTask.
824
825                 const int DEFAULT_RELEASE_LOCK_ITERATIONS = 10; // Dialable
826                 int releaseLockIterations =
827                     _dataflowBlockOptions.MaxMessagesPerTask == DataflowBlockOptions.Unbounded ?
828                         DEFAULT_RELEASE_LOCK_ITERATIONS : maxMessagesPerTask;
829
830                 for (int messageCounter = 0;
831                     messageCounter < maxMessagesPerTask && !CanceledOrFaulted;)
832                 {
833                     lock (OutgoingLock)
834                     {
835                         // While there are more messages to process, offer each in turn
836                         // to the targets.  If we're unable to propagate a particular message,
837                         // stop trying until something changes in the future.
838                         for (
839                             int lockReleaseCounter = 0;
840                             messageCounter < maxMessagesPerTask && lockReleaseCounter < releaseLockIterations && !CanceledOrFaulted;
841                             ++messageCounter, ++lockReleaseCounter)
842                         {
843                             if (!OfferToTargets()) return;
844                         }
845                     }
846                 }
847             }
848             catch (Exception exc)
849             {
850                 // Record the exception
851                 AddException(exc);
852
853                 // Notify the owning block it should stop accepting new messages
854                 _completeAction(_owningSource);
855             }
856             finally
857             {
858                 lock (OutgoingLock)
859                 {
860                     lock (ValueLock)
861                     {
862                         // We're no longer processing, so null out the processing task
863                         Debug.Assert(_taskForOutputProcessing != null && _taskForOutputProcessing.Id == Task.CurrentId,
864                             "Must be part of the current processing task.");
865                         _taskForOutputProcessing = null;
866                         Interlocked.MemoryBarrier(); // synchronize with AddMessage(s) and its read of _taskForOutputProcessing
867
868                         // However, we may have given up early because we hit our own configured
869                         // processing limits rather than because we ran out of work to do.  If that's
870                         // the case, make sure we spin up another task to keep going.
871                         OfferAsyncIfNecessary(isReplacementReplica: true, outgoingLockKnownAcquired: true);
872
873                         // If, however, we stopped because we ran out of work to do and we
874                         // know we'll never get more, then complete.
875                         CompleteBlockIfPossible();
876                     }
877                 }
878             }
879         }
880
881         /// <summary>Gets whether the source has had cancellation requested or an exception has occurred.</summary>
882         private bool CanceledOrFaulted
883         {
884             get
885             {
886                 // Cancellation is honored as soon as the CancellationToken has been signaled.
887                 // Faulting is honored after an exception has been encountered and the owning block
888                 // has invoked Complete on us.
889                 return _dataflowBlockOptions.CancellationToken.IsCancellationRequested ||
890                     (HasExceptions && _decliningPermanently);
891             }
892         }
893
894         /// <summary>Completes the block's processing if there's nothing left to do and never will be.</summary>
895         private void CompleteBlockIfPossible()
896         {
897             Common.ContractAssertMonitorStatus(OutgoingLock, held: true);
898             Common.ContractAssertMonitorStatus(ValueLock, held: true);
899
900             if (!_completionReserved)
901             {
902                 if (_decliningPermanently && // declining permanently, so no more messages will arrive
903                     _taskForOutputProcessing == null && // no current processing
904                     _nextMessageReservedFor == null) // no pending reservation
905                 {
906                     CompleteBlockIfPossible_Slow();
907                 }
908             }
909         }
910
911         /// <summary>
912         /// Slow path for CompleteBlockIfPossible. 
913         /// Separating out the slow path into its own method makes it more likely that the fast path method will get inlined.
914         /// </summary>
915         private void CompleteBlockIfPossible_Slow()
916         {
917             Contract.Requires(
918                 _decliningPermanently && _taskForOutputProcessing == null && _nextMessageReservedFor == null,
919                 "The block must be declining permanently, there must be no reservations, and there must be no processing tasks");
920             Common.ContractAssertMonitorStatus(OutgoingLock, held: true);
921             Common.ContractAssertMonitorStatus(ValueLock, held: true);
922
923             if (_messages.IsEmpty || CanceledOrFaulted)
924             {
925                 _completionReserved = true;
926
927                 // Get out from under currently held locks.  This is to avoid
928                 // invoking synchronous continuations off of _completionTask.Task
929                 // while holding a lock.
930                 Task.Factory.StartNew(state => ((SourceCore<TOutput>)state).CompleteBlockOncePossible(),
931                     this, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
932             }
933         }
934
935         /// <summary>
936         /// Completes the block.  This must only be called once, and only once all of the completion conditions are met.
937         /// As such, it must only be called from CompleteBlockIfPossible.
938         /// </summary>
939         private void CompleteBlockOncePossible()
940         {
941             TargetRegistry<TOutput>.LinkedTargetInfo linkedTargets;
942             List<Exception> exceptions;
943
944             // Avoid completing while the code that caused this completion to occur is still holding a lock.
945             // Clear out the target registry and buffers to help avoid memory leaks.
946             lock (OutgoingLock)
947             {
948                 // Save the linked list of targets so that it could be traversed later to propagate completion
949                 linkedTargets = _targetRegistry.ClearEntryPoints();
950                 lock (ValueLock)
951                 {
952                     _messages.Clear();
953
954                     // Save a local reference to the exceptions list and null out the field,
955                     // so that if the target side tries to add an exception this late,
956                     // it will go to a separate list (that will be ignored.)
957                     exceptions = _exceptions;
958                     _exceptions = null;
959                 }
960             }
961
962             // If it's due to an unhandled exception, finish in an error state
963             if (exceptions != null)
964             {
965                 _completionTask.TrySetException(exceptions);
966             }
967             // If it's due to cancellation, finish in a canceled state
968             else if (_dataflowBlockOptions.CancellationToken.IsCancellationRequested)
969             {
970                 _completionTask.TrySetCanceled();
971             }
972             // Otherwise, finish in a successful state.
973             else
974             {
975                 _completionTask.TrySetResult(default(VoidResult));
976             }
977
978             // Now that the completion task is completed, we may propagate completion to the linked targets
979             _targetRegistry.PropagateCompletion(linkedTargets);
980 #if FEATURE_TRACING
981             DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
982             if (etwLog.IsEnabled())
983             {
984                 etwLog.DataflowBlockCompleted(_owningSource);
985             }
986 #endif
987         }
988
989         /// <summary>Gets the object to display in the debugger display attribute.</summary>
990         [SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
991         private object DebuggerDisplayContent
992         {
993             get
994             {
995                 var displaySource = _owningSource as IDebuggerDisplay;
996                 return string.Format("Block=\"{0}\"",
997                     displaySource != null ? displaySource.Content : _owningSource);
998             }
999         }
1000
1001         /// <summary>Gets information about this helper to be used for display in a debugger.</summary>
1002         /// <returns>Debugging information about this source core.</returns>
1003         internal DebuggingInformation GetDebuggingInformation() { return new DebuggingInformation(this); }
1004
1005         /// <summary>Provides debugging information about the source core.</summary>
1006         internal sealed class DebuggingInformation
1007         {
1008             /// <summary>The source being viewed.</summary>
1009             private SourceCore<TOutput> _source;
1010
1011             /// <summary>Initializes the type proxy.</summary>
1012             /// <param name="source">The source being viewed.</param>
1013             internal DebuggingInformation(SourceCore<TOutput> source) { _source = source; }
1014
1015             /// <summary>Gets the number of messages available for receiving.</summary>
1016             internal int OutputCount { get { return _source._messages.Count; } }
1017             /// <summary>Gets the messages available for receiving.</summary>
1018             internal IEnumerable<TOutput> OutputQueue { get { return _source._messages.ToList(); } }
1019             /// <summary>Gets the task being used for output processing.</summary>
1020             internal Task TaskForOutputProcessing { get { return _source._taskForOutputProcessing; } }
1021
1022             /// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
1023             internal DataflowBlockOptions DataflowBlockOptions { get { return _source._dataflowBlockOptions; } }
1024             /// <summary>Gets whether the block is declining further messages.</summary>
1025             internal bool IsDecliningPermanently { get { return _source._decliningPermanently; } }
1026             /// <summary>Gets whether the block is completed.</summary>
1027             internal bool IsCompleted { get { return _source.Completion.IsCompleted; } }
1028
1029             /// <summary>Gets the set of all targets linked from this block.</summary>
1030             internal TargetRegistry<TOutput> LinkedTargets { get { return _source._targetRegistry; } }
1031             /// <summary>Gets the target that holds a reservation on the next message, if any.</summary>
1032             internal ITargetBlock<TOutput> NextMessageReservedFor { get { return _source._nextMessageReservedFor; } }
1033         }
1034     }
1035 }