Merge pull request #2223 from lobrien/master
[mono.git] / mcs / class / System.Threading.Tasks.Dataflow / CoreFxSources / Blocks / BroadcastBlock.cs
1 // Copyright (c) Microsoft. All rights reserved.
2 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
3
4 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
5 //
6 // BroadcastBlock.cs
7 //
8 //
9 // A propagator that broadcasts incoming messages to all targets, overwriting the current
10 // message in the process.
11 //
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
13
14 using System.Collections.Generic;
15 using System.Diagnostics;
16 using System.Diagnostics.CodeAnalysis;
17 using System.Diagnostics.Contracts;
18 using System.Linq;
19 using System.Security;
20 using System.Threading.Tasks.Dataflow.Internal;
21
22 namespace System.Threading.Tasks.Dataflow
23 {
24     /// <summary>
25     /// Provides a buffer for storing at most one element at time, overwriting each message with the next as it arrives.  
26     /// Messages are broadcast to all linked targets, all of which may consume a clone of the message.
27     /// </summary>
28     /// <typeparam name="T">Specifies the type of the data buffered by this dataflow block.</typeparam>
29     /// <remarks>
30     /// <see cref="BroadcastBlock{T}"/> exposes at most one element at a time.  However, unlike
31     /// <see cref="WriteOnceBlock{T}"/>, that element will be overwritten as new elements are provided
32     /// to the block.  <see cref="BroadcastBlock{T}"/> ensures that the current element is broadcast to any
33     /// linked targets before allowing the element to be overwritten.
34     /// </remarks>
35     [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
36     [DebuggerTypeProxy(typeof(BroadcastBlock<>.DebugView))]
37     public sealed class BroadcastBlock<T> : IPropagatorBlock<T, T>, IReceivableSourceBlock<T>, IDebuggerDisplay
38     {
39         /// <summary>The source side.</summary>
40         private readonly BroadcastingSourceCore<T> _source;
41         /// <summary>Bounding state for when the block is executing in bounded mode.</summary>
42         private readonly BoundingStateWithPostponedAndTask<T> _boundingState;
43         /// <summary>Whether all future messages should be declined.</summary>
44         private bool _decliningPermanently;
45         /// <summary>A task has reserved the right to run the completion routine.</summary>
46         private bool _completionReserved;
47         /// <summary>Gets the lock used to synchronize incoming requests.</summary>
48         private object IncomingLock { get { return _source; } }
49
50         /// <summary>Initializes the <see cref="BroadcastBlock{T}"/> with the specified cloning function.</summary>
51         /// <param name="cloningFunction">
52         /// The function to use to clone the data when offered to other blocks.
53         /// This may be null to indicate that no cloning need be performed.
54         /// </param>
55         public BroadcastBlock(Func<T, T> cloningFunction) :
56             this(cloningFunction, DataflowBlockOptions.Default)
57         { }
58
59         /// <summary>Initializes the <see cref="BroadcastBlock{T}"/>  with the specified cloning function and <see cref="DataflowBlockOptions"/>.</summary>
60         /// <param name="cloningFunction">
61         /// The function to use to clone the data when offered to other blocks.
62         /// This may be null to indicate that no cloning need be performed.
63         /// </param>
64         /// <param name="dataflowBlockOptions">The options with which to configure this <see cref="BroadcastBlock{T}"/>.</param>
65         /// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
66         public BroadcastBlock(Func<T, T> cloningFunction, DataflowBlockOptions dataflowBlockOptions)
67         {
68             // Validate arguments
69             if (dataflowBlockOptions == null) throw new ArgumentNullException("dataflowBlockOptions");
70             Contract.EndContractBlock();
71
72             // Ensure we have options that can't be changed by the caller
73             dataflowBlockOptions = dataflowBlockOptions.DefaultOrClone();
74
75             // Initialize bounding state if necessary
76             Action<int> onItemsRemoved = null;
77             if (dataflowBlockOptions.BoundedCapacity > 0)
78             {
79                 Debug.Assert(dataflowBlockOptions.BoundedCapacity > 0, "Positive bounding count expected; should have been verified by options ctor");
80                 onItemsRemoved = OnItemsRemoved;
81                 _boundingState = new BoundingStateWithPostponedAndTask<T>(dataflowBlockOptions.BoundedCapacity);
82             }
83
84             // Initialize the source side
85             _source = new BroadcastingSourceCore<T>(this, cloningFunction, dataflowBlockOptions, onItemsRemoved);
86
87             // It is possible that the source half may fault on its own, e.g. due to a task scheduler exception.
88             // In those cases we need to fault the target half to drop its buffered messages and to release its 
89             // reservations. This should not create an infinite loop, because all our implementations are designed
90             // to handle multiple completion requests and to carry over only one.
91             _source.Completion.ContinueWith((completed, state) =>
92             {
93                 var thisBlock = ((BroadcastBlock<T>)state) as IDataflowBlock;
94                 Debug.Assert(completed.IsFaulted, "The source must be faulted in order to trigger a target completion.");
95                 thisBlock.Fault(completed.Exception);
96             }, this, CancellationToken.None, Common.GetContinuationOptions() | TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default);
97
98             // Handle async cancellation requests by declining on the target
99             Common.WireCancellationToComplete(
100                 dataflowBlockOptions.CancellationToken, _source.Completion, state => ((BroadcastBlock<T>)state).Complete(), this);
101 #if FEATURE_TRACING
102             DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
103             if (etwLog.IsEnabled())
104             {
105                 etwLog.DataflowBlockCreated(this, dataflowBlockOptions);
106             }
107 #endif
108         }
109
110         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
111         public void Complete()
112         {
113             CompleteCore(exception: null, storeExceptionEvenIfAlreadyCompleting: false);
114         }
115
116         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
117         void IDataflowBlock.Fault(Exception exception)
118         {
119             if (exception == null) throw new ArgumentNullException("exception");
120             Contract.EndContractBlock();
121
122             CompleteCore(exception, storeExceptionEvenIfAlreadyCompleting: false);
123         }
124
125         internal void CompleteCore(Exception exception, bool storeExceptionEvenIfAlreadyCompleting, bool revertProcessingState = false)
126         {
127             Contract.Requires(storeExceptionEvenIfAlreadyCompleting || !revertProcessingState,
128                             "Indicating dirty processing state may only come with storeExceptionEvenIfAlreadyCompleting==true.");
129             Contract.EndContractBlock();
130
131             lock (IncomingLock)
132             {
133                 // Faulting from outside is allowed until we start declining permanently.
134                 // Faulting from inside is allowed at any time.
135                 if (exception != null && (!_decliningPermanently || storeExceptionEvenIfAlreadyCompleting))
136                 {
137                     _source.AddException(exception);
138                 }
139
140                 // Revert the dirty processing state if requested
141                 if (revertProcessingState)
142                 {
143                     Debug.Assert(_boundingState != null && _boundingState.TaskForInputProcessing != null,
144                                     "The processing state must be dirty when revertProcessingState==true.");
145                     _boundingState.TaskForInputProcessing = null;
146                 }
147
148                 // Trigger completion if possible
149                 _decliningPermanently = true;
150                 CompleteTargetIfPossible();
151             }
152         }
153
154         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="LinkTo"]/*' />
155         public IDisposable LinkTo(ITargetBlock<T> target, DataflowLinkOptions linkOptions) { return _source.LinkTo(target, linkOptions); }
156
157         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceive"]/*' />
158         public Boolean TryReceive(Predicate<T> filter, out T item) { return _source.TryReceive(filter, out item); }
159
160         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceiveAll"]/*' />
161         Boolean IReceivableSourceBlock<T>.TryReceiveAll(out IList<T> items) { return _source.TryReceiveAll(out items); }
162
163         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
164         public Task Completion { get { return _source.Completion; } }
165
166         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="OfferMessage"]/*' />
167         DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, Boolean consumeToAccept)
168         {
169             // Validate arguments
170             if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, "messageHeader");
171             if (source == null && consumeToAccept) throw new ArgumentException(SR.Argument_CantConsumeFromANullSource, "consumeToAccept");
172             Contract.EndContractBlock();
173
174             lock (IncomingLock)
175             {
176                 // If we've already stopped accepting messages, decline permanently
177                 if (_decliningPermanently)
178                 {
179                     CompleteTargetIfPossible();
180                     return DataflowMessageStatus.DecliningPermanently;
181                 }
182
183                 // We can directly accept the message if:
184                 //      1) we are not bounding, OR 
185                 //      2) we are bounding AND there is room available AND there are no postponed messages AND we are not currently processing. 
186                 // (If there were any postponed messages, we would need to postpone so that ordering would be maintained.)
187                 // (We should also postpone if we are currently processing, because there may be a race between consuming postponed messages and
188                 // accepting new ones directly into the queue.)
189                 if (_boundingState == null
190                         ||
191                     (_boundingState.CountIsLessThanBound && _boundingState.PostponedMessages.Count == 0 && _boundingState.TaskForInputProcessing == null))
192                 {
193                     // Consume the message from the source if necessary
194                     if (consumeToAccept)
195                     {
196                         Debug.Assert(source != null, "We must have thrown if source == null && consumeToAccept == true.");
197
198                         bool consumed;
199                         messageValue = source.ConsumeMessage(messageHeader, this, out consumed);
200                         if (!consumed) return DataflowMessageStatus.NotAvailable;
201                     }
202
203                     // Once consumed, pass it to the delegate
204                     _source.AddMessage(messageValue);
205                     if (_boundingState != null) _boundingState.CurrentCount += 1; // track this new item against our bound
206                     return DataflowMessageStatus.Accepted;
207                 }
208                 // Otherwise, we try to postpone if a source was provided
209                 else if (source != null)
210                 {
211                     Debug.Assert(_boundingState != null && _boundingState.PostponedMessages != null,
212                         "PostponedMessages must have been initialized during construction in bounding mode.");
213
214                     _boundingState.PostponedMessages.Push(source, messageHeader);
215                     return DataflowMessageStatus.Postponed;
216                 }
217                 // We can't do anything else about this message
218                 return DataflowMessageStatus.Declined;
219             }
220         }
221
222         /// <summary>Notifies the block that one or more items was removed from the queue.</summary>
223         /// <param name="numItemsRemoved">The number of items removed.</param>
224         private void OnItemsRemoved(int numItemsRemoved)
225         {
226             Contract.Requires(numItemsRemoved > 0, "Should only be called for a positive number of items removed.");
227             Common.ContractAssertMonitorStatus(IncomingLock, held: false);
228
229             // If we're bounding, we need to know when an item is removed so that we
230             // can update the count that's mirroring the actual count in the source's queue,
231             // and potentially kick off processing to start consuming postponed messages.
232             if (_boundingState != null)
233             {
234                 lock (IncomingLock)
235                 {
236                     // Decrement the count, which mirrors the count in the source half
237                     Debug.Assert(_boundingState.CurrentCount - numItemsRemoved >= 0,
238                         "It should be impossible to have a negative number of items.");
239                     _boundingState.CurrentCount -= numItemsRemoved;
240
241                     ConsumeAsyncIfNecessary();
242                     CompleteTargetIfPossible();
243                 }
244             }
245         }
246
247         /// <summary>Called when postponed messages may need to be consumed.</summary>
248         /// <param name="isReplacementReplica">Whether this call is the continuation of a previous message loop.</param>
249         internal void ConsumeAsyncIfNecessary(bool isReplacementReplica = false)
250         {
251             Common.ContractAssertMonitorStatus(IncomingLock, held: true);
252             Debug.Assert(_boundingState != null, "Must be in bounded mode.");
253
254             if (!_decliningPermanently &&
255                 _boundingState.TaskForInputProcessing == null &&
256                 _boundingState.PostponedMessages.Count > 0 &&
257                 _boundingState.CountIsLessThanBound)
258             {
259                 // Create task and store into _taskForInputProcessing prior to scheduling the task
260                 // so that _taskForInputProcessing will be visibly set in the task loop.
261                 _boundingState.TaskForInputProcessing =
262                     new Task(state => ((BroadcastBlock<T>)state).ConsumeMessagesLoopCore(), this,
263                         Common.GetCreationOptionsForTask(isReplacementReplica));
264
265 #if FEATURE_TRACING
266                 DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
267                 if (etwLog.IsEnabled())
268                 {
269                     etwLog.TaskLaunchedForMessageHandling(
270                         this, _boundingState.TaskForInputProcessing, DataflowEtwProvider.TaskLaunchedReason.ProcessingInputMessages,
271                         _boundingState.PostponedMessages.Count);
272                 }
273 #endif
274
275                 // Start the task handling scheduling exceptions
276                 Exception exception = Common.StartTaskSafe(_boundingState.TaskForInputProcessing, _source.DataflowBlockOptions.TaskScheduler);
277                 if (exception != null)
278                 {
279                     // Get out from under currently held locks. Complete re-acquires the locks it needs.
280                     Task.Factory.StartNew(exc => CompleteCore(exception: (Exception)exc, storeExceptionEvenIfAlreadyCompleting: true, revertProcessingState: true),
281                                         exception, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
282                 }
283             }
284         }
285
286         /// <summary>Task body used to consume postponed messages.</summary>
287         [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
288         private void ConsumeMessagesLoopCore()
289         {
290             Contract.Requires(_boundingState != null && _boundingState.TaskForInputProcessing != null,
291                 "May only be called in bounded mode and when a task is in flight.");
292             Debug.Assert(_boundingState.TaskForInputProcessing.Id == Task.CurrentId,
293                 "This must only be called from the in-flight processing task.");
294             Common.ContractAssertMonitorStatus(IncomingLock, held: false);
295
296             try
297             {
298                 int maxMessagesPerTask = _source.DataflowBlockOptions.ActualMaxMessagesPerTask;
299                 for (int i = 0;
300                     i < maxMessagesPerTask && ConsumeAndStoreOneMessageIfAvailable();
301                     i++)
302                     ;
303             }
304             catch (Exception exception)
305             {
306                 // Prevent the creation of new processing tasks
307                 CompleteCore(exception, storeExceptionEvenIfAlreadyCompleting: true);
308             }
309             finally
310             {
311                 lock (IncomingLock)
312                 {
313                     // We're no longer processing, so null out the processing task
314                     _boundingState.TaskForInputProcessing = null;
315
316                     // However, we may have given up early because we hit our own configured
317                     // processing limits rather than because we ran out of work to do.  If that's
318                     // the case, make sure we spin up another task to keep going.
319                     ConsumeAsyncIfNecessary(isReplacementReplica: true);
320
321                     // If, however, we stopped because we ran out of work to do and we
322                     // know we'll never get more, then complete.
323                     CompleteTargetIfPossible();
324                 }
325             }
326         }
327
328         /// <summary>
329         /// Retrieves one postponed message if there's room and if we can consume a postponed message.
330         /// Stores any consumed message into the source half.
331         /// </summary>
332         /// <returns>true if a message could be consumed and stored; otherwise, false.</returns>
333         /// <remarks>This must only be called from the asynchronous processing loop.</remarks>
334         private bool ConsumeAndStoreOneMessageIfAvailable()
335         {
336             Contract.Requires(_boundingState != null && _boundingState.TaskForInputProcessing != null,
337                 "May only be called in bounded mode and when a task is in flight.");
338             Debug.Assert(_boundingState.TaskForInputProcessing.Id == Task.CurrentId,
339                 "This must only be called from the in-flight processing task.");
340             Common.ContractAssertMonitorStatus(IncomingLock, held: false);
341
342             // Loop through the postponed messages until we get one.
343             while (true)
344             {
345                 // Get the next item to retrieve.  If there are no more, bail.
346                 KeyValuePair<ISourceBlock<T>, DataflowMessageHeader> sourceAndMessage;
347                 lock (IncomingLock)
348                 {
349                     if (!_boundingState.CountIsLessThanBound) return false;
350                     if (!_boundingState.PostponedMessages.TryPop(out sourceAndMessage)) return false;
351
352                     // Optimistically assume we're going to get the item. This avoids taking the lock
353                     // again if we're right.  If we're wrong, we decrement it later under lock.
354                     _boundingState.CurrentCount++;
355                 }
356
357                 // Consume the item
358                 bool consumed = false;
359                 try
360                 {
361                     T consumedValue = sourceAndMessage.Key.ConsumeMessage(sourceAndMessage.Value, this, out consumed);
362                     if (consumed)
363                     {
364                         _source.AddMessage(consumedValue);
365                         return true;
366                     }
367                 }
368                 finally
369                 {
370                     // We didn't get the item, so decrement the count to counteract our optimistic assumption.
371                     if (!consumed)
372                     {
373                         lock (IncomingLock) _boundingState.CurrentCount--;
374                     }
375                 }
376             }
377         }
378
379         /// <summary>Completes the target, notifying the source, once all completion conditions are met.</summary>
380         private void CompleteTargetIfPossible()
381         {
382             Common.ContractAssertMonitorStatus(IncomingLock, held: true);
383             if (_decliningPermanently &&
384                 !_completionReserved &&
385                 (_boundingState == null || _boundingState.TaskForInputProcessing == null))
386             {
387                 _completionReserved = true;
388
389                 // If we're in bounding mode and we have any postponed messages, we need to clear them,
390                 // which means calling back to the source, which means we need to escape the incoming lock.
391                 if (_boundingState != null && _boundingState.PostponedMessages.Count > 0)
392                 {
393                     Task.Factory.StartNew(state =>
394                     {
395                         var thisBroadcastBlock = (BroadcastBlock<T>)state;
396
397                         // Release any postponed messages
398                         List<Exception> exceptions = null;
399                         if (thisBroadcastBlock._boundingState != null)
400                         {
401                             // Note: No locks should be held at this point
402                             Common.ReleaseAllPostponedMessages(thisBroadcastBlock,
403                                                                thisBroadcastBlock._boundingState.PostponedMessages,
404                                                                ref exceptions);
405                         }
406
407                         if (exceptions != null)
408                         {
409                             // It is important to migrate these exceptions to the source part of the owning batch,
410                             // because that is the completion task that is publically exposed.
411                             thisBroadcastBlock._source.AddExceptions(exceptions);
412                         }
413
414                         thisBroadcastBlock._source.Complete();
415                     }, this, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
416                 }
417                 // Otherwise, we can just decline the source directly.
418                 else
419                 {
420                     _source.Complete();
421                 }
422             }
423         }
424
425         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ConsumeMessage"]/*' />
426         T ISourceBlock<T>.ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target, out Boolean messageConsumed)
427         {
428             return _source.ConsumeMessage(messageHeader, target, out messageConsumed);
429         }
430
431         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReserveMessage"]/*' />
432         bool ISourceBlock<T>.ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
433         {
434             return _source.ReserveMessage(messageHeader, target);
435         }
436
437         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReleaseReservation"]/*' />
438         void ISourceBlock<T>.ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
439         {
440             _source.ReleaseReservation(messageHeader, target);
441         }
442
443         /// <summary>Gets a value to be used for the DebuggerDisplayAttribute.  This must not throw even if HasValue is false.</summary>
444         private bool HasValueForDebugger { get { return _source.GetDebuggingInformation().HasValue; } }
445         /// <summary>Gets a value to be used for the DebuggerDisplayAttribute.  This must not throw even if HasValue is false.</summary>
446         private T ValueForDebugger { get { return _source.GetDebuggingInformation().Value; } }
447
448         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="ToString"]/*' />
449         public override string ToString() { return Common.GetNameForDebugger(this, _source.DataflowBlockOptions); }
450
451         /// <summary>The data to display in the debugger display attribute.</summary>
452         [SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
453         private object DebuggerDisplayContent
454         {
455             get
456             {
457                 return string.Format("{0}, HasValue={1}, Value={2}",
458                     Common.GetNameForDebugger(this, _source.DataflowBlockOptions),
459                     HasValueForDebugger,
460                     ValueForDebugger);
461             }
462         }
463         /// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
464         object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } }
465
466         /// <summary>Provides a debugger type proxy for the BroadcastBlock.</summary>
467         private sealed class DebugView
468         {
469             /// <summary>The BroadcastBlock being debugged.</summary>
470             private readonly BroadcastBlock<T> _broadcastBlock;
471             /// <summary>Debug info about the source side of the broadcast.</summary>
472             private readonly BroadcastingSourceCore<T>.DebuggingInformation _sourceDebuggingInformation;
473
474             /// <summary>Initializes the debug view.</summary>
475             /// <param name="broadcastBlock">The BroadcastBlock being debugged.</param>
476             public DebugView(BroadcastBlock<T> broadcastBlock)
477             {
478                 Contract.Requires(broadcastBlock != null, "Need a block with which to construct the debug view.");
479                 _broadcastBlock = broadcastBlock;
480                 _sourceDebuggingInformation = broadcastBlock._source.GetDebuggingInformation();
481             }
482
483             /// <summary>Gets the messages waiting to be processed.</summary>
484             public IEnumerable<T> InputQueue { get { return _sourceDebuggingInformation.InputQueue; } }
485             /// <summary>Gets whether the broadcast has a current value.</summary>
486             public bool HasValue { get { return _broadcastBlock.HasValueForDebugger; } }
487             /// <summary>Gets the broadcast's current value.</summary>
488             public T Value { get { return _broadcastBlock.ValueForDebugger; } }
489
490             /// <summary>Gets the task being used for output processing.</summary>
491             public Task TaskForOutputProcessing { get { return _sourceDebuggingInformation.TaskForOutputProcessing; } }
492
493             /// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
494             public DataflowBlockOptions DataflowBlockOptions { get { return _sourceDebuggingInformation.DataflowBlockOptions; } }
495             /// <summary>Gets whether the block is declining further messages.</summary>
496             public bool IsDecliningPermanently { get { return _broadcastBlock._decliningPermanently; } }
497             /// <summary>Gets whether the block is completed.</summary>
498             public bool IsCompleted { get { return _sourceDebuggingInformation.IsCompleted; } }
499             /// <summary>Gets the block's Id.</summary>
500             public int Id { get { return Common.GetBlockId(_broadcastBlock); } }
501
502             /// <summary>Gets the set of all targets linked from this block.</summary>
503             public TargetRegistry<T> LinkedTargets { get { return _sourceDebuggingInformation.LinkedTargets; } }
504             /// <summary>Gets the set of all targets linked from this block.</summary>
505             public ITargetBlock<T> NextMessageReservedFor { get { return _sourceDebuggingInformation.NextMessageReservedFor; } }
506         }
507
508         /// <summary>Provides a core implementation for blocks that implement <see cref="ISourceBlock{TOutput}"/>.</summary>
509         /// <typeparam name="TOutput">Specifies the type of data supplied by the <see cref="SourceCore{TOutput}"/>.</typeparam>
510         [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")]
511         [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
512         private sealed class BroadcastingSourceCore<TOutput>
513         {
514             /// <summary>A registry used to store all linked targets and information about them.</summary>
515             private readonly TargetRegistry<TOutput> _targetRegistry;
516             /// <summary>All of the output messages queued up to be received by consumers/targets.</summary>
517             private readonly Queue<TOutput> _messages = new Queue<TOutput>();
518             /// <summary>A TaskCompletionSource that represents the completion of this block.</summary>
519             private readonly TaskCompletionSource<VoidResult> _completionTask = new TaskCompletionSource<VoidResult>();
520             /// <summary>
521             /// An action to be invoked on the owner block when an item is removed.
522             /// This may be null if the owner block doesn't need to be notified.
523             /// </summary>
524             private readonly Action<int> _itemsRemovedAction;
525
526             /// <summary>Gets the object to use as the outgoing lock.</summary>
527             private object OutgoingLock { get { return _completionTask; } }
528             /// <summary>Gets the object to use as the value lock.</summary>
529             private object ValueLock { get { return _targetRegistry; } }
530
531             /// <summary>The source utilize this helper.</summary>
532             private readonly BroadcastBlock<TOutput> _owningSource;
533             /// <summary>The options used to configure this block's execution.</summary>
534             private readonly DataflowBlockOptions _dataflowBlockOptions;
535             /// <summary>The cloning function to use.</summary>
536             private readonly Func<TOutput, TOutput> _cloningFunction;
537
538             /// <summary>An indicator whether _currentMessage has a value.</summary>
539             private bool _currentMessageIsValid;
540             /// <summary>The message currently being broadcast.</summary>
541             private TOutput _currentMessage;
542             /// <summary>The target that the next message is reserved for, or null if nothing is reserved.</summary>
543             private ITargetBlock<TOutput> _nextMessageReservedFor;
544             /// <summary>Whether this block should again attempt to offer messages to targets.</summary>
545             private bool _enableOffering;
546             /// <summary>Whether all future messages should be declined.</summary>
547             private bool _decliningPermanently;
548             /// <summary>The task used to process the output and offer it to targets.</summary>
549             private Task _taskForOutputProcessing;
550             /// <summary>Exceptions that may have occurred and gone unhandled during processing.</summary>
551             private List<Exception> _exceptions;
552             /// <summary>Counter for message IDs unique within this source block.</summary>
553             private long _nextMessageId = 1; // We are going to use this value before incrementing.
554             /// <summary>Whether someone has reserved the right to call CompleteBlockOncePossible.</summary>
555             private bool _completionReserved;
556
557             /// <summary>Initializes the source core.</summary>
558             /// <param name="owningSource">The source utilizing this core.</param>
559             /// <param name="cloningFunction">The function to use to clone the data when offered to other blocks.  May be null.</param>
560             /// <param name="dataflowBlockOptions">The options to use to configure the block.</param>
561             /// <param name="itemsRemovedAction">Action to invoke when an item is removed.</param>
562             internal BroadcastingSourceCore(
563                 BroadcastBlock<TOutput> owningSource,
564                 Func<TOutput, TOutput> cloningFunction,
565                 DataflowBlockOptions dataflowBlockOptions,
566                 Action<int> itemsRemovedAction)
567             {
568                 Contract.Requires(owningSource != null, "Must be associated with a broadcast block.");
569                 Contract.Requires(dataflowBlockOptions != null, "Options are required to configure this block.");
570
571                 // Store the arguments
572                 _owningSource = owningSource;
573                 _cloningFunction = cloningFunction;
574                 _dataflowBlockOptions = dataflowBlockOptions;
575                 _itemsRemovedAction = itemsRemovedAction;
576
577                 // Construct members that depend on the arguments
578                 _targetRegistry = new TargetRegistry<TOutput>(_owningSource);
579             }
580
581             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceive"]/*' />
582             internal Boolean TryReceive(Predicate<TOutput> filter, out TOutput item)
583             {
584                 // Take the lock only long enough to get the message,
585                 // synchronizing with other activities on the block.
586                 // We don't want to execute the user-provided cloning delegate
587                 // while holding the lock.
588                 TOutput message;
589                 bool isValid;
590                 lock (OutgoingLock)
591                 {
592                     lock (ValueLock)
593                     {
594                         message = _currentMessage;
595                         isValid = _currentMessageIsValid;
596                     }
597                 }
598
599                 // Clone and hand back a message if we have one and if it passes the filter.
600                 // (A null filter means all messages pass.)
601                 if (isValid && (filter == null || filter(message)))
602                 {
603                     item = CloneItem(message);
604                     return true;
605                 }
606                 else
607                 {
608                     item = default(TOutput);
609                     return false;
610                 }
611             }
612
613             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceiveAll"]/*' />
614             internal Boolean TryReceiveAll(out IList<TOutput> items)
615             {
616                 // Try to receive the one item this block may have.
617                 // If we can, give back an array of one item. Otherwise, give back null.
618                 TOutput item;
619                 if (TryReceive(null, out item))
620                 {
621                     items = new TOutput[] { item };
622                     return true;
623                 }
624                 else
625                 {
626                     items = null;
627                     return false;
628                 }
629             }
630
631             /// <summary>Adds a message to the source block for propagation.</summary>
632             /// <param name="item">The item to be wrapped in a message to be added.</param>
633             internal void AddMessage(TOutput item)
634             {
635                 // This method must not take the outgoing lock, as it will be called in situations
636                 // where a derived type's incoming lock is held.  The lock leveling structure
637                 // we're employing is such that outgoing may be held while acquiring incoming, but
638                 // of course not the other way around.  This is the reason why DataflowSourceBlock
639                 // needs ValueLock as well.  Otherwise, it would be pure overhead.
640                 lock (ValueLock)
641                 {
642                     if (_decliningPermanently) return;
643                     _messages.Enqueue(item);
644                     if (_messages.Count == 1) _enableOffering = true;
645                     OfferAsyncIfNecessary();
646                 }
647             }
648
649             /// <summary>Informs the block that it will not be receiving additional messages.</summary>
650             internal void Complete()
651             {
652                 lock (ValueLock)
653                 {
654                     _decliningPermanently = true;
655
656                     // Complete may be called in a context where an incoming lock is held.  We need to 
657                     // call CompleteBlockIfPossible, but we can't do so if the incoming lock is held.
658                     // However, now that _decliningPermanently has been set, the timing of
659                     // CompleteBlockIfPossible doesn't matter, so we schedule it to run asynchronously
660                     // and take the necessary locks in a situation where we're sure it won't cause a problem.
661                     Task.Factory.StartNew(state =>
662                     {
663                         var thisSourceCore = (BroadcastingSourceCore<TOutput>)state;
664                         lock (thisSourceCore.OutgoingLock)
665                         {
666                             lock (thisSourceCore.ValueLock)
667                             {
668                                 thisSourceCore.CompleteBlockIfPossible();
669                             }
670                         }
671                     }, this, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
672                 }
673             }
674
675             /// <summary>Clones the item.</summary>
676             /// <param name="item">The item to clone.</param>
677             /// <returns>The cloned item.</returns>
678             private TOutput CloneItem(TOutput item)
679             {
680                 return _cloningFunction != null ?
681                     _cloningFunction(item) :
682                     item;
683             }
684
685             /// <summary>Offers the current message to a specific target.</summary>
686             /// <param name="target">The target to which to offer the current message.</param>
687             private void OfferCurrentMessageToNewTarget(ITargetBlock<TOutput> target)
688             {
689                 Contract.Requires(target != null, "Target required to offer messages to.");
690                 Common.ContractAssertMonitorStatus(OutgoingLock, held: true);
691                 Common.ContractAssertMonitorStatus(ValueLock, held: false);
692
693                 // Get the current message if there is one
694                 TOutput currentMessage;
695                 bool isValid;
696                 lock (ValueLock)
697                 {
698                     currentMessage = _currentMessage;
699                     isValid = _currentMessageIsValid;
700                 }
701
702                 // If there is no valid message yet, there is nothing to offer
703                 if (!isValid) return;
704
705                 // Offer it to the target.
706                 // We must not increment the message ID here. We only do that when we populate _currentMessage, i.e. when we dequeue.
707                 bool useCloning = _cloningFunction != null;
708                 DataflowMessageStatus result = target.OfferMessage(new DataflowMessageHeader(_nextMessageId), currentMessage, _owningSource, consumeToAccept: useCloning);
709
710                 // If accepted and the target was linked as "unlinkAfterOne", remove it
711                 if (result == DataflowMessageStatus.Accepted)
712                 {
713                     if (!useCloning)
714                     {
715                         // If accepted and the target was linked as "once", mark it for removal.
716                         // If we were forcing consumption, this removal would have already
717                         // happened in ConsumeMessage.
718                         _targetRegistry.Remove(target, onlyIfReachedMaxMessages: true);
719                     }
720                 }
721                 // If declined permanently, remove it
722                 else if (result == DataflowMessageStatus.DecliningPermanently)
723                 {
724                     _targetRegistry.Remove(target);
725                 }
726                 else Debug.Assert(result != DataflowMessageStatus.NotAvailable, "Messages from a Broadcast should never be missed.");
727             }
728
729             /// <summary>Offers messages to targets.</summary>
730             private bool OfferToTargets()
731             {
732                 Common.ContractAssertMonitorStatus(OutgoingLock, held: true);
733                 Common.ContractAssertMonitorStatus(ValueLock, held: false);
734
735                 DataflowMessageHeader header = default(DataflowMessageHeader);
736                 TOutput message = default(TOutput);
737                 int numDequeuedMessages = 0;
738                 lock (ValueLock)
739                 {
740                     // If there's a reservation or there aren't any more messages,
741                     // there's nothing for us to do.  If there's no reservation
742                     // and a message is available, dequeue the next one and store it
743                     // as the new current.  If we're now at 0 message, disable further
744                     // propagation until more messages arrive.
745                     if (_nextMessageReservedFor == null && _messages.Count > 0)
746                     {
747                         // If there  are no targets registered, we might as well empty out the broadcast,
748                         // keeping just the last.  Otherwise, it'll happen anyway, but much more expensively.
749                         if (_targetRegistry.FirstTargetNode == null)
750                         {
751                             while (_messages.Count > 1)
752                             {
753                                 _messages.Dequeue();
754                                 numDequeuedMessages++;
755                             }
756                         }
757
758                         // Get the next message to offer
759                         Debug.Assert(_messages.Count > 0, "There must be at least one message to dequeue.");
760                         _currentMessage = message = _messages.Dequeue();
761                         numDequeuedMessages++;
762                         _currentMessageIsValid = true;
763                         header = new DataflowMessageHeader(++_nextMessageId);
764                         if (_messages.Count == 0) _enableOffering = false;
765                     }
766                     else
767                     {
768                         _enableOffering = false;
769                         return false;
770                     }
771                 } // must not hold ValueLock when calling out to targets
772
773                 // Offer the message
774                 if (header.IsValid)
775                 {
776                     // Notify the owner block that our count has decreased
777                     if (_itemsRemovedAction != null) _itemsRemovedAction(numDequeuedMessages);
778
779                     // Offer it to each target, unless a soleTarget was provided, which case just offer it to that one.
780                     TargetRegistry<TOutput>.LinkedTargetInfo cur = _targetRegistry.FirstTargetNode;
781                     while (cur != null)
782                     {
783                         // Note that during OfferMessage, a target may call ConsumeMessage, which may unlink the target
784                         // if the target is registered as "once".  Doing so will remove the target from the targets list.
785                         // As such, we avoid using an enumerator over _targetRegistry and instead walk from back to front,
786                         // so that if an element is removed, it won't affect the rest of our walk.
787                         TargetRegistry<TOutput>.LinkedTargetInfo next = cur.Next;
788                         ITargetBlock<TOutput> target = cur.Target;
789                         OfferMessageToTarget(header, message, target);
790                         cur = next;
791                     }
792                 }
793                 return true;
794             }
795
796             /// <summary>Offers the specified message to the specified target.</summary>
797             /// <param name="header">The header of the message to offer.</param>
798             /// <param name="message">The message to offer.</param>
799             /// <param name="target">The target to which the message should be offered.</param>
800             /// <remarks>
801             /// This will remove the target from the target registry if the result of the propagation demands it.
802             /// </remarks>
803             private void OfferMessageToTarget(DataflowMessageHeader header, TOutput message, ITargetBlock<TOutput> target)
804             {
805                 Common.ContractAssertMonitorStatus(OutgoingLock, held: true);
806                 Common.ContractAssertMonitorStatus(ValueLock, held: false);
807
808                 // Offer the message.  If there's a cloning function, we force the target to
809                 // come back to us to consume the message, allowing us the opportunity to run
810                 // the cloning function once we know they want the data.  If there is no cloning
811                 // function, there's no reason for them to call back here.
812                 bool useCloning = _cloningFunction != null;
813                 switch (target.OfferMessage(header, message, _owningSource, consumeToAccept: useCloning))
814                 {
815                     case DataflowMessageStatus.Accepted:
816                         if (!useCloning)
817                         {
818                             // If accepted and the target was linked as "once", mark it for removal.
819                             // If we were forcing consumption, this removal would have already
820                             // happened in ConsumeMessage.
821                             _targetRegistry.Remove(target, onlyIfReachedMaxMessages: true);
822                         }
823                         break;
824
825                     case DataflowMessageStatus.DecliningPermanently:
826                         // If declined permanently, mark the target for removal
827                         _targetRegistry.Remove(target);
828                         break;
829
830                     case DataflowMessageStatus.NotAvailable:
831                         Debug.Assert(false, "Messages from a Broadcast should never be missed.");
832                         break;
833                         // No action required for Postponed or Declined
834                 }
835             }
836
837             /// <summary>Called when we want to enable asynchronously offering message to targets.</summary>
838             /// <param name="isReplacementReplica">Whether this call is the continuation of a previous message loop.</param>
839             private void OfferAsyncIfNecessary(bool isReplacementReplica = false)
840             {
841                 Common.ContractAssertMonitorStatus(ValueLock, held: true);
842                 // This method must not take the OutgoingLock.
843
844                 bool currentlyProcessing = _taskForOutputProcessing != null;
845                 bool processingToDo = _enableOffering && _messages.Count > 0;
846
847                 // If there's any work to be done...
848                 if (!currentlyProcessing && processingToDo && !CanceledOrFaulted)
849                 {
850                     // Create task and store into _taskForOutputProcessing prior to scheduling the task
851                     // so that _taskForOutputProcessing will be visibly set in the task loop.
852                     _taskForOutputProcessing = new Task(thisSourceCore => ((BroadcastingSourceCore<TOutput>)thisSourceCore).OfferMessagesLoopCore(), this,
853                                                         Common.GetCreationOptionsForTask(isReplacementReplica));
854
855 #if FEATURE_TRACING
856                     DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
857                     if (etwLog.IsEnabled())
858                     {
859                         etwLog.TaskLaunchedForMessageHandling(
860                             _owningSource, _taskForOutputProcessing, DataflowEtwProvider.TaskLaunchedReason.OfferingOutputMessages, _messages.Count);
861                     }
862 #endif
863
864                     // Start the task handling scheduling exceptions
865                     Exception exception = Common.StartTaskSafe(_taskForOutputProcessing, _dataflowBlockOptions.TaskScheduler);
866                     if (exception != null)
867                     {
868                         // First, log the exception while the processing state is dirty which is preventing the block from completing.
869                         // Then revert the proactive processing state changes.
870                         // And last, try to complete the block.
871                         AddException(exception);
872                         _decliningPermanently = true;
873                         _taskForOutputProcessing = null;
874
875                         // Get out from under currently held locks - ValueLock is taken, but OutgoingLock may not be.
876                         // Re-take the locks on a separate thread.
877                         Task.Factory.StartNew(state =>
878                         {
879                             var thisSourceCore = (BroadcastingSourceCore<TOutput>)state;
880                             lock (thisSourceCore.OutgoingLock)
881                             {
882                                 lock (thisSourceCore.ValueLock)
883                                 {
884                                     thisSourceCore.CompleteBlockIfPossible();
885                                 }
886                             }
887                         }, this, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
888                     }
889                 }
890             }
891
892             /// <summary>Task body used to process messages.</summary>
893             [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
894             private void OfferMessagesLoopCore()
895             {
896                 try
897                 {
898                     int maxMessagesPerTask = _dataflowBlockOptions.ActualMaxMessagesPerTask;
899                     lock (OutgoingLock)
900                     {
901                         // Offer as many messages as we can
902                         for (int counter = 0;
903                             counter < maxMessagesPerTask && !CanceledOrFaulted;
904                             counter++)
905                         {
906                             if (!OfferToTargets()) break;
907                         }
908                     }
909                 }
910                 catch (Exception exception)
911                 {
912                     _owningSource.CompleteCore(exception, storeExceptionEvenIfAlreadyCompleting: true);
913                 }
914                 finally
915                 {
916                     lock (OutgoingLock)
917                     {
918                         lock (ValueLock)
919                         {
920                             // We're no longer processing, so null out the processing task
921                             _taskForOutputProcessing = null;
922
923                             // However, we may have given up early because we hit our own configured
924                             // processing limits rather than because we ran out of work to do.  If that's
925                             // the case, make sure we spin up another task to keep going.
926                             OfferAsyncIfNecessary(isReplacementReplica: true);
927
928                             // If, however, we stopped because we ran out of work to do and we
929                             // know we'll never get more, then complete.
930                             CompleteBlockIfPossible();
931                         }
932                     }
933                 }
934             }
935
936             /// <summary>Completes the block's processing if there's nothing left to do and never will be.</summary>
937             private void CompleteBlockIfPossible()
938             {
939                 Common.ContractAssertMonitorStatus(OutgoingLock, held: true);
940                 Common.ContractAssertMonitorStatus(ValueLock, held: true);
941
942                 if (!_completionReserved)
943                 {
944                     bool currentlyProcessing = _taskForOutputProcessing != null;
945                     bool noMoreMessages = _decliningPermanently && _messages.Count == 0;
946
947                     // Are we done forever?
948                     bool complete = !currentlyProcessing && (noMoreMessages || CanceledOrFaulted);
949                     if (complete)
950                     {
951                         CompleteBlockIfPossible_Slow();
952                     }
953                 }
954             }
955
956             /// <summary>
957             /// Slow path for CompleteBlockIfPossible. 
958             /// Separating out the slow path into its own method makes it more likely that the fast path method will get inlined.
959             /// </summary>
960             private void CompleteBlockIfPossible_Slow()
961             {
962                 Contract.Requires(_taskForOutputProcessing == null, "There must be no processing tasks.");
963                 Contract.Requires(
964                     (_decliningPermanently && _messages.Count == 0) || CanceledOrFaulted,
965                     "There must be no more messages or the block must be canceled or faulted.");
966
967                 _completionReserved = true;
968
969                 // Run asynchronously to get out of the currently held locks
970                 Task.Factory.StartNew(thisSourceCore => ((BroadcastingSourceCore<TOutput>)thisSourceCore).CompleteBlockOncePossible(),
971                     this, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
972             }
973
974             /// <summary>
975             /// Completes the block.  This must only be called once, and only once all of the completion conditions are met.
976             /// As such, it must only be called from CompleteBlockIfPossible.
977             /// </summary>
978             private void CompleteBlockOncePossible()
979             {
980                 TargetRegistry<TOutput>.LinkedTargetInfo linkedTargets;
981                 List<Exception> exceptions;
982
983                 // Clear out the target registry and buffers to help avoid memory leaks.
984                 // We do not clear _currentMessage, which should remain as that message forever.
985                 lock (OutgoingLock)
986                 {
987                     // Save the linked list of targets so that it could be traversed later to propagate completion
988                     linkedTargets = _targetRegistry.ClearEntryPoints();
989                     lock (ValueLock)
990                     {
991                         _messages.Clear();
992
993                         // Save a local reference to the exceptions list and null out the field,
994                         // so that if the target side tries to add an exception this late,
995                         // it will go to a separate list (that will be ignored.)
996                         exceptions = _exceptions;
997                         _exceptions = null;
998                     }
999                 }
1000
1001                 // If it's due to an exception, finish in a faulted state
1002                 if (exceptions != null)
1003                 {
1004                     _completionTask.TrySetException(exceptions);
1005                 }
1006                 // It's due to cancellation, finish in a canceled state
1007                 else if (_dataflowBlockOptions.CancellationToken.IsCancellationRequested)
1008                 {
1009                     _completionTask.TrySetCanceled();
1010                 }
1011                 // Otherwise, finish in a successful state.
1012                 else
1013                 {
1014                     _completionTask.TrySetResult(default(VoidResult));
1015                 }
1016
1017                 // Now that the completion task is completed, we may propagate completion to the linked targets
1018                 _targetRegistry.PropagateCompletion(linkedTargets);
1019 #if FEATURE_TRACING
1020                 DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
1021                 if (etwLog.IsEnabled())
1022                 {
1023                     etwLog.DataflowBlockCompleted(_owningSource);
1024                 }
1025 #endif
1026             }
1027
1028             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="LinkTo"]/*' />
1029             [SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")]
1030             internal IDisposable LinkTo(ITargetBlock<TOutput> target, DataflowLinkOptions linkOptions)
1031             {
1032                 // Validate arguments
1033                 if (target == null) throw new ArgumentNullException("target");
1034                 if (linkOptions == null) throw new ArgumentNullException("linkOptions");
1035                 Contract.EndContractBlock();
1036
1037                 lock (OutgoingLock)
1038                 {
1039                     // If we've completed or completion has at least started, offer the message to this target,
1040                     // and propagate completion if that was requested.
1041                     // Then there's nothing more to be done.
1042                     if (_completionReserved)
1043                     {
1044                         OfferCurrentMessageToNewTarget(target);
1045                         if (linkOptions.PropagateCompletion) Common.PropagateCompletionOnceCompleted(_completionTask.Task, target);
1046                         return Disposables.Nop;
1047                     }
1048
1049                     // Otherwise, add the target and then offer it the current
1050                     // message.  We do this in this order because offering may
1051                     // cause the target to be removed if it's unlinkAfterOne,
1052                     // and in the reverse order we would end up adding the target
1053                     // after it was "removed".
1054                     _targetRegistry.Add(ref target, linkOptions);
1055                     OfferCurrentMessageToNewTarget(target);
1056                     return Common.CreateUnlinker(OutgoingLock, _targetRegistry, target);
1057                 }
1058             }
1059
1060             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ConsumeMessage"]/*' />
1061             internal TOutput ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target, out Boolean messageConsumed)
1062             {
1063                 // Validate arguments
1064                 if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, "messageHeader");
1065                 if (target == null) throw new ArgumentNullException("target");
1066                 Contract.EndContractBlock();
1067
1068                 TOutput valueToClone;
1069                 lock (OutgoingLock) // We may currently be calling out under this lock to the target; requires it to be reentrant
1070                 {
1071                     lock (ValueLock)
1072                     {
1073                         // If this isn't the next message to be served up, bail
1074                         if (messageHeader.Id != _nextMessageId)
1075                         {
1076                             messageConsumed = false;
1077                             return default(TOutput);
1078                         }
1079
1080                         // If the caller has the reservation, release the reservation.
1081                         // We still allow others to take the message if there's a reservation.
1082                         if (_nextMessageReservedFor == target)
1083                         {
1084                             _nextMessageReservedFor = null;
1085                             _enableOffering = true;
1086                         }
1087                         _targetRegistry.Remove(target, onlyIfReachedMaxMessages: true);
1088
1089                         OfferAsyncIfNecessary();
1090                         CompleteBlockIfPossible();
1091
1092                         // Return a clone of the consumed message.
1093                         valueToClone = _currentMessage;
1094                     }
1095                 }
1096
1097                 messageConsumed = true;
1098                 return CloneItem(valueToClone);
1099             }
1100
1101             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReserveMessage"]/*' />
1102             internal Boolean ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
1103             {
1104                 // Validate arguments
1105                 if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, "messageHeader");
1106                 if (target == null) throw new ArgumentNullException("target");
1107                 Contract.EndContractBlock();
1108
1109                 lock (OutgoingLock)
1110                 {
1111                     // If no one currently holds a reservation...
1112                     if (_nextMessageReservedFor == null)
1113                     {
1114                         lock (ValueLock)
1115                         {
1116                             // ...and the requested message is next in line, allow it
1117                             if (messageHeader.Id == _nextMessageId)
1118                             {
1119                                 _nextMessageReservedFor = target;
1120                                 _enableOffering = false;
1121                                 return true;
1122                             }
1123                         }
1124                     }
1125                 }
1126                 return false;
1127             }
1128
1129             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReleaseReservation"]/*' />
1130             internal void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
1131             {
1132                 // Validate arguments
1133                 if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, "messageHeader");
1134                 if (target == null) throw new ArgumentNullException("target");
1135                 Contract.EndContractBlock();
1136
1137                 lock (OutgoingLock)
1138                 {
1139                     // If someone else holds the reservation, bail.
1140                     if (_nextMessageReservedFor != target) throw new InvalidOperationException(SR.InvalidOperation_MessageNotReservedByTarget);
1141
1142                     TOutput messageToReoffer;
1143                     lock (ValueLock)
1144                     {
1145                         // If this is not the message at the head of the queue, bail
1146                         if (messageHeader.Id != _nextMessageId) throw new InvalidOperationException(SR.InvalidOperation_MessageNotReservedByTarget);
1147
1148                         // Otherwise, release the reservation, and reoffer the message to all targets.
1149                         _nextMessageReservedFor = null;
1150                         _enableOffering = true;
1151                         messageToReoffer = _currentMessage;
1152                         OfferAsyncIfNecessary();
1153                     }
1154
1155                     // We need to explicitly reoffer this message to the releaser,
1156                     // as otherwise if the target has join behavior it could end up waiting for an offer from
1157                     // this broadcast forever, even though data is in fact available.  We could only
1158                     // do this if _messages.Count == 0, as if it's > 0 the message will get overwritten
1159                     // as part of the asynchronous offering, but for consistency we should always reoffer
1160                     // the current message.
1161                     OfferMessageToTarget(messageHeader, messageToReoffer, target);
1162                 }
1163             }
1164
1165             /// <summary>Gets whether the source has had cancellation requested or an exception has occurred.</summary>
1166             private bool CanceledOrFaulted
1167             {
1168                 get
1169                 {
1170                     // Cancellation is honored as soon as the CancellationToken has been signaled.
1171                     // Faulting is honored after an exception has been encountered and the owning block
1172                     // has invoked Complete on us.
1173                     return _dataflowBlockOptions.CancellationToken.IsCancellationRequested ||
1174                         (Volatile.Read(ref _exceptions) != null && _decliningPermanently);
1175                 }
1176             }
1177
1178             /// <summary>Adds an individual exceptionto this source.</summary>
1179             /// <param name="exception">The exception to add</param>
1180             internal void AddException(Exception exception)
1181             {
1182                 Contract.Requires(exception != null, "An exception to add is required.");
1183                 Contract.Requires(!Completion.IsCompleted || Completion.IsFaulted, "The block must either not be completed or be faulted if we're still storing exceptions.");
1184                 lock (ValueLock)
1185                 {
1186                     Common.AddException(ref _exceptions, exception);
1187                 }
1188             }
1189
1190             /// <summary>Adds exceptions to this source.</summary>
1191             /// <param name="exceptions">The exceptions to add</param>
1192             internal void AddExceptions(List<Exception> exceptions)
1193             {
1194                 Contract.Requires(exceptions != null, "A list of exceptions to add is required.");
1195                 Contract.Requires(!Completion.IsCompleted || Completion.IsFaulted, "The block must either not be completed or be faulted if we're still storing exceptions.");
1196                 lock (ValueLock)
1197                 {
1198                     foreach (Exception exception in exceptions)
1199                     {
1200                         Common.AddException(ref _exceptions, exception);
1201                     }
1202                 }
1203             }
1204
1205             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
1206             internal Task Completion { get { return _completionTask.Task; } }
1207
1208             /// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
1209             internal DataflowBlockOptions DataflowBlockOptions { get { return _dataflowBlockOptions; } }
1210
1211             /// <summary>Gets the object to display in the debugger display attribute.</summary>
1212             [SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
1213             private object DebuggerDisplayContent
1214             {
1215                 get
1216                 {
1217                     var displaySource = _owningSource as IDebuggerDisplay;
1218                     return string.Format("Block=\"{0}\"",
1219                         displaySource != null ? displaySource.Content : _owningSource);
1220                 }
1221             }
1222
1223             /// <summary>Gets information about this helper to be used for display in a debugger.</summary>
1224             /// <returns>Debugging information about this source core.</returns>
1225             internal DebuggingInformation GetDebuggingInformation() { return new DebuggingInformation(this); }
1226
1227             /// <summary>Provides debugging information about the source core.</summary>
1228             internal sealed class DebuggingInformation
1229             {
1230                 /// <summary>The source being viewed.</summary>
1231                 private BroadcastingSourceCore<TOutput> _source;
1232
1233                 /// <summary>Initializes the type proxy.</summary>
1234                 /// <param name="source">The source being viewed.</param>
1235                 public DebuggingInformation(BroadcastingSourceCore<TOutput> source) { _source = source; }
1236
1237                 /// <summary>Gets whether the source contains a current message.</summary>
1238                 public bool HasValue { get { return _source._currentMessageIsValid; } }
1239                 /// <summary>Gets the value of the source's current message.</summary>
1240                 public TOutput Value { get { return _source._currentMessage; } }
1241                 /// <summary>Gets the number of messages waiting to be made current.</summary>
1242                 public int InputCount { get { return _source._messages.Count; } }
1243                 /// <summary>Gets the messages available for receiving.</summary>
1244                 public IEnumerable<TOutput> InputQueue { get { return _source._messages.ToList(); } }
1245                 /// <summary>Gets the task being used for output processing.</summary>
1246                 public Task TaskForOutputProcessing { get { return _source._taskForOutputProcessing; } }
1247
1248                 /// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
1249                 public DataflowBlockOptions DataflowBlockOptions { get { return _source._dataflowBlockOptions; } }
1250                 /// <summary>Gets whether the block is declining further messages.</summary>
1251                 public bool IsDecliningPermanently { get { return _source._decliningPermanently; } }
1252                 /// <summary>Gets whether the block is completed.</summary>
1253                 public bool IsCompleted { get { return _source.Completion.IsCompleted; } }
1254
1255                 /// <summary>Gets the set of all targets linked from this block.</summary>
1256                 public TargetRegistry<TOutput> LinkedTargets { get { return _source._targetRegistry; } }
1257                 /// <summary>Gets the target that holds a reservation on the next message, if any.</summary>
1258                 public ITargetBlock<TOutput> NextMessageReservedFor { get { return _source._nextMessageReservedFor; } }
1259             }
1260         }
1261     }
1262 }