[System.Threading.Tasks.Dataflow] Replace implementation with CoreFx version
[mono.git] / mcs / class / System.Threading.Tasks.Dataflow / CoreFxSources / Base / DataflowBlock.cs
1 // Copyright (c) Microsoft. All rights reserved.
2 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
3
4 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
5 //
6 // DataflowBlock.cs
7 //
8 //
9 // Common functionality for ITargetBlock, ISourceBlock, and IPropagatorBlock.
10 //
11 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
12
13 using System.Collections.Generic;
14 using System.Diagnostics;
15 using System.Diagnostics.CodeAnalysis;
16 using System.Diagnostics.Contracts;
17 using System.Linq;
18 using System.Runtime.CompilerServices;
19 using System.Runtime.ExceptionServices;
20 using System.Security;
21 using System.Threading.Tasks.Dataflow.Internal;
22 using System.Threading.Tasks.Dataflow.Internal.Threading;
23
24 namespace System.Threading.Tasks.Dataflow
25 {
26     /// <summary>
27     /// Provides a set of static (Shared in Visual Basic) methods for working with dataflow blocks.
28     /// </summary>
29     public static class DataflowBlock
30     {
31         #region LinkTo
32         /// <summary>Links the <see cref="ISourceBlock{TOutput}"/> to the specified <see cref="ITargetBlock{TOutput}"/>.</summary>
33         /// <param name="source">The source from which to link.</param>
34         /// <param name="target">The <see cref="ITargetBlock{TOutput}"/> to which to connect the source.</param>
35         /// <returns>An IDisposable that, upon calling Dispose, will unlink the source from the target.</returns>
36         /// <exception cref="System.ArgumentNullException">The <paramref name="source"/> is null (Nothing in Visual Basic).</exception>
37         /// <exception cref="System.ArgumentNullException">The <paramref name="target"/> is null (Nothing in Visual Basic).</exception>
38         public static IDisposable LinkTo<TOutput>(
39             this ISourceBlock<TOutput> source,
40             ITargetBlock<TOutput> target)
41         {
42             // Validate arguments
43             if (source == null) throw new ArgumentNullException("source");
44             if (target == null) throw new ArgumentNullException("target");
45             Contract.EndContractBlock();
46
47             // This method exists purely to pass default DataflowLinkOptions 
48             // to increase usability of the "90%" case.
49             return source.LinkTo(target, DataflowLinkOptions.Default);
50         }
51
52         /// <summary>Links the <see cref="ISourceBlock{TOutput}"/> to the specified <see cref="ITargetBlock{TOutput}"/> using the specified filter.</summary>
53         /// <param name="source">The source from which to link.</param>
54         /// <param name="target">The <see cref="ITargetBlock{TOutput}"/> to which to connect the source.</param>
55         /// <param name="predicate">The filter a message must pass in order for it to propagate from the source to the target.</param>
56         /// <returns>An IDisposable that, upon calling Dispose, will unlink the source from the target.</returns>
57         /// <exception cref="System.ArgumentNullException">The <paramref name="source"/> is null (Nothing in Visual Basic).</exception>
58         /// <exception cref="System.ArgumentNullException">The <paramref name="target"/> is null (Nothing in Visual Basic).</exception>
59         /// <exception cref="System.ArgumentNullException">The <paramref name="predicate"/> is null (Nothing in Visual Basic).</exception>
60         public static IDisposable LinkTo<TOutput>(
61             this ISourceBlock<TOutput> source,
62             ITargetBlock<TOutput> target,
63             Predicate<TOutput> predicate)
64         {
65             // All argument validation handled by delegated method.
66             return LinkTo(source, target, DataflowLinkOptions.Default, predicate);
67         }
68
69         /// <summary>Links the <see cref="ISourceBlock{TOutput}"/> to the specified <see cref="ITargetBlock{TOutput}"/> using the specified filter.</summary>
70         /// <param name="source">The source from which to link.</param>
71         /// <param name="target">The <see cref="ITargetBlock{TOutput}"/> to which to connect the source.</param>
72         /// <param name="predicate">The filter a message must pass in order for it to propagate from the source to the target.</param>
73         /// <param name="linkOptions">The options to use to configure the link.</param>
74         /// <returns>An IDisposable that, upon calling Dispose, will unlink the source from the target.</returns>
75         /// <exception cref="System.ArgumentNullException">The <paramref name="source"/> is null (Nothing in Visual Basic).</exception>
76         /// <exception cref="System.ArgumentNullException">The <paramref name="target"/> is null (Nothing in Visual Basic).</exception>
77         /// <exception cref="System.ArgumentNullException">The <paramref name="linkOptions"/> is null (Nothing in Visual Basic).</exception>
78         /// <exception cref="System.ArgumentNullException">The <paramref name="predicate"/> is null (Nothing in Visual Basic).</exception>
79         public static IDisposable LinkTo<TOutput>(
80             this ISourceBlock<TOutput> source,
81             ITargetBlock<TOutput> target,
82             DataflowLinkOptions linkOptions,
83             Predicate<TOutput> predicate)
84         {
85             // Validate arguments
86             if (source == null) throw new ArgumentNullException("source");
87             if (target == null) throw new ArgumentNullException("target");
88             if (linkOptions == null) throw new ArgumentNullException("linkOptions");
89             if (predicate == null) throw new ArgumentNullException("predicate");
90             Contract.EndContractBlock();
91
92             // Create the filter, which links to the real target, and then
93             // link the real source to this intermediate filter.
94             var filter = new FilteredLinkPropagator<TOutput>(source, target, predicate);
95             return source.LinkTo(filter, linkOptions);
96         }
97
98         /// <summary>Provides a synchronous filter for use in filtered LinkTos.</summary>
99         /// <typeparam name="T">Specifies the type of data being filtered.</typeparam>
100         [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
101         [DebuggerTypeProxy(typeof(FilteredLinkPropagator<>.DebugView))]
102         private sealed class FilteredLinkPropagator<T> : IPropagatorBlock<T, T>, IDebuggerDisplay
103         {
104             /// <summary>The source connected with this filter.</summary>
105             private readonly ISourceBlock<T> _source;
106             /// <summary>The target with which this block is associated.</summary>
107             private readonly ITargetBlock<T> _target;
108             /// <summary>The predicate provided by the user.</summary>
109             private readonly Predicate<T> _userProvidedPredicate;
110
111             /// <summary>Initializes the filter passthrough.</summary>
112             /// <param name="source">The source connected to this filter.</param>
113             /// <param name="target">The target to which filtered messages should be passed.</param>
114             /// <param name="predicate">The predicate to run for each messsage.</param>
115             internal FilteredLinkPropagator(ISourceBlock<T> source, ITargetBlock<T> target, Predicate<T> predicate)
116             {
117                 Contract.Requires(source != null, "Filtered link requires a source to filter on.");
118                 Contract.Requires(target != null, "Filtered link requires a target to filter to.");
119                 Contract.Requires(predicate != null, "Filtered link requires a predicate to filter with.");
120
121                 // Store the arguments
122                 _source = source;
123                 _target = target;
124                 _userProvidedPredicate = predicate;
125             }
126
127             /// <summary>Runs the user-provided predicate over an item in the correct execution context.</summary>
128             /// <param name="item">The item to evaluate.</param>
129             /// <returns>true if the item passed the filter; otherwise, false.</returns>
130             private bool RunPredicate(T item)
131             {
132                 Contract.Requires(_userProvidedPredicate != null, "User-provided predicate is required.");
133
134                 return _userProvidedPredicate(item); // avoid state object allocation if execution context isn't needed
135             }
136
137             /// <summary>Manually closes over state necessary in FilteredLinkPropagator.</summary>
138             private sealed class PredicateContextState
139             {
140                 /// <summary>The input to be filtered.</summary>
141                 internal readonly T Input;
142                 /// <summary>The predicate function.</summary>
143                 internal readonly Predicate<T> Predicate;
144                 /// <summary>The result of the filtering operation.</summary>
145                 internal bool Output;
146
147                 /// <summary>Initializes the predicate state.</summary>
148                 /// <param name="input">The input to be filtered.</param>
149                 /// <param name="predicate">The predicate function.</param>
150                 internal PredicateContextState(T input, Predicate<T> predicate)
151                 {
152                     Contract.Requires(predicate != null, "A predicate with which to filter is required.");
153                     this.Input = input;
154                     this.Predicate = predicate;
155                 }
156
157                 /// <summary>Runs the predicate function over the input and stores the result into the output.</summary>
158                 internal void Run()
159                 {
160                     Contract.Requires(Predicate != null, "Non-null predicate required");
161                     Output = Predicate(Input);
162                 }
163             }
164
165             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="OfferMessage"]/*' />
166             DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, Boolean consumeToAccept)
167             {
168                 // Validate arguments.  Some targets may have a null source, but FilteredLinkPropagator
169                 // is an internal target that should only ever have source non-null.
170                 if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, "messageHeader");
171                 if (source == null) throw new ArgumentNullException("source");
172                 Contract.EndContractBlock();
173
174                 // Run the filter.
175                 bool passedFilter = RunPredicate(messageValue);
176
177                 // If the predicate matched, pass the message along to the real target.
178                 if (passedFilter)
179                 {
180                     return _target.OfferMessage(messageHeader, messageValue, this, consumeToAccept);
181                 }
182                 // Otherwise, decline.
183                 else return DataflowMessageStatus.Declined;
184             }
185
186             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ConsumeMessage"]/*' />
187             T ISourceBlock<T>.ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target, out Boolean messageConsumed)
188             {
189                 // This message should have only made it to the target if it passes the filter, so we shouldn't need to check again.
190                 // The real source will also be doing verifications, so we don't need to validate args here.
191                 Debug.Assert(messageHeader.IsValid, "Only valid messages may be consumed.");
192                 return _source.ConsumeMessage(messageHeader, this, out messageConsumed);
193             }
194
195             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReserveMessage"]/*' />
196             bool ISourceBlock<T>.ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
197             {
198                 // This message should have only made it to the target if it passes the filter, so we shouldn't need to check again.
199                 // The real source will also be doing verifications, so we don't need to validate args here.
200                 Debug.Assert(messageHeader.IsValid, "Only valid messages may be consumed.");
201                 return _source.ReserveMessage(messageHeader, this);
202             }
203
204             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReleaseReservation"]/*' />
205             void ISourceBlock<T>.ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
206             {
207                 // This message should have only made it to the target if it passes the filter, so we shouldn't need to check again.
208                 // The real source will also be doing verifications, so we don't need to validate args here.
209                 Debug.Assert(messageHeader.IsValid, "Only valid messages may be consumed.");
210                 _source.ReleaseReservation(messageHeader, this);
211             }
212
213             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
214             Task IDataflowBlock.Completion { get { return _source.Completion; } }
215             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
216             void IDataflowBlock.Complete() { _target.Complete(); }
217             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
218             void IDataflowBlock.Fault(Exception exception) { _target.Fault(exception); }
219
220             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="LinkTo"]/*' />
221             IDisposable ISourceBlock<T>.LinkTo(ITargetBlock<T> target, DataflowLinkOptions linkOptions) { throw new NotSupportedException(SR.NotSupported_MemberNotNeeded); }
222
223             /// <summary>The data to display in the debugger display attribute.</summary>
224             [SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
225             private object DebuggerDisplayContent
226             {
227                 get
228                 {
229                     var displaySource = _source as IDebuggerDisplay;
230                     var displayTarget = _target as IDebuggerDisplay;
231                     return string.Format("{0} Source=\"{1}\", Target=\"{2}\"",
232                         Common.GetNameForDebugger(this),
233                         displaySource != null ? displaySource.Content : _source,
234                         displayTarget != null ? displayTarget.Content : _target);
235                 }
236             }
237             /// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
238             object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } }
239
240             /// <summary>Provides a debugger type proxy for a filter.</summary>
241             private sealed class DebugView
242             {
243                 /// <summary>The filter.</summary>
244                 private readonly FilteredLinkPropagator<T> _filter;
245
246                 /// <summary>Initializes the debug view.</summary>
247                 /// <param name="filter">The filter to view.</param>
248                 public DebugView(FilteredLinkPropagator<T> filter)
249                 {
250                     Contract.Requires(filter != null, "Need a filter with which to construct the debug view.");
251                     _filter = filter;
252                 }
253
254                 /// <summary>The linked target for this filter.</summary>
255                 public ITargetBlock<T> LinkedTarget { get { return _filter._target; } }
256             }
257         }
258         #endregion
259
260         #region Post and SendAsync
261         /// <summary>Posts an item to the <see cref="T:System.Threading.Tasks.Dataflow.ITargetBlock`1"/>.</summary>
262         /// <typeparam name="TInput">Specifies the type of data accepted by the target block.</typeparam>
263         /// <param name="target">The target block.</param>
264         /// <param name="item">The item being offered to the target.</param>
265         /// <returns>true if the item was accepted by the target block; otherwise, false.</returns>
266         /// <remarks>
267         /// This method will return once the target block has decided to accept or decline the item,
268         /// but unless otherwise dictated by special semantics of the target block, it does not wait
269         /// for the item to actually be processed (for example, <see cref="T:System.Threading.Tasks.Dataflow.ActionBlock`1"/>
270         /// will return from Post as soon as it has stored the posted item into its input queue).  From the perspective
271         /// of the block's processing, Post is asynchronous. For target blocks that support postponing offered messages, 
272         /// or for blocks that may do more processing in their Post implementation, consider using
273         ///  <see cref="T:System.Threading.Tasks.Dataflow.DataflowBlock.SendAsync">SendAsync</see>, 
274         /// which will return immediately and will enable the target to postpone the posted message and later consume it 
275         /// after SendAsync returns.
276         /// </remarks>
277         public static Boolean Post<TInput>(this ITargetBlock<TInput> target, TInput item)
278         {
279             if (target == null) throw new ArgumentNullException("target");
280             return target.OfferMessage(Common.SingleMessageHeader, item, source: null, consumeToAccept: false) == DataflowMessageStatus.Accepted;
281         }
282
283         /// <summary>Asynchronously offers a message to the target message block, allowing for postponement.</summary>
284         /// <typeparam name="TInput">Specifies the type of the data to post to the target.</typeparam>
285         /// <param name="target">The target to which to post the data.</param>
286         /// <param name="item">The item being offered to the target.</param>
287         /// <returns>
288         /// A <see cref="System.Threading.Tasks.Task{Boolean}"/> that represents the asynchronous send.  If the target
289         /// accepts and consumes the offered element during the call to SendAsync, upon return
290         /// from the call the resulting <see cref="System.Threading.Tasks.Task{Boolean}"/> will be completed and its <see cref="System.Threading.Tasks.Task{Boolean}.Result">Result</see> 
291         /// property will return true.  If the target declines the offered element during the call, upon return from the call the resulting <see cref="System.Threading.Tasks.Task{Boolean}"/> will
292         /// be completed and its <see cref="System.Threading.Tasks.Task{Boolean}.Result">Result</see> property will return false. If the target
293         /// postpones the offered element, the element will be buffered until such time that the target consumes or releases it, at which
294         /// point the Task will complete, with its <see cref="System.Threading.Tasks.Task{Boolean}.Result"/> indicating whether the message was consumed.  If the target
295         /// never attempts to consume or release the message, the returned task will never complete.
296         /// </returns>
297         /// <exception cref="System.ArgumentNullException">The <paramref name="target"/> is null (Nothing in Visual Basic).</exception>
298         public static Task<Boolean> SendAsync<TInput>(this ITargetBlock<TInput> target, TInput item)
299         {
300             return SendAsync<TInput>(target, item, CancellationToken.None);
301         }
302
303         /// <summary>Asynchronously offers a message to the target message block, allowing for postponement.</summary>
304         /// <typeparam name="TInput">Specifies the type of the data to post to the target.</typeparam>
305         /// <param name="target">The target to which to post the data.</param>
306         /// <param name="item">The item being offered to the target.</param>
307         /// <param name="cancellationToken">The cancellation token with which to request cancellation of the send operation.</param>
308         /// <returns>
309         /// <para>
310         /// A <see cref="System.Threading.Tasks.Task{Boolean}"/> that represents the asynchronous send.  If the target
311         /// accepts and consumes the offered element during the call to SendAsync, upon return
312         /// from the call the resulting <see cref="System.Threading.Tasks.Task{Boolean}"/> will be completed and its <see cref="System.Threading.Tasks.Task{Boolean}.Result">Result</see> 
313         /// property will return true.  If the target declines the offered element during the call, upon return from the call the resulting <see cref="System.Threading.Tasks.Task{Boolean}"/> will
314         /// be completed and its <see cref="System.Threading.Tasks.Task{Boolean}.Result">Result</see> property will return false. If the target
315         /// postpones the offered element, the element will be buffered until such time that the target consumes or releases it, at which
316         /// point the Task will complete, with its <see cref="System.Threading.Tasks.Task{Boolean}.Result"/> indicating whether the message was consumed.  If the target
317         /// never attempts to consume or release the message, the returned task will never complete.
318         /// </para>
319         /// <para>
320         /// If cancellation is requested before the target has successfully consumed the sent data, 
321         /// the returned task will complete in the Canceled state and the data will no longer be available to the target.
322         /// </para>
323         /// </returns>
324         /// <exception cref="System.ArgumentNullException">The <paramref name="target"/> is null (Nothing in Visual Basic).</exception>
325         [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
326         public static Task<Boolean> SendAsync<TInput>(this ITargetBlock<TInput> target, TInput item, CancellationToken cancellationToken)
327         {
328             // Validate arguments.  No validation necessary for item.
329             if (target == null) throw new ArgumentNullException("target");
330             Contract.EndContractBlock();
331
332             // Fast path check for cancellation
333             if (cancellationToken.IsCancellationRequested)
334                 return Common.CreateTaskFromCancellation<Boolean>(cancellationToken);
335
336             SendAsyncSource<TInput> source;
337
338             // Fast path: try to offer the item synchronously.  This first try is done
339             // without any form of cancellation, and thus consumeToAccept can be the better-performing "false".
340             try
341             {
342                 switch (target.OfferMessage(Common.SingleMessageHeader, item, source: null, consumeToAccept: false))
343                 {
344                     // If the message is immediately accepted, return a cached completed task with a true result
345                     case DataflowMessageStatus.Accepted:
346                         return Common.CompletedTaskWithTrueResult;
347
348                     // If the target is declining permanently, return a cached completed task with a false result
349                     case DataflowMessageStatus.DecliningPermanently:
350                         return Common.CompletedTaskWithFalseResult;
351
352 #if DEBUG
353                     case DataflowMessageStatus.Postponed:
354                         Debug.Assert(false, "A message should never be postponed when no source has been provided");
355                         break;
356
357                     case DataflowMessageStatus.NotAvailable:
358                         Debug.Assert(false, "The message should never be missed, as it's offered to only this one target");
359                         break;
360 #endif
361                 }
362
363                 // Slow path: the target did not accept the synchronous post, nor did it decline it.
364                 // Create a source for the send, launch the offering, and return the representative task.
365                 // This ctor attempts to register a cancellation notification which would throw if the
366                 // underlying CTS has been disposed of. Therefore, keep it inside the try/catch block.
367                 source = new SendAsyncSource<TInput>(target, item, cancellationToken);
368             }
369             catch (Exception exc)
370             {
371                 // If the target throws from OfferMessage, return a faulted task
372                 Common.StoreDataflowMessageValueIntoExceptionData(exc, item);
373                 return Common.CreateTaskFromException<Boolean>(exc);
374             }
375
376             Debug.Assert(source != null, "The SendAsyncSource instance must have been constructed.");
377             source.OfferToTarget(); // synchronous to preserve message ordering
378             return source.Task;
379         }
380
381         /// <summary>
382         /// Provides a source used by SendAsync that will buffer a single message and signal when it's been accepted or declined.
383         /// </summary>
384         /// <remarks>This source must only be passed to a single target, and must only be used once.</remarks>
385         [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
386         [DebuggerTypeProxy(typeof(SendAsyncSource<>.DebugView))]
387         private sealed class SendAsyncSource<TOutput> : TaskCompletionSource<Boolean>, ISourceBlock<TOutput>, IDebuggerDisplay
388         {
389             /// <summary>The target to offer to.</summary>
390             private readonly ITargetBlock<TOutput> _target;
391             /// <summary>The buffered message.</summary>
392             private readonly TOutput _messageValue;
393
394             /// <summary>CancellationToken used to cancel the send.</summary>
395             private CancellationToken _cancellationToken;
396             /// <summary>Registration with the cancellation token.</summary>
397             private CancellationTokenRegistration _cancellationRegistration;
398             /// <summary>The cancellation/completion state of the source.</summary>
399             private int _cancellationState; // one of the CANCELLATION_STATE_* constant values, defaulting to NONE
400
401             // Cancellation states:
402             // _cancellationState starts out as NONE, and will remain that way unless a CancellationToken
403             // is provided in the initial OfferToTarget call.  As such, unless a token is provided,
404             // all synchronization related to cancellation will be avoided.  Once a token is provided,
405             // the state transitions to REGISTERED.  If cancellation then is requested or if the target
406             // calls back to consume the message, the state will transition to COMPLETING prior to 
407             // actually committing the action; if it can't transition to COMPLETING, then the action doesn't
408             // take effect (e.g. if cancellation raced with the target consuming, such that the cancellation
409             // action was able to transition to COMPLETING but the consumption wasn't, then ConsumeMessage
410             // would return false indicating that the message could not be consumed).  The only additional
411             // complication here is around reservations.  If a target reserves a message, _cancellationState
412             // transitions to RESERVED.  A subsequent ConsumeMessage call can successfully transition from
413             // RESERVED to COMPLETING, but cancellation can't; cancellation can only transition from REGISTERED
414             // to COMPLETING.  If the reservation on the message is instead released, _cancellationState
415             // will transition back to REGISTERED.
416
417             /// <summary>No cancellation registration is used.</summary>
418             private const int CANCELLATION_STATE_NONE = 0;
419             /// <summary>A cancellation token has been registered.</summary>
420             private const int CANCELLATION_STATE_REGISTERED = 1;
421             /// <summary>The message has been reserved. Only used if a cancellation token is in play.</summary>
422             private const int CANCELLATION_STATE_RESERVED = 2;
423             /// <summary>Completion is now in progress. Only used if a cancellation token is in play.</summary>
424             private const int CANCELLATION_STATE_COMPLETING = 3;
425
426             /// <summary>Initializes the source.</summary>
427             /// <param name="target">The target to offer to.</param>
428             /// <param name="messageValue">The message to offer and buffer.</param>
429             /// <param name="cancellationToken">The cancellation token with which to cancel the send.</param>
430             internal SendAsyncSource(ITargetBlock<TOutput> target, TOutput messageValue, CancellationToken cancellationToken)
431             {
432                 Contract.Requires(target != null, "A valid target to send to is required.");
433                 _target = target;
434                 _messageValue = messageValue;
435
436                 // If a cancelable CancellationToken is used, update our cancellation state
437                 // and register with the token.  Only if CanBeCanceled is true due we want
438                 // to pay the subsequent costs around synchronization between cancellation
439                 // requests and the target coming back to consume the message.
440                 if (cancellationToken.CanBeCanceled)
441                 {
442                     _cancellationToken = cancellationToken;
443                     _cancellationState = CANCELLATION_STATE_REGISTERED;
444
445                     try
446                     {
447                         _cancellationRegistration = cancellationToken.Register(
448                             _cancellationCallback, new WeakReference<SendAsyncSource<TOutput>>(this));
449                     }
450                     catch
451                     {
452                         // Suppress finalization.  Finalization is only required if the target drops a reference
453                         // to the source before the source has completed, and we'll never offer to the target.
454                         GC.SuppressFinalize(this);
455
456                         // Propagate the exception
457                         throw;
458                     }
459                 }
460             }
461
462             /// <summary>Finalizer that completes the returned task if all references to this source are dropped.</summary>
463             ~SendAsyncSource()
464             {
465                 // CompleteAsDeclined uses synchronization, which is dangerous for a finalizer 
466                 // during shutdown or appdomain unload.
467                 if (!Environment.HasShutdownStarted)
468                 {
469                     CompleteAsDeclined(runAsync: true);
470                 }
471             }
472
473             /// <summary>Completes the source in an "Accepted" state.</summary>
474             /// <param name="runAsync">true to accept asynchronously; false to accept synchronously.</param>
475             private void CompleteAsAccepted(bool runAsync)
476             {
477                 RunCompletionAction(state =>
478                 {
479                     try { ((SendAsyncSource<TOutput>)state).TrySetResult(true); }
480                     catch (ObjectDisposedException) { }
481                 }, this, runAsync);
482             }
483
484             /// <summary>Completes the source in an "Declined" state.</summary>
485             /// <param name="runAsync">true to decline asynchronously; false to decline synchronously.</param>
486             private void CompleteAsDeclined(bool runAsync)
487             {
488                 RunCompletionAction(state =>
489                 {
490                     // The try/catch for ObjectDisposedException handles the case where the 
491                     // user disposes of the returned task before we're done with it.
492                     try { ((SendAsyncSource<TOutput>)state).TrySetResult(false); }
493                     catch (ObjectDisposedException) { }
494                 }, this, runAsync);
495             }
496
497             /// <summary>Completes the source in faulted state.</summary>
498             /// <param name="exception">The exception with which to fault.</param>
499             /// <param name="runAsync">true to fault asynchronously; false to fault synchronously.</param>
500             private void CompleteAsFaulted(Exception exception, bool runAsync)
501             {
502                 RunCompletionAction(state =>
503                 {
504                     var tuple = (Tuple<SendAsyncSource<TOutput>, Exception>)state;
505                     try { tuple.Item1.TrySetException(tuple.Item2); }
506                     catch (ObjectDisposedException) { }
507                 }, Tuple.Create<SendAsyncSource<TOutput>, Exception>(this, exception), runAsync);
508             }
509
510             /// <summary>Completes the source in canceled state.</summary>
511             /// <param name="runAsync">true to fault asynchronously; false to fault synchronously.</param>
512             private void CompleteAsCanceled(bool runAsync)
513             {
514                 RunCompletionAction(state =>
515                 {
516                     try { ((SendAsyncSource<TOutput>)state).TrySetCanceled(); }
517                     catch (ObjectDisposedException) { }
518                 }, this, runAsync);
519             }
520
521             /// <summary>Executes a completion action.</summary>
522             /// <param name="completionAction">The action to execute, passed the state.</param>
523             /// <param name="completionActionState">The state to pass into the delegate.</param>
524             /// <param name="runAsync">true to execute the action asynchronously; false to execute it synchronously.</param>
525             /// <remarks>
526             /// async should be true if this is being called on a path that has the target on the stack, e.g.
527             /// the target is calling to ConsumeMessage.  We don't want to block the target indefinitely
528             /// with any synchronous continuations off of the returned send async task.
529             /// </remarks>
530             [SuppressMessage("Microsoft.Usage", "CA1816:CallGCSuppressFinalizeCorrectly")]
531             private void RunCompletionAction(Action<object> completionAction, object completionActionState, bool runAsync)
532             {
533                 Contract.Requires(completionAction != null, "Completion action to run is required.");
534
535                 // Suppress finalization.  Finalization is only required if the target drops a reference
536                 // to the source before the source has completed, and here we're completing the source.
537                 GC.SuppressFinalize(this);
538
539                 // Dispose of the cancellation registration if there is one
540                 if (_cancellationState != CANCELLATION_STATE_NONE)
541                 {
542                     Debug.Assert(_cancellationRegistration != default(CancellationTokenRegistration),
543                         "If we're not in NONE, we must have a cancellation token we've registered with.");
544                     _cancellationRegistration.Dispose();
545                 }
546
547                 // If we're meant to run asynchronously, launch a task.
548                 if (runAsync)
549                 {
550                     System.Threading.Tasks.Task.Factory.StartNew(
551                         completionAction, completionActionState,
552                         CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
553                 }
554                 // Otherwise, execute directly.
555                 else
556                 {
557                     completionAction(completionActionState);
558                 }
559             }
560
561             /// <summary>Offers the message to the target asynchronously.</summary>
562             private void OfferToTargetAsync()
563             {
564                 System.Threading.Tasks.Task.Factory.StartNew(
565                     state => ((SendAsyncSource<TOutput>)state).OfferToTarget(), this,
566                     CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
567             }
568
569             /// <summary>Cached delegate used to cancel a send in response to a cancellation request.</summary>
570             private readonly static Action<object> _cancellationCallback = CancellationHandler;
571
572             /// <summary>Attempts to cancel the source passed as state in response to a cancellation request.</summary>
573             /// <param name="state">
574             /// A weak reference to the SendAsyncSource.  A weak reference is used to prevent the source
575             /// from being rooted in a long-lived token.
576             /// </param>
577             private static void CancellationHandler(object state)
578             {
579                 SendAsyncSource<TOutput> source = Common.UnwrapWeakReference<SendAsyncSource<TOutput>>(state);
580                 if (source != null)
581                 {
582                     Debug.Assert(source._cancellationState != CANCELLATION_STATE_NONE,
583                         "If cancellation is in play, we must have already moved out of the NONE state.");
584
585                     // Try to reserve completion, and if we can, complete as canceled.  Note that we can only
586                     // achieve cancellation when in the REGISTERED state, and not when in the RESERVED state, 
587                     // as if a target has reserved the message, we must allow the message to be consumed successfully.
588                     if (source._cancellationState == CANCELLATION_STATE_REGISTERED && // fast check to avoid the interlocked if we can
589                         Interlocked.CompareExchange(ref source._cancellationState, CANCELLATION_STATE_COMPLETING, CANCELLATION_STATE_REGISTERED) == CANCELLATION_STATE_REGISTERED)
590                     {
591                         // We've reserved completion, so proceed to cancel the task.
592                         source.CompleteAsCanceled(true);
593                     }
594                 }
595             }
596
597             /// <summary>Offers the message to the target synchronously.</summary>
598             [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
599             internal void OfferToTarget()
600             {
601                 try
602                 {
603                     // Offer the message to the target.  If there's no cancellation in play, we can just allow the target
604                     // to accept the message directly.  But if a CancellationToken is in use, the target needs to come
605                     // back to us to get the data; that way, we can ensure we don't race between returning a canceled task but
606                     // successfully completing the send.
607                     bool consumeToAccept = _cancellationState != CANCELLATION_STATE_NONE;
608
609                     switch (_target.OfferMessage(
610                         Common.SingleMessageHeader, _messageValue, this, consumeToAccept: consumeToAccept))
611                     {
612                         // If the message is immediately accepted, complete the task as accepted
613                         case DataflowMessageStatus.Accepted:
614                             if (!consumeToAccept)
615                             {
616                                 // Cancellation wasn't in use, and the target accepted the message directly,
617                                 // so complete the task as accepted.
618                                 CompleteAsAccepted(runAsync: false);
619                             }
620                             else
621                             {
622                                 // If cancellation is in use, then since the target accepted,
623                                 // our state better reflect that we're completing.
624                                 Debug.Assert(_cancellationState == CANCELLATION_STATE_COMPLETING,
625                                     "The message was accepted, so we should have started completion.");
626                             }
627                             break;
628
629                         // If the message is immediately declined, complete the task as declined
630                         case DataflowMessageStatus.Declined:
631                         case DataflowMessageStatus.DecliningPermanently:
632                             CompleteAsDeclined(runAsync: false);
633                             break;
634 #if DEBUG
635                         case DataflowMessageStatus.NotAvailable:
636                             Debug.Assert(false, "The message should never be missed, as it's offered to only this one target");
637                             break;
638                             // If the message was postponed, the source may or may not be complete yet.  Nothing to validate.
639                             // Treat an improper DataflowMessageStatus as postponed and do nothing.
640 #endif
641                     }
642                 }
643                 // A faulty target might throw from OfferMessage.  If that happens,
644                 // we'll try to fault the returned task.  A really faulty target might
645                 // both throw from OfferMessage and call ConsumeMessage,
646                 // in which case it's possible we might not be able to propagate the exception
647                 // out to the caller through the task if ConsumeMessage wins the race,
648                 // which is likely if the exception doesn't occur until after ConsumeMessage is
649                 // called.  If that happens, we just eat the exception.
650                 catch (Exception exc)
651                 {
652                     Common.StoreDataflowMessageValueIntoExceptionData(exc, _messageValue);
653                     CompleteAsFaulted(exc, runAsync: false);
654                 }
655             }
656
657             /// <summary>Called by the target to consume the buffered message.</summary>
658             TOutput ISourceBlock<TOutput>.ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target, out Boolean messageConsumed)
659             {
660                 // Validate arguments
661                 if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, "messageHeader");
662                 if (target == null) throw new ArgumentNullException("target");
663                 Contract.EndContractBlock();
664
665                 // If the task has already completed, there's nothing to consume.  This could happen if
666                 // cancellation was already requested and completed the task as a result.
667                 if (Task.IsCompleted)
668                 {
669                     messageConsumed = false;
670                     return default(TOutput);
671                 }
672
673                 // If the message being asked for is not the same as the one that's buffered,
674                 // something is wrong.  Complete as having failed to transfer the message.
675                 bool validMessage = (messageHeader.Id == Common.SINGLE_MESSAGE_ID);
676
677                 if (validMessage)
678                 {
679                     int curState = _cancellationState;
680                     Debug.Assert(
681                         curState == CANCELLATION_STATE_NONE || curState == CANCELLATION_STATE_REGISTERED ||
682                         curState == CANCELLATION_STATE_RESERVED || curState == CANCELLATION_STATE_COMPLETING,
683                         "The current cancellation state is not valid.");
684
685                     // If we're not dealing with cancellation, then if we're currently registered or reserved, try to transition 
686                     // to completing. If we're able to, allow the message to be consumed, and we're done.  At this point, we 
687                     // support transitioning out of REGISTERED or RESERVED.
688                     if (curState == CANCELLATION_STATE_NONE || // no synchronization necessary if there's no cancellation
689                         (curState != CANCELLATION_STATE_COMPLETING && // fast check to avoid unnecessary synchronization
690                          Interlocked.CompareExchange(ref _cancellationState, CANCELLATION_STATE_COMPLETING, curState) == curState))
691                     {
692                         CompleteAsAccepted(runAsync: true);
693                         messageConsumed = true;
694                         return _messageValue;
695                     }
696                 }
697
698                 // Consumption failed
699                 messageConsumed = false;
700                 return default(TOutput);
701             }
702
703             /// <summary>Called by the target to reserve the buffered message.</summary>
704             bool ISourceBlock<TOutput>.ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
705             {
706                 // Validate arguments
707                 if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, "messageHeader");
708                 if (target == null) throw new ArgumentNullException("target");
709                 Contract.EndContractBlock();
710
711                 // If the task has already completed, such as due to cancellation, there's nothing to reserve.
712                 if (Task.IsCompleted) return false;
713
714                 // As long as the message is the one being requested and cancellation hasn't been requested, allow it to be reserved.
715                 bool reservable = (messageHeader.Id == Common.SINGLE_MESSAGE_ID);
716                 return reservable &&
717                     (_cancellationState == CANCELLATION_STATE_NONE || // avoid synchronization when cancellation is not in play
718                      Interlocked.CompareExchange(ref _cancellationState, CANCELLATION_STATE_RESERVED, CANCELLATION_STATE_REGISTERED) == CANCELLATION_STATE_REGISTERED);
719             }
720
721             /// <summary>Called by the target to release a reservation on the buffered message.</summary>
722             void ISourceBlock<TOutput>.ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
723             {
724                 // Validate arguments
725                 if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, "messageHeader");
726                 if (target == null) throw new ArgumentNullException("target");
727                 Contract.EndContractBlock();
728
729                 // If this is not the message we posted, bail
730                 if (messageHeader.Id != Common.SINGLE_MESSAGE_ID)
731                     throw new InvalidOperationException(SR.InvalidOperation_MessageNotReservedByTarget);
732
733                 // If the task has already completed, there's nothing to release.
734                 if (Task.IsCompleted) return;
735
736                 // If a cancellation token is being used, revert our state back to registered.  In the meantime
737                 // cancellation could have been requested, so check to see now if cancellation was requested
738                 // and process it if it was.
739                 if (_cancellationState != CANCELLATION_STATE_NONE)
740                 {
741                     if (Interlocked.CompareExchange(ref _cancellationState, CANCELLATION_STATE_REGISTERED, CANCELLATION_STATE_RESERVED) != CANCELLATION_STATE_RESERVED)
742                         throw new InvalidOperationException(SR.InvalidOperation_MessageNotReservedByTarget);
743                     if (_cancellationToken.IsCancellationRequested)
744                         CancellationHandler(new WeakReference<SendAsyncSource<TOutput>>(this)); // same code as registered with the CancellationToken
745                 }
746
747                 // Start the process over by reoffering the message asynchronously.
748                 OfferToTargetAsync();
749             }
750
751             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
752             Task IDataflowBlock.Completion { get { return Task; } }
753
754             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="LinkTo"]/*' />
755             IDisposable ISourceBlock<TOutput>.LinkTo(ITargetBlock<TOutput> target, DataflowLinkOptions linkOptions) { throw new NotSupportedException(SR.NotSupported_MemberNotNeeded); }
756             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
757             void IDataflowBlock.Complete() { throw new NotSupportedException(SR.NotSupported_MemberNotNeeded); }
758             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
759             void IDataflowBlock.Fault(Exception exception) { throw new NotSupportedException(SR.NotSupported_MemberNotNeeded); }
760
761             /// <summary>The data to display in the debugger display attribute.</summary>
762             [SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
763             private object DebuggerDisplayContent
764             {
765                 get
766                 {
767                     var displayTarget = _target as IDebuggerDisplay;
768                     return string.Format("{0} Message={1}, Target=\"{2}\"",
769                         Common.GetNameForDebugger(this),
770                         _messageValue,
771                         displayTarget != null ? displayTarget.Content : _target);
772                 }
773             }
774             /// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
775             object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } }
776
777             /// <summary>Provides a debugger type proxy for the source.</summary>
778             private sealed class DebugView
779             {
780                 /// <summary>The source.</summary>
781                 private readonly SendAsyncSource<TOutput> _source;
782
783                 /// <summary>Initializes the debug view.</summary>
784                 /// <param name="source">The source to view.</param>
785                 public DebugView(SendAsyncSource<TOutput> source)
786                 {
787                     Contract.Requires(source != null, "Need a source with which to construct the debug view.");
788                     _source = source;
789                 }
790
791                 /// <summary>The target to which we're linked.</summary>
792                 public ITargetBlock<TOutput> Target { get { return _source._target; } }
793                 /// <summary>The message buffered by the source.</summary>
794                 public TOutput Message { get { return _source._messageValue; } }
795                 /// <summary>The Task represented the posting of the message.</summary>
796                 public Task<bool> Completion { get { return _source.Task; } }
797             }
798         }
799         #endregion
800
801         #region TryReceive, ReceiveAsync, and Receive
802         #region TryReceive
803         /// <summary>
804         /// Attempts to synchronously receive an item from the <see cref="T:System.Threading.Tasks.Dataflow.ISourceBlock`1"/>.
805         /// </summary>
806         /// <param name="source">The source from which to receive.</param>
807         /// <param name="item">The item received from the source.</param>
808         /// <returns>true if an item could be received; otherwise, false.</returns>
809         /// <remarks>
810         /// This method does not wait until the source has an item to provide.
811         /// It will return whether or not an element was available.
812         /// </remarks>
813         public static bool TryReceive<TOutput>(this IReceivableSourceBlock<TOutput> source, out TOutput item)
814         {
815             if (source == null) throw new ArgumentNullException("source");
816             Contract.EndContractBlock();
817
818             return source.TryReceive(null, out item);
819         }
820         #endregion
821
822         #region ReceiveAsync
823         /// <summary>Asynchronously receives a value from the specified source.</summary>
824         /// <typeparam name="TOutput">Specifies the type of data contained in the source.</typeparam>
825         /// <param name="source">The source from which to asynchronously receive.</param>
826         /// <returns>
827         /// A <see cref="System.Threading.Tasks.Task{TOutput}"/> that represents the asynchronous receive operation.  When an item is successfully received from the source,
828         /// the returned task will be completed and its <see cref="System.Threading.Tasks.Task{TOutput}.Result">Result</see> will return the received item.  If an item cannot be retrieved,
829         /// because the source is empty and completed, the returned task will be canceled.
830         /// </returns>
831         /// <exception cref="System.ArgumentNullException">The <paramref name="source"/> is null (Nothing in Visual Basic).</exception>
832         public static Task<TOutput> ReceiveAsync<TOutput>(
833             this ISourceBlock<TOutput> source)
834         {
835             // Argument validation handled by target method
836             return ReceiveAsync(source, Common.InfiniteTimeSpan, CancellationToken.None);
837         }
838
839         /// <summary>Asynchronously receives a value from the specified source.</summary>
840         /// <typeparam name="TOutput">Specifies the type of data contained in the source.</typeparam>
841         /// <param name="source">The source from which to asynchronously receive.</param>
842         /// <param name="cancellationToken">The <see cref="System.Threading.CancellationToken"/> which may be used to cancel the receive operation.</param>
843         /// <returns>
844         /// A <see cref="System.Threading.Tasks.Task{TOutput}"/> that represents the asynchronous receive operation.  When an item is successfully received from the source,
845         /// the returned task will be completed and its <see cref="System.Threading.Tasks.Task{TOutput}.Result">Result</see> will return the received item.  If an item cannot be retrieved,
846         /// either because cancellation is requested or the source is empty and completed, the returned task will be canceled.
847         /// </returns>
848         /// <exception cref="System.ArgumentNullException">The <paramref name="source"/> is null (Nothing in Visual Basic).</exception>
849         public static Task<TOutput> ReceiveAsync<TOutput>(
850             this ISourceBlock<TOutput> source, CancellationToken cancellationToken)
851         {
852             // Argument validation handled by target method
853             return ReceiveAsync(source, Common.InfiniteTimeSpan, cancellationToken);
854         }
855
856         /// <summary>Asynchronously receives a value from the specified source.</summary>
857         /// <typeparam name="TOutput">Specifies the type of data contained in the source.</typeparam>
858         /// <param name="source">The source from which to asynchronously receive.</param>
859         /// <param name="timeout">A <see cref="System.TimeSpan"/> that represents the number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely.</param>
860         /// <returns>
861         /// A <see cref="System.Threading.Tasks.Task{TOutput}"/> that represents the asynchronous receive operation.  When an item is successfully received from the source,
862         /// the returned task will be completed and its <see cref="System.Threading.Tasks.Task{TOutput}.Result">Result</see> will return the received item.  If an item cannot be retrieved,
863         /// either because the timeout expires or the source is empty and completed, the returned task will be canceled.
864         /// </returns>
865         /// <exception cref="System.ArgumentNullException">The <paramref name="source"/> is null (Nothing in Visual Basic).</exception>
866         /// <exception cref="System.ArgumentOutOfRangeException">
867         /// timeout is a negative number other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than <see cref="System.Int32.MaxValue"/>.
868         /// </exception>
869         public static Task<TOutput> ReceiveAsync<TOutput>(
870             this ISourceBlock<TOutput> source, TimeSpan timeout)
871         {
872             // Argument validation handled by target method
873             return ReceiveAsync(source, timeout, CancellationToken.None);
874         }
875
876         /// <summary>Asynchronously receives a value from the specified source.</summary>
877         /// <typeparam name="TOutput">Specifies the type of data contained in the source.</typeparam>
878         /// <param name="source">The source from which to asynchronously receive.</param>
879         /// <param name="timeout">A <see cref="System.TimeSpan"/> that represents the number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely.</param>
880         /// <param name="cancellationToken">The <see cref="System.Threading.CancellationToken"/> which may be used to cancel the receive operation.</param>
881         /// <returns>
882         /// A <see cref="System.Threading.Tasks.Task{TOutput}"/> that represents the asynchronous receive operation.  When an item is successfully received from the source,
883         /// the returned task will be completed and its <see cref="System.Threading.Tasks.Task{TOutput}.Result">Result</see> will return the received item.  If an item cannot be retrieved,
884         /// either because the timeout expires, cancellation is requested, or the source is empty and completed, the returned task will be canceled.
885         /// </returns>
886         /// <exception cref="System.ArgumentNullException">The <paramref name="source"/> is null (Nothing in Visual Basic).</exception>
887         /// <exception cref="System.ArgumentOutOfRangeException">
888         /// timeout is a negative number other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than <see cref="System.Int32.MaxValue"/>.
889         /// </exception>
890         public static Task<TOutput> ReceiveAsync<TOutput>(
891             this ISourceBlock<TOutput> source, TimeSpan timeout, CancellationToken cancellationToken)
892         {
893             // Validate arguments
894
895
896             if (source == null) throw new ArgumentNullException("source");
897             if (!Common.IsValidTimeout(timeout)) throw new ArgumentOutOfRangeException("timeout", SR.ArgumentOutOfRange_NeedNonNegOrNegative1);
898
899             // Return the task representing the core receive operation
900             return ReceiveCore(source, true, timeout, cancellationToken);
901         }
902         #endregion
903
904         #region Receive
905         /// <summary>Synchronously receives an item from the source.</summary>
906         /// <typeparam name="TOutput">Specifies the type of data contained in the source.</typeparam>
907         /// <param name="source">The source from which to receive.</param>
908         /// <returns>The received item.</returns>
909         /// <exception cref="System.ArgumentNullException">The <paramref name="source"/> is null (Nothing in Visual Basic).</exception>
910         /// <exception cref="System.InvalidOperationException">No item could be received from the source.</exception>
911         public static TOutput Receive<TOutput>(
912             this ISourceBlock<TOutput> source)
913         {
914             // Argument validation handled by target method
915             return Receive(source, Common.InfiniteTimeSpan, CancellationToken.None);
916         }
917
918         /// <summary>Synchronously receives an item from the source.</summary>
919         /// <typeparam name="TOutput">Specifies the type of data contained in the source.</typeparam>
920         /// <param name="source">The source from which to receive.</param>
921         /// <param name="cancellationToken">The <see cref="System.Threading.CancellationToken"/> which may be used to cancel the receive operation.</param>
922         /// <returns>The received item.</returns>
923         /// <exception cref="System.ArgumentNullException">The <paramref name="source"/> is null (Nothing in Visual Basic).</exception>
924         /// <exception cref="System.InvalidOperationException">No item could be received from the source.</exception>
925         /// <exception cref="System.OperationCanceledException">The operation was canceled before an item was received from the source.</exception>
926         /// <remarks>
927         /// If the source successfully offered an item that was received by this operation, it will be returned, even if a concurrent cancellation request occurs.
928         /// </remarks>
929         public static TOutput Receive<TOutput>(
930             this ISourceBlock<TOutput> source, CancellationToken cancellationToken)
931         {
932             // Argument validation handled by target method
933             return Receive(source, Common.InfiniteTimeSpan, cancellationToken);
934         }
935
936         /// <summary>Synchronously receives an item from the source.</summary>
937         /// <typeparam name="TOutput">Specifies the type of data contained in the source.</typeparam>
938         /// <param name="source">The source from which to receive.</param>
939         /// <param name="timeout">A <see cref="System.TimeSpan"/> that represents the number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely.</param>
940         /// <returns>The received item.</returns>
941         /// <exception cref="System.ArgumentOutOfRangeException">
942         /// timeout is a negative number other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than <see cref="System.Int32.MaxValue"/>.
943         /// </exception>
944         /// <exception cref="System.ArgumentNullException">The <paramref name="source"/> is null (Nothing in Visual Basic).</exception>
945         /// <exception cref="System.InvalidOperationException">No item could be received from the source.</exception>
946         /// <exception cref="System.TimeoutException">The specified timeout expired before an item was received from the source.</exception>
947         /// <remarks>
948         /// If the source successfully offered an item that was received by this operation, it will be returned, even if a concurrent timeout occurs.
949         /// </remarks>
950         public static TOutput Receive<TOutput>(
951             this ISourceBlock<TOutput> source, TimeSpan timeout)
952         {
953             // Argument validation handled by target method
954             return Receive(source, timeout, CancellationToken.None);
955         }
956
957         /// <summary>Synchronously receives an item from the source.</summary>
958         /// <typeparam name="TOutput">Specifies the type of data contained in the source.</typeparam>
959         /// <param name="source">The source from which to receive.</param>
960         /// <param name="timeout">A <see cref="System.TimeSpan"/> that represents the number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely.</param>
961         /// <param name="cancellationToken">The <see cref="System.Threading.CancellationToken"/> which may be used to cancel the receive operation.</param>
962         /// <returns>The received item.</returns>
963         /// <exception cref="System.ArgumentNullException">The <paramref name="source"/> is null (Nothing in Visual Basic).</exception>
964         /// <exception cref="System.ArgumentOutOfRangeException">
965         /// timeout is a negative number other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than <see cref="System.Int32.MaxValue"/>.
966         /// </exception>
967         /// <exception cref="System.InvalidOperationException">No item could be received from the source.</exception>
968         /// <exception cref="System.TimeoutException">The specified timeout expired before an item was received from the source.</exception>
969         /// <exception cref="System.OperationCanceledException">The operation was canceled before an item was received from the source.</exception>
970         /// <remarks>
971         /// If the source successfully offered an item that was received by this operation, it will be returned, even if a concurrent timeout or cancellation request occurs.
972         /// </remarks>
973         [SuppressMessage("Microsoft.Usage", "CA2200:RethrowToPreserveStackDetails")]
974         public static TOutput Receive<TOutput>(
975             this ISourceBlock<TOutput> source, TimeSpan timeout, CancellationToken cancellationToken)
976         {
977             // Validate arguments
978             if (source == null) throw new ArgumentNullException("source");
979             if (!Common.IsValidTimeout(timeout)) throw new ArgumentOutOfRangeException("timeout", SR.ArgumentOutOfRange_NeedNonNegOrNegative1);
980
981             // Do fast path checks for both cancellation and data already existing.
982             cancellationToken.ThrowIfCancellationRequested();
983             TOutput fastCheckedItem;
984             var receivableSource = source as IReceivableSourceBlock<TOutput>;
985             if (receivableSource != null && receivableSource.TryReceive(null, out fastCheckedItem))
986             {
987                 return fastCheckedItem;
988             }
989
990             // Get a TCS to represent the receive operation and wait for it to complete.
991             // If it completes successfully, return the result. Otherwise, throw the 
992             // original inner exception representing the cause.  This could be an OCE.
993             Task<TOutput> task = ReceiveCore(source, false, timeout, cancellationToken);
994             try
995             {
996                 return task.GetAwaiter().GetResult(); // block until the result is available
997             }
998             catch
999             {
1000                 // Special case cancellation in order to ensure the exception contains the token.
1001                 // The public TrySetCanceled, used by ReceiveCore, is parameterless and doesn't 
1002                 // accept the token to use.  Thus the exception that we're catching here
1003                 // won't contain the cancellation token we want propagated.
1004                 if (task.IsCanceled) cancellationToken.ThrowIfCancellationRequested();
1005
1006                 // If we get here, propagate the original exception.
1007                 throw;
1008             }
1009         }
1010         #endregion
1011
1012         #region Shared by Receive and ReceiveAsync
1013         /// <summary>Receives an item from the source.</summary>
1014         /// <typeparam name="TOutput">Specifies the type of data contained in the source.</typeparam>
1015         /// <param name="source">The source from which to receive.</param>
1016         /// <param name="attemptTryReceive">Whether to first attempt using TryReceive to get a value from the source.</param>
1017         /// <param name="timeout">A <see cref="System.TimeSpan"/> that represents the number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely.</param>
1018         /// <param name="cancellationToken">The <see cref="System.Threading.CancellationToken"/> which may be used to cancel the receive operation.</param>
1019         /// <returns>A Task for the receive operation.</returns>
1020         [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
1021         private static Task<TOutput> ReceiveCore<TOutput>(
1022             this ISourceBlock<TOutput> source, bool attemptTryReceive, TimeSpan timeout, CancellationToken cancellationToken)
1023         {
1024             Contract.Requires(source != null, "Need a source from which to receive.");
1025
1026             // If cancellation has been requested, we're done before we've even started, cancel this receive.
1027             if (cancellationToken.IsCancellationRequested)
1028             {
1029                 return Common.CreateTaskFromCancellation<TOutput>(cancellationToken);
1030             }
1031
1032             if (attemptTryReceive)
1033             {
1034                 // If we're able to directly and immediately receive an item, use that item to complete the receive.
1035                 var receivableSource = source as IReceivableSourceBlock<TOutput>;
1036                 if (receivableSource != null)
1037                 {
1038                     try
1039                     {
1040                         TOutput fastCheckedItem;
1041                         if (receivableSource.TryReceive(null, out fastCheckedItem))
1042                         {
1043                             return Task.FromResult<TOutput>(fastCheckedItem);
1044                         }
1045                     }
1046                     catch (Exception exc)
1047                     {
1048                         return Common.CreateTaskFromException<TOutput>(exc);
1049                     }
1050                 }
1051             }
1052
1053             int millisecondsTimeout = (int)timeout.TotalMilliseconds;
1054             if (millisecondsTimeout == 0)
1055             {
1056                 return Common.CreateTaskFromException<TOutput>(ReceiveTarget<TOutput>.CreateExceptionForTimeout());
1057             }
1058
1059             return ReceiveCoreByLinking<TOutput>(source, millisecondsTimeout, cancellationToken);
1060         }
1061
1062         /// <summary>The reason for a ReceiveCoreByLinking call failing.</summary>
1063         private enum ReceiveCoreByLinkingCleanupReason
1064         {
1065             /// <summary>The Receive operation completed successfully, obtaining a value from the source.</summary>
1066             Success = 0,
1067             /// <summary>The timer expired before a value could be received.</summary>
1068             Timer = 1,
1069             /// <summary>The cancellation token had cancellation requested before a value could be received.</summary>
1070             Cancellation = 2,
1071             /// <summary>The source completed before a value could be received.</summary>
1072             SourceCompletion = 3,
1073             /// <summary>An error occurred while linking up the target.</summary>
1074             SourceProtocolError = 4,
1075             /// <summary>An error during cleanup after completion for another reason.</summary>
1076             ErrorDuringCleanup = 5
1077         }
1078
1079         /// <summary>Cancels a CancellationTokenSource passed as the object state argument.</summary>
1080         private static readonly Action<object> _cancelCts = state => ((CancellationTokenSource)state).Cancel();
1081
1082         /// <summary>Receives an item from the source by linking a temporary target from it.</summary>
1083         /// <typeparam name="TOutput">Specifies the type of data contained in the source.</typeparam>
1084         /// <param name="source">The source from which to receive.</param>
1085         /// <param name="millisecondsTimeout">The number of milliseconds to wait, or -1 to wait indefinitely.</param>
1086         /// <param name="cancellationToken">The <see cref="System.Threading.CancellationToken"/> which may be used to cancel the receive operation.</param>
1087         [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
1088         private static Task<TOutput> ReceiveCoreByLinking<TOutput>(ISourceBlock<TOutput> source, int millisecondsTimeout, CancellationToken cancellationToken)
1089         {
1090             // Create a target to link from the source
1091             var target = new ReceiveTarget<TOutput>();
1092
1093             // Keep cancellation registrations inside the try/catch in case the underlying CTS is disposed in which case an exception is thrown
1094             try
1095             {
1096                 // Create a cancellation token that will be canceled when either the provided token 
1097                 // is canceled or the source block completes.
1098                 if (cancellationToken.CanBeCanceled)
1099                 {
1100                     target._externalCancellationToken = cancellationToken;
1101                     target._regFromExternalCancellationToken = cancellationToken.Register(_cancelCts, target._cts);
1102                 }
1103
1104                 // We need to cleanup if one of a few things happens:
1105                 // - The target completes successfully due to receiving data.
1106                 // - The user-specified timeout occurs, such that we should bail on the receive.
1107                 // - The cancellation token has cancellation requested, such that we should bail on the receive.
1108                 // - The source completes, since it won't send any more data.
1109                 // Note that there's a potential race here, in that the cleanup delegate could be executed
1110                 // from the timer before the timer variable is set, but that's ok, because then timer variable
1111                 // will just show up as null in the cleanup and there will be nothing to dispose (nor will anything
1112                 // need to be disposed, since it's the timer that fired.  Timer.Dispose is also thread-safe to be 
1113                 // called multiple times concurrently.)
1114                 if (millisecondsTimeout > 0)
1115                 {
1116                     target._timer = new Timer(
1117                         ReceiveTarget<TOutput>.CachedLinkingTimerCallback, target,
1118                         millisecondsTimeout, Timeout.Infinite);
1119                 }
1120
1121                 if (target._cts.Token.CanBeCanceled)
1122                 {
1123                     target._cts.Token.Register(
1124                         ReceiveTarget<TOutput>.CachedLinkingCancellationCallback, target); // we don't have to cleanup this registration, as this cts is short-lived
1125                 }
1126
1127                 // Link the target to the source
1128                 IDisposable unlink = source.LinkTo(target, DataflowLinkOptions.UnlinkAfterOneAndPropagateCompletion);
1129                 target._unlink = unlink;
1130
1131                 // If completion has started, there is a chance it started after we linked.
1132                 // In that case, we must dispose of the unlinker.
1133                 // If completion started before we linked, the cleanup code will try to unlink.
1134                 // So we are racing to dispose of the unlinker.
1135                 if (Volatile.Read(ref target._cleanupReserved))
1136                 {
1137                     IDisposable disposableUnlink = Interlocked.CompareExchange(ref target._unlink, null, unlink);
1138                     if (disposableUnlink != null) disposableUnlink.Dispose();
1139                 }
1140             }
1141             catch (Exception exception)
1142             {
1143                 target._receivedException = exception;
1144                 target.TryCleanupAndComplete(ReceiveCoreByLinkingCleanupReason.SourceProtocolError);
1145                 // If we lose the race here, we may end up eating this exception.
1146             }
1147
1148             return target.Task;
1149         }
1150
1151         /// <summary>Provides a TaskCompletionSource that is also a dataflow target for use in ReceiveCore.</summary>
1152         /// <typeparam name="T">Specifies the type of data offered to the target.</typeparam>
1153         [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")]
1154         [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
1155         private sealed class ReceiveTarget<T> : TaskCompletionSource<T>, ITargetBlock<T>, IDebuggerDisplay
1156         {
1157             /// <summary>Cached delegate used in ReceiveCoreByLinking on the created timer.  Passed the ReceiveTarget as the argument.</summary>
1158             /// <remarks>The C# compiler will not cache this delegate by default due to it being a generic method on a non-generic class.</remarks>
1159             internal readonly static TimerCallback CachedLinkingTimerCallback = state =>
1160             {
1161                 var receiveTarget = (ReceiveTarget<T>)state;
1162                 receiveTarget.TryCleanupAndComplete(ReceiveCoreByLinkingCleanupReason.Timer);
1163             };
1164
1165             /// <summary>Cached delegate used in ReceiveCoreByLinking on the cancellation token. Passed the ReceiveTarget as the state argument.</summary>
1166             /// <remarks>The C# compiler will not cache this delegate by default due to it being a generic method on a non-generic class.</remarks>
1167             internal readonly static Action<object> CachedLinkingCancellationCallback = state =>
1168             {
1169                 var receiveTarget = (ReceiveTarget<T>)state;
1170                 receiveTarget.TryCleanupAndComplete(ReceiveCoreByLinkingCleanupReason.Cancellation);
1171             };
1172
1173             /// <summary>The received value if we accepted a value from the source.</summary>
1174             private T _receivedValue;
1175
1176             /// <summary>The cancellation token source representing both external and internal cancellation.</summary>
1177             internal readonly CancellationTokenSource _cts = new CancellationTokenSource();
1178             /// <summary>Indicates a code path is already on route to complete the target. 0 is false, 1 is true.</summary>
1179             internal bool _cleanupReserved; // must only be accessed under IncomingLock
1180             /// <summary>The external token that cancels the internal token.</summary>
1181             internal CancellationToken _externalCancellationToken;
1182             /// <summary>The registration on the external token that cancels the internal token.</summary>
1183             internal CancellationTokenRegistration _regFromExternalCancellationToken;
1184             /// <summary>The timer that fires when the timeout has been exceeded.</summary>
1185             internal Timer _timer;
1186             /// <summary>The unlinker from removing this target from the source from which we're receiving.</summary>
1187             internal IDisposable _unlink;
1188             /// <summary>The received exception if an error occurred.</summary>
1189             internal Exception _receivedException;
1190
1191             /// <summary>Gets the sync obj used to synchronize all activity on this target.</summary>
1192             internal object IncomingLock { get { return _cts; } }
1193
1194             /// <summary>Initializes the target.</summary>
1195             internal ReceiveTarget() { }
1196
1197             /// <summary>Offers a message to be used to complete the TaskCompletionSource.</summary>
1198             [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
1199             DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, Boolean consumeToAccept)
1200             {
1201                 // Validate arguments
1202                 if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, "messageHeader");
1203                 if (source == null && consumeToAccept) throw new ArgumentException(SR.Argument_CantConsumeFromANullSource, "consumeToAccept");
1204                 Contract.EndContractBlock();
1205
1206                 DataflowMessageStatus status = DataflowMessageStatus.NotAvailable;
1207
1208                 // If we're already one our way to being done, don't accept anything.
1209                 // This is a fast-path check prior to taking the incoming lock;
1210                 // _cleanupReserved only ever goes from false to true.
1211                 if (Volatile.Read(ref _cleanupReserved)) return DataflowMessageStatus.DecliningPermanently;
1212
1213                 lock (IncomingLock)
1214                 {
1215                     // Check again now that we've taken the lock
1216                     if (_cleanupReserved) return DataflowMessageStatus.DecliningPermanently;
1217
1218                     try
1219                     {
1220                         // Accept the message if possible and complete this task with the message's value.
1221                         bool consumed = true;
1222                         T acceptedValue = consumeToAccept ? source.ConsumeMessage(messageHeader, this, out consumed) : messageValue;
1223                         if (consumed)
1224                         {
1225                             status = DataflowMessageStatus.Accepted;
1226                             _receivedValue = acceptedValue;
1227                             _cleanupReserved = true;
1228                         }
1229                     }
1230                     catch (Exception exc)
1231                     {
1232                         // An error occurred.  Take ourselves out of the game.
1233                         status = DataflowMessageStatus.DecliningPermanently;
1234                         Common.StoreDataflowMessageValueIntoExceptionData(exc, messageValue);
1235                         _receivedException = exc;
1236                         _cleanupReserved = true;
1237                     }
1238                 }
1239
1240                 // Do any cleanup outside of the lock.  The right to cleanup was reserved above for these cases.
1241                 if (status == DataflowMessageStatus.Accepted)
1242                 {
1243                     CleanupAndComplete(ReceiveCoreByLinkingCleanupReason.Success);
1244                 }
1245                 else if (status == DataflowMessageStatus.DecliningPermanently) // should only be the case if an error occurred
1246                 {
1247                     CleanupAndComplete(ReceiveCoreByLinkingCleanupReason.SourceProtocolError);
1248                 }
1249
1250                 return status;
1251             }
1252
1253             /// <summary>
1254             /// Attempts to reserve the right to cleanup and complete, and if successfully, 
1255             /// continues to cleanup and complete.
1256             /// </summary>
1257             /// <param name="reason">The reason we're completing and cleaning up.</param>
1258             /// <returns>true if successful in completing; otherwise, false.</returns>
1259             internal bool TryCleanupAndComplete(ReceiveCoreByLinkingCleanupReason reason)
1260             {
1261                 // If cleanup was already reserved, bail.
1262                 if (Volatile.Read(ref _cleanupReserved)) return false;
1263
1264                 // Atomically using IncomingLock try to reserve the completion routine.
1265                 lock (IncomingLock)
1266                 {
1267                     if (_cleanupReserved) return false;
1268                     _cleanupReserved = true;
1269                 }
1270
1271                 // We've reserved cleanup and completion, so do it.
1272                 CleanupAndComplete(reason);
1273                 return true;
1274             }
1275
1276             /// <summary>Cleans up the target for completion.</summary>
1277             /// <param name="reason">The reason we're completing and cleaning up.</param>
1278             /// <remarks>This method must only be called once on this instance.</remarks>
1279             [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
1280             [SuppressMessage("Microsoft.Usage", "CA2201:DoNotRaiseReservedExceptionTypes")]
1281             private void CleanupAndComplete(ReceiveCoreByLinkingCleanupReason reason)
1282             {
1283                 Common.ContractAssertMonitorStatus(IncomingLock, held: false);
1284                 Debug.Assert(Volatile.Read(ref _cleanupReserved), "Should only be called once by whomever reserved the right.");
1285
1286                 // Unlink from the source.  If we're cleaning up because the source
1287                 // completed, this is unnecessary, as the source should have already
1288                 // emptied out its target registry, or at least be in the process of doing so.
1289                 // We are racing with the linking code - only one can dispose of the unlinker.
1290                 IDisposable unlink = _unlink;
1291                 if (reason != ReceiveCoreByLinkingCleanupReason.SourceCompletion && unlink != null)
1292                 {
1293                     IDisposable disposableUnlink = Interlocked.CompareExchange(ref _unlink, null, unlink);
1294                     if (disposableUnlink != null)
1295                     {
1296                         // If an error occurs, fault the target and override the reason to
1297                         // continue executing, i.e. do the remaining cleanup without completing
1298                         // the target the way we originally intended to.
1299                         try
1300                         {
1301                             disposableUnlink.Dispose(); // must not be holding IncomingLock, or could deadlock
1302                         }
1303                         catch (Exception exc)
1304                         {
1305                             _receivedException = exc;
1306                             reason = ReceiveCoreByLinkingCleanupReason.SourceProtocolError;
1307                         }
1308                     }
1309                 }
1310
1311                 // Cleanup the timer.  (Even if we're here because of the timer firing, we still
1312                 // want to aggressively dispose of the timer.)
1313                 if (_timer != null) _timer.Dispose();
1314
1315                 // Cancel the token everyone is listening to.  We also want to unlink
1316                 // from the user-provided cancellation token to prevent a leak.
1317                 // We do *not* dispose of the cts itself here, as there could be a race
1318                 // with the code registering this cleanup delegate with cts; not disposing
1319                 // is ok, though, because there's no resources created by the CTS
1320                 // that needs to be cleaned up since we're not using the wait handle.
1321                 // This is also why we don't use CreateLinkedTokenSource, as that combines
1322                 // both disposing of the token source and disposal of the connection link
1323                 // into a single dispose operation.
1324                 // if we're here because of cancellation, no need to cancel again
1325                 if (reason != ReceiveCoreByLinkingCleanupReason.Cancellation)
1326                 {
1327                     // if the source complete without receiving a value, we check the cancellation one more time
1328                     if (reason == ReceiveCoreByLinkingCleanupReason.SourceCompletion &&
1329                         (_externalCancellationToken.IsCancellationRequested || _cts.IsCancellationRequested))
1330                     {
1331                         reason = ReceiveCoreByLinkingCleanupReason.Cancellation;
1332                     }
1333                     _cts.Cancel();
1334                 }
1335                 _regFromExternalCancellationToken.Dispose();
1336
1337                 // No need to dispose of the cts, either, as we're not accessing its WaitHandle
1338                 // nor was it created as a linked token source.  Disposing it could also be dangerous
1339                 // if other code tries to access it after we dispose of it... best to leave it available.
1340
1341                 // Complete the task based on the reason
1342                 switch (reason)
1343                 {
1344                     // Task final state: RanToCompletion
1345                     case ReceiveCoreByLinkingCleanupReason.Success:
1346                         System.Threading.Tasks.Task.Factory.StartNew(state =>
1347                         {
1348                             // Complete with the received value
1349                             var target = (ReceiveTarget<T>)state;
1350                             try { target.TrySetResult(target._receivedValue); }
1351                             catch (ObjectDisposedException) { /* benign race if returned task is already disposed */ }
1352                         }, this, CancellationToken.None, TaskCreationOptions.None, TaskScheduler.Default);
1353                         break;
1354
1355                     // Task final state: Canceled
1356                     case ReceiveCoreByLinkingCleanupReason.Cancellation:
1357                         System.Threading.Tasks.Task.Factory.StartNew(state =>
1358                         {
1359                             // Complete as canceled
1360                             var target = (ReceiveTarget<T>)state;
1361                             try { target.TrySetCanceled(); }
1362                             catch (ObjectDisposedException) { /* benign race if returned task is already disposed */ }
1363                         }, this, CancellationToken.None, TaskCreationOptions.None, TaskScheduler.Default);
1364                         break;
1365                     default:
1366                         Debug.Assert(false, "Invalid linking cleanup reason specified.");
1367                         goto case ReceiveCoreByLinkingCleanupReason.Cancellation;
1368
1369                     // Task final state: Faulted
1370                     case ReceiveCoreByLinkingCleanupReason.SourceCompletion:
1371                         if (_receivedException == null) _receivedException = CreateExceptionForSourceCompletion();
1372                         goto case ReceiveCoreByLinkingCleanupReason.SourceProtocolError;
1373                     case ReceiveCoreByLinkingCleanupReason.Timer:
1374                         if (_receivedException == null) _receivedException = CreateExceptionForTimeout();
1375                         goto case ReceiveCoreByLinkingCleanupReason.SourceProtocolError;
1376                     case ReceiveCoreByLinkingCleanupReason.SourceProtocolError:
1377                     case ReceiveCoreByLinkingCleanupReason.ErrorDuringCleanup:
1378                         Debug.Assert(_receivedException != null, "We need an exception with which to fault the task.");
1379                         System.Threading.Tasks.Task.Factory.StartNew(state =>
1380                         {
1381                             // Complete with the received exception
1382                             var target = (ReceiveTarget<T>)state;
1383                             try { target.TrySetException(target._receivedException ?? new Exception()); }
1384                             catch (ObjectDisposedException) { /* benign race if returned task is already disposed */ }
1385                         }, this, CancellationToken.None, TaskCreationOptions.None, TaskScheduler.Default);
1386                         break;
1387                 }
1388             }
1389
1390             /// <summary>Creates an exception to use when a source completed before receiving a value.</summary>
1391             /// <returns>The initialized exception.</returns>
1392             internal static Exception CreateExceptionForSourceCompletion()
1393             {
1394                 return Common.InitializeStackTrace(new InvalidOperationException(SR.InvalidOperation_DataNotAvailableForReceive));
1395             }
1396
1397             /// <summary>Creates an exception to use when a timeout occurs before receiving a value.</summary>
1398             /// <returns>The initialized exception.</returns>
1399             internal static Exception CreateExceptionForTimeout()
1400             {
1401                 return Common.InitializeStackTrace(new TimeoutException());
1402             }
1403
1404             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
1405             void IDataflowBlock.Complete()
1406             {
1407                 TryCleanupAndComplete(ReceiveCoreByLinkingCleanupReason.SourceCompletion);
1408             }
1409
1410             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
1411             void IDataflowBlock.Fault(Exception exception) { ((IDataflowBlock)this).Complete(); }
1412
1413             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
1414             Task IDataflowBlock.Completion { get { throw new NotSupportedException(SR.NotSupported_MemberNotNeeded); } }
1415
1416             /// <summary>The data to display in the debugger display attribute.</summary>
1417             [SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
1418             private object DebuggerDisplayContent
1419             {
1420                 get
1421                 {
1422                     return string.Format("{0} IsCompleted={1}",
1423                         Common.GetNameForDebugger(this), base.Task.IsCompleted);
1424                 }
1425             }
1426             /// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
1427             object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } }
1428         }
1429         #endregion
1430         #endregion
1431
1432         #region OutputAvailableAsync
1433         /// <summary>
1434         /// Provides a <see cref="System.Threading.Tasks.Task{TResult}"/> 
1435         /// that asynchronously monitors the source for available output.
1436         /// </summary>
1437         /// <typeparam name="TOutput">Specifies the type of data contained in the source.</typeparam>
1438         /// <param name="source">The source to monitor.</param>
1439         /// <returns>
1440         /// A <see cref="System.Threading.Tasks.Task{Boolean}"/> that informs of whether and when
1441         /// more output is available.  When the task completes, if its <see cref="System.Threading.Tasks.Task{Boolean}.Result"/> is true, more output 
1442         /// is available in the source (though another consumer of the source may retrieve the data).  
1443         /// If it returns false, more output is not and will never be available, due to the source 
1444         /// completing prior to output being available.
1445         /// </returns>
1446         public static Task<bool> OutputAvailableAsync<TOutput>(this ISourceBlock<TOutput> source)
1447         {
1448             return OutputAvailableAsync<TOutput>(source, CancellationToken.None);
1449         }
1450
1451         /// <summary>
1452         /// Provides a <see cref="System.Threading.Tasks.Task{TResult}"/> 
1453         /// that asynchronously monitors the source for available output.
1454         /// </summary>
1455         /// <typeparam name="TOutput">Specifies the type of data contained in the source.</typeparam>
1456         /// <param name="source">The source to monitor.</param>
1457         /// <param name="cancellationToken">The cancellation token with which to cancel the asynchronous operation.</param>
1458         /// <returns>
1459         /// A <see cref="System.Threading.Tasks.Task{Boolean}"/> that informs of whether and when
1460         /// more output is available.  When the task completes, if its <see cref="System.Threading.Tasks.Task{Boolean}.Result"/> is true, more output 
1461         /// is available in the source (though another consumer of the source may retrieve the data).  
1462         /// If it returns false, more output is not and will never be available, due to the source 
1463         /// completing prior to output being available.
1464         /// </returns>
1465         [SuppressMessage("Microsoft.Design", "CA1011:ConsiderPassingBaseTypesAsParameters")]
1466         [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
1467         [SuppressMessage("Microsoft.Reliability", "CA2000:DisposeObjectsBeforeLosingScope")]
1468         public static Task<bool> OutputAvailableAsync<TOutput>(
1469             this ISourceBlock<TOutput> source, CancellationToken cancellationToken)
1470         {
1471             // Validate arguments
1472             if (source == null) throw new ArgumentNullException("source");
1473             Contract.EndContractBlock();
1474
1475             // Fast path for cancellation
1476             if (cancellationToken.IsCancellationRequested)
1477                 return Common.CreateTaskFromCancellation<bool>(cancellationToken);
1478
1479             // In a method like this, normally we would want to check source.Completion.IsCompleted
1480             // and avoid linking completely by simply returning a completed task.  However,
1481             // some blocks that are completed still have data available, like WriteOnceBlock,
1482             // which completes as soon as it gets a value and stores that value forever.
1483             // As such, OutputAvailableAsync must link from the source so that the source
1484             // can push data to us if it has it, at which point we can immediately unlink.
1485
1486             // Create a target task that will complete when it's offered a message (but it won't accept the message)
1487             var target = new OutputAvailableAsyncTarget<TOutput>();
1488             try
1489             {
1490                 // Link from the source.  If the source propagates a message during or immediately after linking
1491                 // such that our target is already completed, just return its task.
1492                 target._unlinker = source.LinkTo(target, DataflowLinkOptions.UnlinkAfterOneAndPropagateCompletion);
1493
1494                 // If the task is already completed (an exception may have occurred, or the source may have propagated
1495                 // a message to the target during LinkTo or soon thereafter), just return the task directly.
1496                 if (target.Task.IsCompleted)
1497                 {
1498                     return target.Task;
1499                 }
1500
1501                 // If cancellation could be requested, hook everything up to be notified of cancellation requests.
1502                 if (cancellationToken.CanBeCanceled)
1503                 {
1504                     // When cancellation is requested, unlink the target from the source and cancel the target.
1505                     target._ctr = cancellationToken.Register(OutputAvailableAsyncTarget<TOutput>.s_cancelAndUnlink, target);
1506                 }
1507
1508                 // We can't return the task directly, as the source block will be completing the task synchronously,
1509                 // and thus any synchronous continuations would run as part of the source block's call.  We don't have to worry
1510                 // about cancellation, as we've coded cancellation to complete the task asynchronously, and with the continuation
1511                 // set as NotOnCanceled, so the continuation will be canceled immediately when the antecedent is canceled, which
1512                 // will thus be asynchronously from the cancellation token source's cancellation call.
1513                 return target.Task.ContinueWith(
1514                     OutputAvailableAsyncTarget<TOutput>.s_handleCompletion, target,
1515                     CancellationToken.None, Common.GetContinuationOptions() | TaskContinuationOptions.NotOnCanceled, TaskScheduler.Default);
1516             }
1517             catch (Exception exc)
1518             {
1519                 // Source.LinkTo could throw, as could cancellationToken.Register if cancellation was already requested
1520                 // such that it synchronously invokes the source's unlinker IDisposable, which could throw.
1521                 target.TrySetException(exc);
1522
1523                 // Undo the link from the source to the target
1524                 target.AttemptThreadSafeUnlink();
1525
1526                 // Return the now faulted task
1527                 return target.Task;
1528             }
1529         }
1530
1531         /// <summary>Provides a target used in OutputAvailableAsync operations.</summary>
1532         /// <typeparam name="T">Specifies the type of data in the data source being checked.</typeparam>
1533         [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
1534         private sealed class OutputAvailableAsyncTarget<T> : TaskCompletionSource<bool>, ITargetBlock<T>, IDebuggerDisplay
1535         {
1536             /// <summary>
1537             /// Cached continuation delegate that unregisters from cancellation and
1538             /// marshals the antecedent's result to the return value.
1539             /// </summary>
1540             internal readonly static Func<Task<bool>, object, bool> s_handleCompletion = (antecedent, state) =>
1541             {
1542                 var target = state as OutputAvailableAsyncTarget<T>;
1543                 Debug.Assert(target != null, "Expected non-null target");
1544                 target._ctr.Dispose();
1545                 return antecedent.GetAwaiter().GetResult();
1546             };
1547
1548             /// <summary>
1549             /// Cached delegate that cancels the target and unlinks the target from the source.
1550             /// Expects an OutputAvailableAsyncTarget as the state argument. 
1551             /// </summary>
1552             internal readonly static Action<object> s_cancelAndUnlink = CancelAndUnlink;
1553
1554             /// <summary>Cancels the target and unlinks the target from the source.</summary>
1555             /// <param name="state">An OutputAvailableAsyncTarget.</param>
1556             private static void CancelAndUnlink(object state)
1557             {
1558                 var target = state as OutputAvailableAsyncTarget<T>;
1559                 Debug.Assert(target != null, "Expected a non-null target");
1560
1561                 // Cancel asynchronously so that we're not completing the task as part of the cts.Cancel() call,
1562                 // since synchronous continuations off that task would then run as part of Cancel.
1563                 // Take advantage of this task and unlink from there to avoid doing the interlocked operation synchronously.
1564                 System.Threading.Tasks.Task.Factory.StartNew(tgt =>
1565                                                             {
1566                                                                 var thisTarget = (OutputAvailableAsyncTarget<T>)tgt;
1567                                                                 thisTarget.TrySetCanceled();
1568                                                                 thisTarget.AttemptThreadSafeUnlink();
1569                                                             },
1570                     target, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
1571             }
1572
1573             /// <summary>Disposes of _unlinker if the target has been linked.</summary>
1574             internal void AttemptThreadSafeUnlink()
1575             {
1576                 // A race is possible. Therefore use an interlocked operation.
1577                 IDisposable cachedUnlinker = _unlinker;
1578                 if (cachedUnlinker != null && Interlocked.CompareExchange(ref _unlinker, null, cachedUnlinker) == cachedUnlinker)
1579                 {
1580                     cachedUnlinker.Dispose();
1581                 }
1582             }
1583
1584             /// <summary>The IDisposable used to unlink this target from its source.</summary>
1585             internal IDisposable _unlinker;
1586             /// <summary>The registration used to unregister this target from the cancellation token.</summary>
1587             internal CancellationTokenRegistration _ctr;
1588
1589             /// <summary>Completes the task when offered a message (but doesn't consume the message).</summary>
1590             DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, Boolean consumeToAccept)
1591             {
1592                 if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, "messageHeader");
1593                 if (source == null) throw new ArgumentNullException("source");
1594                 Contract.EndContractBlock();
1595
1596                 TrySetResult(true);
1597                 return DataflowMessageStatus.DecliningPermanently;
1598             }
1599
1600             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
1601             void IDataflowBlock.Complete()
1602             {
1603                 TrySetResult(false);
1604             }
1605
1606             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
1607             void IDataflowBlock.Fault(Exception exception)
1608             {
1609                 if (exception == null) throw new ArgumentNullException("exception");
1610                 Contract.EndContractBlock();
1611                 TrySetResult(false);
1612             }
1613
1614             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
1615             Task IDataflowBlock.Completion { get { throw new NotSupportedException(SR.NotSupported_MemberNotNeeded); } }
1616
1617             /// <summary>The data to display in the debugger display attribute.</summary>
1618             [SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
1619             private object DebuggerDisplayContent
1620             {
1621                 get
1622                 {
1623                     return string.Format("{0} IsCompleted={1}",
1624                         Common.GetNameForDebugger(this), base.Task.IsCompleted);
1625                 }
1626             }
1627             /// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
1628             object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } }
1629         }
1630         #endregion
1631
1632         #region Encapsulate
1633         /// <summary>Encapsulates a target and a source into a single propagator.</summary>
1634         /// <typeparam name="TInput">Specifies the type of input expected by the target.</typeparam>
1635         /// <typeparam name="TOutput">Specifies the type of output produced by the source.</typeparam>
1636         /// <param name="target">The target to encapsulate.</param>
1637         /// <param name="source">The source to encapsulate.</param>
1638         /// <returns>The encapsulated target and source.</returns>
1639         /// <remarks>
1640         /// This method does not in any way connect the target to the source. It creates a
1641         /// propagator block whose target methods delegate to the specified target and whose
1642         /// source methods delegate to the specified source.  Any connection between the target
1643         /// and the source is left for the developer to explicitly provide.  The propagator's
1644         /// <see cref="IDataflowBlock"/> implementation delegates to the specified source.
1645         /// </remarks>
1646         public static IPropagatorBlock<TInput, TOutput> Encapsulate<TInput, TOutput>(
1647             ITargetBlock<TInput> target, ISourceBlock<TOutput> source)
1648         {
1649             if (target == null) throw new ArgumentNullException("target");
1650             if (source == null) throw new ArgumentNullException("source");
1651             Contract.EndContractBlock();
1652             return new EncapsulatingPropagator<TInput, TOutput>(target, source);
1653         }
1654
1655         /// <summary>Provides a dataflow block that encapsulates a target and a source to form a single propagator.</summary>
1656         [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
1657         [DebuggerTypeProxy(typeof(EncapsulatingPropagator<,>.DebugView))]
1658         private sealed class EncapsulatingPropagator<TInput, TOutput> : IPropagatorBlock<TInput, TOutput>, IReceivableSourceBlock<TOutput>, IDebuggerDisplay
1659         {
1660             /// <summary>The target half.</summary>
1661             private ITargetBlock<TInput> _target;
1662             /// <summary>The source half.</summary>
1663             private ISourceBlock<TOutput> _source;
1664
1665             public EncapsulatingPropagator(ITargetBlock<TInput> target, ISourceBlock<TOutput> source)
1666             {
1667                 Contract.Requires(target != null, "The target should never be null; this should be checked by all internal usage.");
1668                 Contract.Requires(source != null, "The source should never be null; this should be checked by all internal usage.");
1669                 _target = target;
1670                 _source = source;
1671             }
1672
1673             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
1674             public void Complete()
1675             {
1676                 _target.Complete();
1677             }
1678
1679             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
1680             void IDataflowBlock.Fault(Exception exception)
1681             {
1682                 if (exception == null) throw new ArgumentNullException("exception");
1683                 Contract.EndContractBlock();
1684
1685                 _target.Fault(exception);
1686             }
1687             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="OfferMessage"]/*' />
1688             public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, TInput messageValue, ISourceBlock<TInput> source, bool consumeToAccept)
1689             {
1690                 return _target.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
1691             }
1692
1693             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
1694             public Task Completion { get { return _source.Completion; } }
1695
1696             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="LinkTo"]/*' />
1697             public IDisposable LinkTo(ITargetBlock<TOutput> target, DataflowLinkOptions linkOptions)
1698             {
1699                 return _source.LinkTo(target, linkOptions);
1700             }
1701
1702             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceive"]/*' />
1703             public bool TryReceive(Predicate<TOutput> filter, out TOutput item)
1704             {
1705                 var receivableSource = _source as IReceivableSourceBlock<TOutput>;
1706                 if (receivableSource != null) return receivableSource.TryReceive(filter, out item);
1707
1708                 item = default(TOutput);
1709                 return false;
1710             }
1711
1712             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceiveAll"]/*' />
1713             public bool TryReceiveAll(out IList<TOutput> items)
1714             {
1715                 var receivableSource = _source as IReceivableSourceBlock<TOutput>;
1716                 if (receivableSource != null) return receivableSource.TryReceiveAll(out items);
1717
1718                 items = default(IList<TOutput>);
1719                 return false;
1720             }
1721
1722             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ConsumeMessage"]/*' />
1723             public TOutput ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target, out Boolean messageConsumed)
1724             {
1725                 return _source.ConsumeMessage(messageHeader, target, out messageConsumed);
1726             }
1727
1728             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReserveMessage"]/*' />
1729             public bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
1730             {
1731                 return _source.ReserveMessage(messageHeader, target);
1732             }
1733
1734             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReleaseReservation"]/*' />
1735             public void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
1736             {
1737                 _source.ReleaseReservation(messageHeader, target);
1738             }
1739
1740             /// <summary>The data to display in the debugger display attribute.</summary>
1741             [SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
1742             private object DebuggerDisplayContent
1743             {
1744                 get
1745                 {
1746                     var displayTarget = _target as IDebuggerDisplay;
1747                     var displaySource = _source as IDebuggerDisplay;
1748                     return string.Format("{0} Target=\"{1}\", Source=\"{2}\"",
1749                         Common.GetNameForDebugger(this),
1750                         displayTarget != null ? displayTarget.Content : _target,
1751                         displaySource != null ? displaySource.Content : _source);
1752                 }
1753             }
1754             /// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
1755             object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } }
1756
1757             /// <summary>A debug view for the propagator.</summary>
1758             private sealed class DebugView
1759             {
1760                 /// <summary>The propagator being debugged.</summary>
1761                 private readonly EncapsulatingPropagator<TInput, TOutput> _propagator;
1762
1763                 /// <summary>Initializes the debug view.</summary>
1764                 /// <param name="propagator">The propagator being debugged.</param>
1765                 public DebugView(EncapsulatingPropagator<TInput, TOutput> propagator)
1766                 {
1767                     Contract.Requires(propagator != null, "Need a block with which to construct the debug view.");
1768                     _propagator = propagator;
1769                 }
1770
1771                 /// <summary>The target.</summary>
1772                 public ITargetBlock<TInput> Target { get { return _propagator._target; } }
1773                 /// <summary>The source.</summary>
1774                 public ISourceBlock<TOutput> Source { get { return _propagator._source; } }
1775             }
1776         }
1777         #endregion
1778
1779         #region Choose
1780         #region Choose<T1,T2>
1781         /// <summary>Monitors two dataflow sources, invoking the provided handler for whichever source makes data available first.</summary>
1782         /// <typeparam name="T1">Specifies type of data contained in the first source.</typeparam>
1783         /// <typeparam name="T2">Specifies type of data contained in the second source.</typeparam>
1784         /// <param name="source1">The first source.</param>
1785         /// <param name="action1">The handler to execute on data from the first source.</param>
1786         /// <param name="source2">The second source.</param>
1787         /// <param name="action2">The handler to execute on data from the second source.</param>
1788         /// <returns>
1789         /// <para>
1790         /// A <see cref="System.Threading.Tasks.Task{Int32}"/> that represents the asynchronous choice.
1791         /// If both sources are completed prior to the choice completing, 
1792         /// the resulting task will be canceled. When one of the sources has data available and successfully propagates 
1793         /// it to the choice, the resulting task will complete when the handler completes: if the handler throws an exception,
1794         /// the task will end in the <see cref="System.Threading.Tasks.TaskStatus.Faulted"/> state containing the unhandled exception, otherwise the task
1795         /// will end with its <see cref="System.Threading.Tasks.Task{Int32}.Result"/> set to either 0 or 1 to
1796         /// represent the first or second source, respectively.
1797         /// </para>
1798         /// <para>
1799         /// This method will only consume an element from one of the two data sources, never both.
1800         /// </para>
1801         /// </returns>
1802         /// <exception cref="System.ArgumentNullException">The <paramref name="source1"/> is null (Nothing in Visual Basic).</exception>
1803         /// <exception cref="System.ArgumentNullException">The <paramref name="action1"/> is null (Nothing in Visual Basic).</exception>
1804         /// <exception cref="System.ArgumentNullException">The <paramref name="source2"/> is null (Nothing in Visual Basic).</exception>
1805         /// <exception cref="System.ArgumentNullException">The <paramref name="action2"/> is null (Nothing in Visual Basic).</exception>
1806         public static Task<Int32> Choose<T1, T2>(
1807             ISourceBlock<T1> source1, Action<T1> action1,
1808             ISourceBlock<T2> source2, Action<T2> action2)
1809         {
1810             // All argument validation is handled by the delegated method
1811             return Choose(source1, action1, source2, action2, DataflowBlockOptions.Default);
1812         }
1813
1814         /// <summary>Monitors two dataflow sources, invoking the provided handler for whichever source makes data available first.</summary>
1815         /// <typeparam name="T1">Specifies type of data contained in the first source.</typeparam>
1816         /// <typeparam name="T2">Specifies type of data contained in the second source.</typeparam>
1817         /// <param name="source1">The first source.</param>
1818         /// <param name="action1">The handler to execute on data from the first source.</param>
1819         /// <param name="source2">The second source.</param>
1820         /// <param name="action2">The handler to execute on data from the second source.</param>
1821         /// <param name="dataflowBlockOptions">The options with which to configure this choice.</param>
1822         /// <returns>
1823         /// <para>
1824         /// A <see cref="System.Threading.Tasks.Task{Int32}"/> that represents the asynchronous choice.
1825         /// If both sources are completed prior to the choice completing, or if the CancellationToken
1826         /// provided as part of <paramref name="dataflowBlockOptions"/> is canceled prior to the choice completing,
1827         /// the resulting task will be canceled. When one of the sources has data available and successfully propagates 
1828         /// it to the choice, the resulting task will complete when the handler completes: if the handler throws an exception,
1829         /// the task will end in the <see cref="System.Threading.Tasks.TaskStatus.Faulted"/> state containing the unhandled exception, otherwise the task
1830         /// will end with its <see cref="System.Threading.Tasks.Task{Int32}.Result"/> set to either 0 or 1 to
1831         /// represent the first or second source, respectively.
1832         /// </para>
1833         /// <para>
1834         /// This method will only consume an element from one of the two data sources, never both.
1835         /// If cancellation is requested after an element has been received, the cancellation request will be ignored,
1836         /// and the relevant handler will be allowed to execute. 
1837         /// </para>
1838         /// </returns>
1839         /// <exception cref="System.ArgumentNullException">The <paramref name="source1"/> is null (Nothing in Visual Basic).</exception>
1840         /// <exception cref="System.ArgumentNullException">The <paramref name="action1"/> is null (Nothing in Visual Basic).</exception>
1841         /// <exception cref="System.ArgumentNullException">The <paramref name="source2"/> is null (Nothing in Visual Basic).</exception>
1842         /// <exception cref="System.ArgumentNullException">The <paramref name="action2"/> is null (Nothing in Visual Basic).</exception>
1843         /// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
1844         [SuppressMessage("Microsoft.Reliability", "CA2000:DisposeObjectsBeforeLosingScope")]
1845         public static Task<Int32> Choose<T1, T2>(
1846             ISourceBlock<T1> source1, Action<T1> action1,
1847             ISourceBlock<T2> source2, Action<T2> action2,
1848             DataflowBlockOptions dataflowBlockOptions)
1849         {
1850             // Validate arguments
1851             if (source1 == null) throw new ArgumentNullException("source1");
1852             if (action1 == null) throw new ArgumentNullException("action1");
1853             if (source2 == null) throw new ArgumentNullException("source2");
1854             if (action2 == null) throw new ArgumentNullException("action2");
1855             if (dataflowBlockOptions == null) throw new ArgumentNullException("dataflowBlockOptions");
1856
1857             // Delegate to the shared implementation
1858             return ChooseCore<T1, T2, VoidResult>(source1, action1, source2, action2, null, null, dataflowBlockOptions);
1859         }
1860         #endregion
1861
1862         #region Choose<T1,T2,T3>
1863         /// <summary>Monitors three dataflow sources, invoking the provided handler for whichever source makes data available first.</summary>
1864         /// <typeparam name="T1">Specifies type of data contained in the first source.</typeparam>
1865         /// <typeparam name="T2">Specifies type of data contained in the second source.</typeparam>
1866         /// <typeparam name="T3">Specifies type of data contained in the third source.</typeparam>
1867         /// <param name="source1">The first source.</param>
1868         /// <param name="action1">The handler to execute on data from the first source.</param>
1869         /// <param name="source2">The second source.</param>
1870         /// <param name="action2">The handler to execute on data from the second source.</param>
1871         /// <param name="source3">The third source.</param>
1872         /// <param name="action3">The handler to execute on data from the third source.</param>
1873         /// <returns>
1874         /// <para>
1875         /// A <see cref="System.Threading.Tasks.Task{Int32}"/> that represents the asynchronous choice.
1876         /// If all sources are completed prior to the choice completing, 
1877         /// the resulting task will be canceled. When one of the sources has data available and successfully propagates 
1878         /// it to the choice, the resulting task will complete when the handler completes: if the handler throws an exception,
1879         /// the task will end in the <see cref="System.Threading.Tasks.TaskStatus.Faulted"/> state containing the unhandled exception, otherwise the task
1880         /// will end with its <see cref="System.Threading.Tasks.Task{Int32}.Result"/> set to the 0-based index of the source.
1881         /// </para>
1882         /// <para>
1883         /// This method will only consume an element from one of the data sources, never more than one.
1884         /// </para>
1885         /// </returns>
1886         /// <exception cref="System.ArgumentNullException">The <paramref name="source1"/> is null (Nothing in Visual Basic).</exception>
1887         /// <exception cref="System.ArgumentNullException">The <paramref name="action1"/> is null (Nothing in Visual Basic).</exception>
1888         /// <exception cref="System.ArgumentNullException">The <paramref name="source2"/> is null (Nothing in Visual Basic).</exception>
1889         /// <exception cref="System.ArgumentNullException">The <paramref name="action2"/> is null (Nothing in Visual Basic).</exception>
1890         /// <exception cref="System.ArgumentNullException">The <paramref name="source3"/> is null (Nothing in Visual Basic).</exception>
1891         /// <exception cref="System.ArgumentNullException">The <paramref name="action3"/> is null (Nothing in Visual Basic).</exception>
1892         public static Task<Int32> Choose<T1, T2, T3>(
1893             ISourceBlock<T1> source1, Action<T1> action1,
1894             ISourceBlock<T2> source2, Action<T2> action2,
1895             ISourceBlock<T3> source3, Action<T3> action3)
1896         {
1897             // All argument validation is handled by the delegated method
1898             return Choose(source1, action1, source2, action2, source3, action3, DataflowBlockOptions.Default);
1899         }
1900
1901         /// <summary>Monitors three dataflow sources, invoking the provided handler for whichever source makes data available first.</summary>
1902         /// <typeparam name="T1">Specifies type of data contained in the first source.</typeparam>
1903         /// <typeparam name="T2">Specifies type of data contained in the second source.</typeparam>
1904         /// <typeparam name="T3">Specifies type of data contained in the third source.</typeparam>
1905         /// <param name="source1">The first source.</param>
1906         /// <param name="action1">The handler to execute on data from the first source.</param>
1907         /// <param name="source2">The second source.</param>
1908         /// <param name="action2">The handler to execute on data from the second source.</param>
1909         /// <param name="source3">The third source.</param>
1910         /// <param name="action3">The handler to execute on data from the third source.</param>
1911         /// <param name="dataflowBlockOptions">The options with which to configure this choice.</param>
1912         /// <returns>
1913         /// <para>
1914         /// A <see cref="System.Threading.Tasks.Task{Int32}"/> that represents the asynchronous choice.
1915         /// If all sources are completed prior to the choice completing, or if the CancellationToken
1916         /// provided as part of <paramref name="dataflowBlockOptions"/> is canceled prior to the choice completing,
1917         /// the resulting task will be canceled. When one of the sources has data available and successfully propagates 
1918         /// it to the choice, the resulting task will complete when the handler completes: if the handler throws an exception,
1919         /// the task will end in the <see cref="System.Threading.Tasks.TaskStatus.Faulted"/> state containing the unhandled exception, otherwise the task
1920         /// will end with its <see cref="System.Threading.Tasks.Task{Int32}.Result"/> set to the 0-based index of the source.
1921         /// </para>
1922         /// <para>
1923         /// This method will only consume an element from one of the data sources, never more than one.
1924         /// If cancellation is requested after an element has been received, the cancellation request will be ignored,
1925         /// and the relevant handler will be allowed to execute. 
1926         /// </para>
1927         /// </returns>
1928         /// <exception cref="System.ArgumentNullException">The <paramref name="source1"/> is null (Nothing in Visual Basic).</exception>
1929         /// <exception cref="System.ArgumentNullException">The <paramref name="action1"/> is null (Nothing in Visual Basic).</exception>
1930         /// <exception cref="System.ArgumentNullException">The <paramref name="source2"/> is null (Nothing in Visual Basic).</exception>
1931         /// <exception cref="System.ArgumentNullException">The <paramref name="action2"/> is null (Nothing in Visual Basic).</exception>
1932         /// <exception cref="System.ArgumentNullException">The <paramref name="source3"/> is null (Nothing in Visual Basic).</exception>
1933         /// <exception cref="System.ArgumentNullException">The <paramref name="action3"/> is null (Nothing in Visual Basic).</exception>
1934         /// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
1935         [SuppressMessage("Microsoft.Reliability", "CA2000:DisposeObjectsBeforeLosingScope")]
1936         public static Task<Int32> Choose<T1, T2, T3>(
1937             ISourceBlock<T1> source1, Action<T1> action1,
1938             ISourceBlock<T2> source2, Action<T2> action2,
1939             ISourceBlock<T3> source3, Action<T3> action3,
1940             DataflowBlockOptions dataflowBlockOptions)
1941         {
1942             // Validate arguments
1943             if (source1 == null) throw new ArgumentNullException("source1");
1944             if (action1 == null) throw new ArgumentNullException("action1");
1945             if (source2 == null) throw new ArgumentNullException("source2");
1946             if (action2 == null) throw new ArgumentNullException("action2");
1947             if (source3 == null) throw new ArgumentNullException("source3");
1948             if (action3 == null) throw new ArgumentNullException("action3");
1949             if (dataflowBlockOptions == null) throw new ArgumentNullException("dataflowBlockOptions");
1950
1951             // Delegate to the shared implementation
1952             return ChooseCore<T1, T2, T3>(source1, action1, source2, action2, source3, action3, dataflowBlockOptions);
1953         }
1954         #endregion
1955
1956         #region Choose Shared
1957         /// <summary>Monitors dataflow sources, invoking the provided handler for whichever source makes data available first.</summary>
1958         /// <typeparam name="T1">Specifies type of data contained in the first source.</typeparam>
1959         /// <typeparam name="T2">Specifies type of data contained in the second source.</typeparam>
1960         /// <typeparam name="T3">Specifies type of data contained in the third source.</typeparam>
1961         /// <param name="source1">The first source.</param>
1962         /// <param name="action1">The handler to execute on data from the first source.</param>
1963         /// <param name="source2">The second source.</param>
1964         /// <param name="action2">The handler to execute on data from the second source.</param>
1965         /// <param name="source3">The third source.</param>
1966         /// <param name="action3">The handler to execute on data from the third source.</param>
1967         /// <param name="dataflowBlockOptions">The options with which to configure this choice.</param>
1968         [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
1969         [SuppressMessage("Microsoft.Reliability", "CA2000:DisposeObjectsBeforeLosingScope")]
1970         private static Task<Int32> ChooseCore<T1, T2, T3>(
1971             ISourceBlock<T1> source1, Action<T1> action1,
1972             ISourceBlock<T2> source2, Action<T2> action2,
1973             ISourceBlock<T3> source3, Action<T3> action3,
1974             DataflowBlockOptions dataflowBlockOptions)
1975         {
1976             Contract.Requires(source1 != null && action1 != null, "The first source and action should not be null.");
1977             Contract.Requires(source2 != null && action2 != null, "The second source and action should not be null.");
1978             Contract.Requires((source3 == null) == (action3 == null), "The third action should be null iff the third source is null.");
1979             Contract.Requires(dataflowBlockOptions != null, "Options are required.");
1980             bool hasThirdSource = source3 != null; // In the future, if we want higher arities on Choose, we can simply add more such checks on additional arguments
1981
1982             // Early cancellation check and bail out
1983             if (dataflowBlockOptions.CancellationToken.IsCancellationRequested)
1984                 return Common.CreateTaskFromCancellation<Int32>(dataflowBlockOptions.CancellationToken);
1985
1986             // Fast path: if any of the sources already has data available that can be received immediately.
1987             Task<int> resultTask;
1988             try
1989             {
1990                 TaskScheduler scheduler = dataflowBlockOptions.TaskScheduler;
1991                 if (TryChooseFromSource(source1, action1, 0, scheduler, out resultTask) ||
1992                     TryChooseFromSource(source2, action2, 1, scheduler, out resultTask) ||
1993                     (hasThirdSource && TryChooseFromSource(source3, action3, 2, scheduler, out resultTask)))
1994                 {
1995                     return resultTask;
1996                 }
1997             }
1998             catch (Exception exc)
1999             {
2000                 // In case TryReceive in TryChooseFromSource erroneously throws
2001                 return Common.CreateTaskFromException<int>(exc);
2002             }
2003
2004             // Slow path: link up to all of the sources.  Separated out to avoid a closure on the fast path.
2005             return ChooseCoreByLinking(source1, action1, source2, action2, source3, action3, dataflowBlockOptions);
2006         }
2007
2008         /// <summary>
2009         /// Tries to remove data from a receivable source and schedule an action to process that received item.
2010         /// </summary>
2011         /// <typeparam name="T">Specifies the type of data to process.</typeparam>
2012         /// <param name="source">The source from which to receive the data.</param>
2013         /// <param name="action">The action to run for the received data.</param>
2014         /// <param name="branchId">The branch ID associated with this source/action pair.</param>
2015         /// <param name="scheduler">The scheduler to use to process the action.</param>
2016         /// <param name="task">The task created for processing the received item.</param>
2017         /// <returns>true if this try attempt satisfies the choose operation; otherwise, false.</returns>
2018         private static bool TryChooseFromSource<T>(
2019             ISourceBlock<T> source, Action<T> action, int branchId, TaskScheduler scheduler,
2020             out Task<int> task)
2021         {
2022             // Validate arguments
2023             Contract.Requires(source != null, "Expected a non-null source");
2024             Contract.Requires(action != null, "Expected a non-null action");
2025             Contract.Requires(branchId >= 0, "Expected a valid branch ID (> 0)");
2026             Contract.Requires(scheduler != null, "Expected a non-null scheduler");
2027
2028             // Try to receive from the source.  If we can't, bail.
2029             T result;
2030             var receivableSource = source as IReceivableSourceBlock<T>;
2031             if (receivableSource == null || !receivableSource.TryReceive(out result))
2032             {
2033                 task = null;
2034                 return false;
2035             }
2036
2037             // We successfully received an item.  Launch a task to process it.
2038             task = Task.Factory.StartNew(ChooseTarget<T>.s_processBranchFunction,
2039                 Tuple.Create<Action<T>, T, int>(action, result, branchId),
2040                 CancellationToken.None, Common.GetCreationOptionsForTask(), scheduler);
2041             return true;
2042         }
2043
2044         /// <summary>Monitors dataflow sources, invoking the provided handler for whichever source makes data available first.</summary>
2045         /// <typeparam name="T1">Specifies type of data contained in the first source.</typeparam>
2046         /// <typeparam name="T2">Specifies type of data contained in the second source.</typeparam>
2047         /// <typeparam name="T3">Specifies type of data contained in the third source.</typeparam>
2048         /// <param name="source1">The first source.</param>
2049         /// <param name="action1">The handler to execute on data from the first source.</param>
2050         /// <param name="source2">The second source.</param>
2051         /// <param name="action2">The handler to execute on data from the second source.</param>
2052         /// <param name="source3">The third source.</param>
2053         /// <param name="action3">The handler to execute on data from the third source.</param>
2054         /// <param name="dataflowBlockOptions">The options with which to configure this choice.</param>
2055         [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
2056         [SuppressMessage("Microsoft.Reliability", "CA2000:DisposeObjectsBeforeLosingScope")]
2057         private static Task<Int32> ChooseCoreByLinking<T1, T2, T3>(
2058             ISourceBlock<T1> source1, Action<T1> action1,
2059             ISourceBlock<T2> source2, Action<T2> action2,
2060             ISourceBlock<T3> source3, Action<T3> action3,
2061             DataflowBlockOptions dataflowBlockOptions)
2062         {
2063             Contract.Requires(source1 != null && action1 != null, "The first source and action should not be null.");
2064             Contract.Requires(source2 != null && action2 != null, "The second source and action should not be null.");
2065             Contract.Requires((source3 == null) == (action3 == null), "The third action should be null iff the third source is null.");
2066             Contract.Requires(dataflowBlockOptions != null, "Options are required.");
2067
2068             bool hasThirdSource = source3 != null; // In the future, if we want higher arities on Choose, we can simply add more such checks on additional arguments
2069
2070             // Create object to act as both completion marker and sync obj for targets.
2071             var boxedCompleted = new StrongBox<Task>();
2072
2073             // Set up teardown cancellation.  We will request cancellation when a) the supplied options token
2074             // has cancellation requested or b) when we actually complete somewhere in order to tear down
2075             // the rest of our configured set up.
2076             CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(dataflowBlockOptions.CancellationToken, CancellationToken.None);
2077
2078             // Set up the branches.
2079             TaskScheduler scheduler = dataflowBlockOptions.TaskScheduler;
2080             var branchTasks = new Task<int>[hasThirdSource ? 3 : 2];
2081             branchTasks[0] = CreateChooseBranch(boxedCompleted, cts, scheduler, 0, source1, action1);
2082             branchTasks[1] = CreateChooseBranch(boxedCompleted, cts, scheduler, 1, source2, action2);
2083             if (hasThirdSource)
2084             {
2085                 branchTasks[2] = CreateChooseBranch(boxedCompleted, cts, scheduler, 2, source3, action3);
2086             }
2087
2088             // Asynchronously wait for all branches to complete, then complete
2089             // a task to be returned to the caller.
2090             var result = new TaskCompletionSource<int>();
2091             Task.Factory.ContinueWhenAll(branchTasks, tasks =>
2092             {
2093                 // Process the outcome of all branches.  At most one will have completed
2094                 // successfully, returning its branch ID.  Others may have faulted,
2095                 // in which case we need to propagate their exceptions, regardless
2096                 // of whether a branch completed successfully.  Others may have been
2097                 // canceled (or run but found they were not needed), and those
2098                 // we just ignore.
2099                 List<Exception> exceptions = null;
2100                 int successfulBranchId = -1;
2101                 foreach (Task<int> task in tasks)
2102                 {
2103                     switch (task.Status)
2104                     {
2105                         case TaskStatus.Faulted:
2106                             Common.AddException(ref exceptions, task.Exception, unwrapInnerExceptions: true);
2107                             break;
2108                         case TaskStatus.RanToCompletion:
2109                             int resultBranchId = task.Result;
2110                             if (resultBranchId >= 0)
2111                             {
2112                                 Debug.Assert(resultBranchId < tasks.Length, "Expected a valid branch ID");
2113                                 Debug.Assert(successfulBranchId == -1, "There should be at most one successful branch.");
2114                                 successfulBranchId = resultBranchId;
2115                             }
2116                             else Debug.Assert(resultBranchId == -1, "Expected -1 as a signal of a non-successful branch");
2117                             break;
2118                     }
2119                 }
2120
2121                 // If we found any exceptions, fault the Choose task.  Otherwise, if any branch completed
2122                 // successfully, store its result, or if cancellation was request
2123                 if (exceptions != null)
2124                 {
2125                     result.TrySetException(exceptions);
2126                 }
2127                 else if (successfulBranchId >= 0)
2128                 {
2129                     result.TrySetResult(successfulBranchId);
2130                 }
2131                 else
2132                 {
2133                     result.TrySetCanceled();
2134                 }
2135
2136                 // By now we know that all of the tasks have completed, so there
2137                 // can't be any more use of the CancellationTokenSource.
2138                 cts.Dispose();
2139             }, CancellationToken.None, Common.GetContinuationOptions(), TaskScheduler.Default);
2140             return result.Task;
2141         }
2142
2143         /// <summary>Creates a target for a branch of a Choose.</summary>
2144         /// <typeparam name="T">Specifies the type of data coming through this branch.</typeparam>
2145         /// <param name="boxedCompleted">A strong box around the completed Task from any target. Also sync obj for access to the targets.</param>
2146         /// <param name="cts">The CancellationTokenSource used to issue tear down / cancellation requests.</param>
2147         /// <param name="scheduler">The TaskScheduler on which to scheduler work.</param>
2148         /// <param name="branchId">The ID of this branch, used to complete the resultTask.</param>
2149         /// <param name="source">The source with which this branch is associated.</param>
2150         /// <param name="action">The action to run for a single element received from the source.</param>
2151         /// <returns>A task representing the branch.</returns>
2152         [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
2153         private static Task<int> CreateChooseBranch<T>(
2154             StrongBox<Task> boxedCompleted, CancellationTokenSource cts,
2155             TaskScheduler scheduler,
2156             int branchId, ISourceBlock<T> source, Action<T> action)
2157         {
2158             // If the cancellation token is already canceled, there is no need to create and link a target.
2159             // Instead, directly return a canceled task.
2160             if (cts.IsCancellationRequested)
2161                 return Common.CreateTaskFromCancellation<int>(cts.Token);
2162
2163             // Proceed with creating and linking a hidden target. Also get the source's completion task, 
2164             // as we need it to know when the source completes.  Both of these operations
2165             // could throw an exception if the block is faulty.
2166             var target = new ChooseTarget<T>(boxedCompleted, cts.Token);
2167             IDisposable unlink;
2168             try
2169             {
2170                 unlink = source.LinkTo(target, DataflowLinkOptions.UnlinkAfterOneAndPropagateCompletion);
2171             }
2172             catch (Exception exc)
2173             {
2174                 cts.Cancel();
2175                 return Common.CreateTaskFromException<int>(exc);
2176             }
2177
2178             // The continuation task below is implicitly capturing the right execution context,
2179             // as CreateChooseBranch is called synchronously from Choose, so we
2180             // don't need to additionally capture and marshal an ExecutionContext.
2181
2182             return target.Task.ContinueWith(completed =>
2183             {
2184                 try
2185                 {
2186                     // If the target ran to completion, i.e. it got a message, 
2187                     // cancel the other branch(es) and proceed with the user callback.
2188                     if (completed.Status == TaskStatus.RanToCompletion)
2189                     {
2190                         // Cancel the cts to trigger completion of the other branches.
2191                         cts.Cancel();
2192
2193                         // Proceed with the user callback.
2194                         action(completed.Result);
2195
2196                         // Return the ID of our branch to indicate.
2197                         return branchId;
2198                     }
2199                     return -1;
2200                 }
2201                 finally
2202                 {
2203                     // Unlink from the source.  This could throw if the block is faulty,
2204                     // in which case our branch's task will fault.  If this
2205                     // does throw, it'll end up propagating instead of the
2206                     // original action's exception if there was one.
2207                     unlink.Dispose();
2208                 }
2209             }, CancellationToken.None, Common.GetContinuationOptions(), scheduler);
2210         }
2211
2212         /// <summary>Provides a dataflow target used by Choose to receive data from a single source.</summary>
2213         /// <typeparam name="T">Specifies the type of data offered to this target.</typeparam>
2214         [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
2215         private sealed class ChooseTarget<T> : TaskCompletionSource<T>, ITargetBlock<T>, IDebuggerDisplay
2216         {
2217             /// <summary>
2218             /// Delegate used to invoke the action for a branch when that branch is activated
2219             /// on the fast path.
2220             /// </summary>
2221             internal static readonly Func<object, int> s_processBranchFunction = state =>
2222             {
2223                 Tuple<Action<T>, T, int> actionResultBranch = (Tuple<Action<T>, T, int>)state;
2224                 actionResultBranch.Item1(actionResultBranch.Item2);
2225                 return actionResultBranch.Item3;
2226             };
2227
2228             /// <summary>
2229             /// A wrapper for the task that represents the completed branch of this choice.
2230             /// The wrapper is also the sync object used to protect all choice branch's access to shared state.
2231             /// </summary>
2232             private StrongBox<Task> _completed;
2233
2234             /// <summary>Initializes the target.</summary>
2235             /// <param name="completed">The completed wrapper shared between all choice branches.</param>
2236             /// <param name="cancellationToken">The cancellation token used to cancel this target.</param>
2237             internal ChooseTarget(StrongBox<Task> completed, CancellationToken cancellationToken)
2238             {
2239                 Contract.Requires(completed != null, "Requires a shared target to complete.");
2240                 _completed = completed;
2241
2242                 // Handle async cancellation by canceling the target without storing it into _completed.
2243                 // _completed must only be set to a RanToCompletion task for a successful branch.
2244                 Common.WireCancellationToComplete(cancellationToken, base.Task,
2245                     state =>
2246                     {
2247                         var thisChooseTarget = (ChooseTarget<T>)state;
2248                         lock (thisChooseTarget._completed) thisChooseTarget.TrySetCanceled();
2249                     }, this);
2250             }
2251
2252             /// <summary>Called when this choice branch is being offered a message.</summary>
2253             public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, Boolean consumeToAccept)
2254             {
2255                 // Validate arguments
2256                 if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, "messageHeader");
2257                 if (source == null && consumeToAccept) throw new ArgumentException(SR.Argument_CantConsumeFromANullSource, "consumeToAccept");
2258                 Contract.EndContractBlock();
2259
2260                 lock (_completed)
2261                 {
2262                     // If we or another participating choice has already completed, we're done.
2263                     if (_completed.Value != null || base.Task.IsCompleted) return DataflowMessageStatus.DecliningPermanently;
2264
2265                     // Consume the message from the source if necessary
2266                     if (consumeToAccept)
2267                     {
2268                         bool consumed;
2269                         messageValue = source.ConsumeMessage(messageHeader, this, out consumed);
2270                         if (!consumed) return DataflowMessageStatus.NotAvailable;
2271                     }
2272
2273                     // Store the result and signal our success
2274                     TrySetResult(messageValue);
2275                     _completed.Value = Task;
2276                     return DataflowMessageStatus.Accepted;
2277                 }
2278             }
2279
2280             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
2281             void IDataflowBlock.Complete()
2282             {
2283                 lock (_completed) TrySetCanceled();
2284             }
2285
2286             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
2287             void IDataflowBlock.Fault(Exception exception) { ((IDataflowBlock)this).Complete(); }
2288
2289             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
2290             Task IDataflowBlock.Completion { get { throw new NotSupportedException(SR.NotSupported_MemberNotNeeded); } }
2291
2292             /// <summary>The data to display in the debugger display attribute.</summary>
2293             [SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
2294             private object DebuggerDisplayContent
2295             {
2296                 get
2297                 {
2298                     return string.Format("{0} IsCompleted={1}",
2299                         Common.GetNameForDebugger(this), base.Task.IsCompleted);
2300                 }
2301             }
2302             /// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
2303             object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } }
2304         }
2305         #endregion
2306         #endregion
2307
2308         #region AsObservable
2309         /// <summary>Creates a new <see cref="System.IObservable{TOutput}"/> abstraction over the <see cref="ISourceBlock{TOutput}"/>.</summary>
2310         /// <typeparam name="TOutput">Specifies the type of data contained in the source.</typeparam>
2311         /// <param name="source">The source to wrap.</param>
2312         /// <returns>An IObservable{TOutput} that enables observers to be subscribed to the source.</returns>
2313         /// <exception cref="System.ArgumentNullException">The <paramref name="source"/> is null (Nothing in Visual Basic).</exception>
2314         public static IObservable<TOutput> AsObservable<TOutput>(this ISourceBlock<TOutput> source)
2315         {
2316             if (source == null) throw new ArgumentNullException("source");
2317             Contract.EndContractBlock();
2318             return SourceObservable<TOutput>.From(source);
2319         }
2320
2321         /// <summary>Cached options for non-greedy processing.</summary>
2322         private static readonly ExecutionDataflowBlockOptions _nonGreedyExecutionOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1 };
2323
2324         /// <summary>Provides an IObservable veneer over a source block.</summary>
2325         [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
2326         [DebuggerTypeProxy(typeof(SourceObservable<>.DebugView))]
2327         private sealed class SourceObservable<TOutput> : IObservable<TOutput>, IDebuggerDisplay
2328         {
2329             /// <summary>The table that maps source to cached observable.</summary>
2330             /// <remarks>
2331             /// ConditionalWeakTable doesn't do the initialization under a lock, just the publication.
2332             /// This means that if there's a race to create two observables off the same source, we could end
2333             /// up instantiating multiple SourceObservable instances, of which only one will be published.
2334             /// Worst case, we end up with a few additional continuations off of the source's completion task.
2335             /// </remarks>
2336             private static readonly ConditionalWeakTable<ISourceBlock<TOutput>, SourceObservable<TOutput>> _table =
2337                 new ConditionalWeakTable<ISourceBlock<TOutput>, SourceObservable<TOutput>>();
2338
2339             /// <summary>Gets an observable to represent the source block.</summary>
2340             /// <param name="source">The source.</param>
2341             /// <returns>The observable.</returns>
2342             internal static IObservable<TOutput> From(ISourceBlock<TOutput> source)
2343             {
2344                 Contract.Requires(source != null, "Requires a source for which to retrieve the observable.");
2345                 return _table.GetValue(source, s => new SourceObservable<TOutput>(s));
2346             }
2347
2348             /// <summary>Object used to synchronize all subscriptions, unsubscriptions, and propagations.</summary>
2349             private readonly object _SubscriptionLock = new object();
2350             /// <summary>The wrapped source.</summary>
2351             private readonly ISourceBlock<TOutput> _source;
2352             /// <summary>
2353             /// The current target.  We use the same target until the number of subscribers
2354             /// drops to 0, at which point we substitute in a new target.
2355             /// </summary>
2356             private ObserversState _observersState;
2357
2358             /// <summary>Initializes the SourceObservable.</summary>
2359             /// <param name="source">The source to wrap.</param>
2360             internal SourceObservable(ISourceBlock<TOutput> source)
2361             {
2362                 Contract.Requires(source != null, "The observable requires a source to wrap.");
2363                 _source = source;
2364                 _observersState = new ObserversState(this);
2365             }
2366
2367             /// <summary>Gets any exceptions from the source block.</summary>
2368             /// <returns>The aggregate exception of all errors, or null if everything completed successfully.</returns>
2369             private AggregateException GetCompletionError()
2370             {
2371                 Task sourceCompletionTask = Common.GetPotentiallyNotSupportedCompletionTask(_source);
2372                 return sourceCompletionTask != null && sourceCompletionTask.IsFaulted ?
2373                     sourceCompletionTask.Exception : null;
2374             }
2375
2376             /// <summary>Subscribes the observer to the source.</summary>
2377             /// <param name="observer">the observer to subscribe.</param>
2378             /// <returns>An IDisposable that may be used to unsubscribe the source.</returns>
2379             [SuppressMessage("Microsoft.Reliability", "CA2000:DisposeObjectsBeforeLosingScope")]
2380             IDisposable IObservable<TOutput>.Subscribe(IObserver<TOutput> observer)
2381             {
2382                 // Validate arguments
2383                 if (observer == null) throw new ArgumentNullException("observer");
2384                 Contract.EndContractBlock();
2385                 Common.ContractAssertMonitorStatus(_SubscriptionLock, held: false);
2386
2387                 Task sourceCompletionTask = Common.GetPotentiallyNotSupportedCompletionTask(_source);
2388
2389                 // Synchronize all observers for this source.
2390                 Exception error = null;
2391                 lock (_SubscriptionLock)
2392                 {
2393                     // Fast path for if everything is already done.  We need to ensure that both
2394                     // the source is complete and that the target has finished propagating data to all observers.
2395                     // If there  was an error, we grab it here and then we'll complete the observer
2396                     // outside of the lock.
2397                     if (sourceCompletionTask != null && sourceCompletionTask.IsCompleted &&
2398                         _observersState.Target.Completion.IsCompleted)
2399                     {
2400                         error = GetCompletionError();
2401                     }
2402                     // Otherwise, we need to subscribe this observer.
2403                     else
2404                     {
2405                         // Hook up the observer.  If this is the first observer, link the source to the target.
2406                         _observersState.Observers = _observersState.Observers.Add(observer);
2407                         if (_observersState.Observers.Count == 1)
2408                         {
2409                             Debug.Assert(_observersState.Unlinker == null, "The source should not be linked to the target.");
2410                             _observersState.Unlinker = _source.LinkTo(_observersState.Target);
2411                             if (_observersState.Unlinker == null)
2412                             {
2413                                 _observersState.Observers = ImmutableList<IObserver<TOutput>>.Empty;
2414                                 return null;
2415                             }
2416                         }
2417
2418                         // Return a disposable that will unlink this observer, and if it's the last
2419                         // observer for the source, shut off the pipe to observers.
2420                         return Disposables.Create((s, o) => s.Unsubscribe(o), this, observer);
2421                     }
2422                 }
2423
2424                 // Complete the observer.
2425                 if (error != null) observer.OnError(error);
2426                 else observer.OnCompleted();
2427                 return Disposables.Nop;
2428             }
2429
2430             /// <summary>Unsubscribes the observer.</summary>
2431             /// <param name="observer">The observer being unsubscribed.</param>
2432             private void Unsubscribe(IObserver<TOutput> observer)
2433             {
2434                 Contract.Requires(observer != null, "Expected an observer.");
2435                 Common.ContractAssertMonitorStatus(_SubscriptionLock, held: false);
2436
2437                 lock (_SubscriptionLock)
2438                 {
2439                     ObserversState currentState = _observersState;
2440                     Debug.Assert(currentState != null, "Observer state should never be null.");
2441
2442                     // If the observer was already unsubscribed (or is otherwise no longer present in our list), bail.
2443                     if (!currentState.Observers.Contains(observer)) return;
2444
2445                     // If this is the last observer being removed, reset to be ready for future subscribers.
2446                     if (currentState.Observers.Count == 1)
2447                     {
2448                         ResetObserverState();
2449                     }
2450                     // Otherwise, just remove the observer.  Note that we don't remove the observer
2451                     // from the current target if this is the last observer. This is done in case the target
2452                     // has already taken data from the source: we want that data to end up somewhere,
2453                     // and we can't put it back in the source, so we ensure we send it along to the observer.
2454                     else
2455                     {
2456                         currentState.Observers = currentState.Observers.Remove(observer);
2457                     }
2458                 }
2459             }
2460
2461             /// <summary>Resets the observer state to the original, inactive state.</summary>
2462             /// <returns>The list of active observers prior to the reset.</returns>
2463             private ImmutableList<IObserver<TOutput>> ResetObserverState()
2464             {
2465                 Common.ContractAssertMonitorStatus(_SubscriptionLock, held: true);
2466
2467                 ObserversState currentState = _observersState;
2468                 Debug.Assert(currentState != null, "Observer state should never be null.");
2469                 Debug.Assert(currentState.Unlinker != null, "The target should be linked.");
2470                 Debug.Assert(currentState.Canceler != null, "The target should have set up continuations.");
2471
2472                 // Replace the target with a clean one, unlink and cancel, and return the previous set of observers
2473                 ImmutableList<IObserver<TOutput>> currentObservers = currentState.Observers;
2474                 _observersState = new ObserversState(this);
2475                 currentState.Unlinker.Dispose();
2476                 currentState.Canceler.Cancel();
2477                 return currentObservers;
2478             }
2479
2480             /// <summary>The data to display in the debugger display attribute.</summary>
2481             [SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
2482             private object DebuggerDisplayContent
2483             {
2484                 get
2485                 {
2486                     var displaySource = _source as IDebuggerDisplay;
2487                     return string.Format("Observers={0}, Block=\"{1}\"",
2488                         _observersState.Observers.Count,
2489                         displaySource != null ? displaySource.Content : _source);
2490                 }
2491             }
2492             /// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
2493             object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } }
2494
2495             /// <summary>Provides a debugger type proxy for the observable.</summary>
2496             private sealed class DebugView
2497             {
2498                 /// <summary>The observable being debugged.</summary>
2499                 private readonly SourceObservable<TOutput> _observable;
2500
2501                 /// <summary>Initializes the debug view.</summary>
2502                 /// <param name="observable">The target being debugged.</param>
2503                 public DebugView(SourceObservable<TOutput> observable)
2504                 {
2505                     Contract.Requires(observable != null, "Need a block with which to construct the debug view.");
2506                     _observable = observable;
2507                 }
2508
2509                 /// <summary>Gets an enumerable of the observers.</summary>
2510                 [DebuggerBrowsable(DebuggerBrowsableState.RootHidden)]
2511                 public IObserver<TOutput>[] Observers { get { return _observable._observersState.Observers.ToArray(); } }
2512             }
2513
2514             /// <summary>State associated with the current target for propagating data to observers.</summary>
2515             [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")]
2516             private sealed class ObserversState
2517             {
2518                 /// <summary>The owning SourceObservable.</summary>
2519                 internal readonly SourceObservable<TOutput> Observable;
2520                 /// <summary>The ActionBlock that consumes data from a source and offers it to targets.</summary>
2521                 internal readonly ActionBlock<TOutput> Target;
2522                 /// <summary>Used to cancel continuations when they're no longer necessary.</summary>
2523                 internal readonly CancellationTokenSource Canceler = new CancellationTokenSource();
2524                 /// <summary>
2525                 /// A list of the observers currently registered with this target.  The list is immutable
2526                 /// to enable iteration through the list while the set of observers may be changing.
2527                 /// </summary>
2528                 internal ImmutableList<IObserver<TOutput>> Observers = ImmutableList<IObserver<TOutput>>.Empty;
2529                 /// <summary>Used to unlink the source from this target when the last observer is unsubscribed.</summary>
2530                 internal IDisposable Unlinker;
2531                 /// <summary>
2532                 /// Temporary list to keep track of SendAsync tasks to TargetObservers with back pressure.
2533                 /// This field gets instantiated on demand. It gets populated and cleared within an offering cycle.
2534                 /// </summary>
2535                 private List<Task<bool>> _tempSendAsyncTaskList;
2536
2537                 /// <summary>Initializes the target instance.</summary>
2538                 /// <param name="observable">The owning observable.</param>
2539                 internal ObserversState(SourceObservable<TOutput> observable)
2540                 {
2541                     Contract.Requires(observable != null, "Observe state must be mapped to a source observable.");
2542
2543                     // Set up the target block
2544                     Observable = observable;
2545                     Target = new ActionBlock<TOutput>((Func<TOutput, Task>)ProcessItemAsync, DataflowBlock._nonGreedyExecutionOptions);
2546
2547                     // If the target block fails due to an unexpected exception (e.g. it calls back to the source and the source throws an error), 
2548                     // we fault currently registered observers and reset the observable.
2549                     Target.Completion.ContinueWith(
2550                         (t, state) => ((ObserversState)state).NotifyObserversOfCompletion(t.Exception), this,
2551                         CancellationToken.None,
2552                         Common.GetContinuationOptions(TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously),
2553                         TaskScheduler.Default);
2554
2555                     // When the source completes, complete the target. Then when the target completes, 
2556                     // send completion messages to any observers still registered.
2557                     Task sourceCompletionTask = Common.GetPotentiallyNotSupportedCompletionTask(Observable._source);
2558                     if (sourceCompletionTask != null)
2559                     {
2560                         sourceCompletionTask.ContinueWith((_1, state1) =>
2561                         {
2562                             var ti = (ObserversState)state1;
2563                             ti.Target.Complete();
2564                             ti.Target.Completion.ContinueWith(
2565                                 (_2, state2) => ((ObserversState)state2).NotifyObserversOfCompletion(), state1,
2566                                 CancellationToken.None,
2567                                 Common.GetContinuationOptions(TaskContinuationOptions.NotOnFaulted | TaskContinuationOptions.ExecuteSynchronously),
2568                                 TaskScheduler.Default);
2569                         }, this, Canceler.Token, Common.GetContinuationOptions(TaskContinuationOptions.ExecuteSynchronously), TaskScheduler.Default);
2570                     }
2571                 }
2572
2573                 /// <summary>Forwards an item to all currently subscribed observers.</summary>
2574                 /// <param name="item">The item to forward.</param>
2575                 [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
2576                 private Task ProcessItemAsync(TOutput item)
2577                 {
2578                     Common.ContractAssertMonitorStatus(Observable._SubscriptionLock, held: false);
2579
2580                     ImmutableList<IObserver<TOutput>> currentObservers;
2581                     lock (Observable._SubscriptionLock) currentObservers = Observers;
2582                     try
2583                     {
2584                         foreach (IObserver<TOutput> observer in currentObservers)
2585                         {
2586                             // If the observer is our own TargetObserver, we SendAsync() to it
2587                             // rather than going through IObserver.OnNext() which allows us to
2588                             // continue offering to the remaining observers without blocking.
2589                             var targetObserver = observer as TargetObserver<TOutput>;
2590                             if (targetObserver != null)
2591                             {
2592                                 Task<bool> sendAsyncTask = targetObserver.SendAsyncToTarget(item);
2593                                 if (sendAsyncTask.Status != TaskStatus.RanToCompletion)
2594                                 {
2595                                     // Ensure the SendAsyncTaskList is instantiated
2596                                     if (_tempSendAsyncTaskList == null) _tempSendAsyncTaskList = new List<Task<bool>>();
2597
2598                                     // Add the task to the list
2599                                     _tempSendAsyncTaskList.Add(sendAsyncTask);
2600                                 }
2601                             }
2602                             else
2603                             {
2604                                 observer.OnNext(item);
2605                             }
2606                         }
2607
2608                         // If there are SendAsync tasks to wait on...
2609                         if (_tempSendAsyncTaskList != null && _tempSendAsyncTaskList.Count > 0)
2610                         {
2611                             // Consolidate all SendAsync tasks into one
2612                             Task<bool[]> allSendAsyncTasksConsolidated = Task.WhenAll(_tempSendAsyncTaskList);
2613
2614                             // Clear the temp SendAsync task list
2615                             _tempSendAsyncTaskList.Clear();
2616
2617                             // Return the consolidated task
2618                             return allSendAsyncTasksConsolidated;
2619                         }
2620                     }
2621                     catch (Exception exc)
2622                     {
2623                         // Return a faulted task
2624                         return Common.CreateTaskFromException<VoidResult>(exc);
2625                     }
2626
2627                     // All observers accepted normally. 
2628                     // Return a completed task.
2629                     return Common.CompletedTaskWithTrueResult;
2630                 }
2631
2632                 /// <summary>Notifies all currently registered observers that they should complete.</summary>
2633                 /// <param name="targetException">
2634                 /// Non-null when an unexpected exception occurs during processing.  Faults
2635                 /// all subscribed observers and resets the observable back to its original condition.
2636                 /// </param>
2637                 [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
2638                 private void NotifyObserversOfCompletion(Exception targetException = null)
2639                 {
2640                     Contract.Requires(Target.Completion.IsCompleted, "The target must have already completed in order to notify of completion.");
2641                     Common.ContractAssertMonitorStatus(Observable._SubscriptionLock, held: false);
2642
2643                     // Send completion notification to all observers.
2644                     ImmutableList<IObserver<TOutput>> currentObservers;
2645                     lock (Observable._SubscriptionLock)
2646                     {
2647                         // Get the currently registered set of observers. Then, if we're being called due to the target 
2648                         // block failing from an unexpected exception, reset the observer state so that subsequent 
2649                         // subscribed observers will get a new target block.  Finally clear out our observer list.
2650                         currentObservers = Observers;
2651                         if (targetException != null) Observable.ResetObserverState();
2652                         Observers = ImmutableList<IObserver<TOutput>>.Empty;
2653                     }
2654
2655                     // If there are any observers to complete...
2656                     if (currentObservers.Count > 0)
2657                     {
2658                         // Determine if we should fault or complete the observers
2659                         Exception error = targetException ?? Observable.GetCompletionError();
2660                         try
2661                         {
2662                             // Do it.
2663                             if (error != null)
2664                             {
2665                                 foreach (IObserver<TOutput> observer in currentObservers) observer.OnError(error);
2666                             }
2667                             else
2668                             {
2669                                 foreach (IObserver<TOutput> observer in currentObservers) observer.OnCompleted();
2670                             }
2671                         }
2672                         catch (Exception exc)
2673                         {
2674                             // If an observer throws an exception at this point (which it shouldn't do),
2675                             // we have little recourse but to let that exception propagate.  Since allowing it to
2676                             // propagate here would just result in it getting eaten by the owning task,
2677                             // we instead have it propagate on the thread pool.
2678                             Common.ThrowAsync(exc);
2679                         }
2680                     }
2681                 }
2682             }
2683         }
2684         #endregion
2685
2686         #region AsObserver
2687         /// <summary>Creates a new <see cref="System.IObserver{TInput}"/> abstraction over the <see cref="ITargetBlock{TInput}"/>.</summary>
2688         /// <typeparam name="TInput">Specifies the type of input accepted by the target block.</typeparam>
2689         /// <param name="target">The target to wrap.</param>
2690         /// <returns>An observer that wraps the target block.</returns>
2691         public static IObserver<TInput> AsObserver<TInput>(this ITargetBlock<TInput> target)
2692         {
2693             if (target == null) throw new ArgumentNullException("target");
2694             Contract.EndContractBlock();
2695             return new TargetObserver<TInput>(target);
2696         }
2697
2698         /// <summary>Provides an observer wrapper for a target block.</summary>
2699         [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
2700         private sealed class TargetObserver<TInput> : IObserver<TInput>, IDebuggerDisplay
2701         {
2702             /// <summary>The wrapped target.</summary>
2703             private readonly ITargetBlock<TInput> _target;
2704
2705             /// <summary>Initializes the observer.</summary>
2706             /// <param name="target">The target to wrap.</param>
2707             internal TargetObserver(ITargetBlock<TInput> target)
2708             {
2709                 Contract.Requires(target != null, "A target to observe is required.");
2710                 _target = target;
2711             }
2712
2713             /// <summary>Sends the value to the observer.</summary>
2714             /// <param name="value">The value to send.</param>
2715             void IObserver<TInput>.OnNext(TInput value)
2716             {
2717                 // Send the value asynchronously...
2718                 Task<bool> task = SendAsyncToTarget(value);
2719
2720                 // And block until it's received.
2721                 task.GetAwaiter().GetResult(); // propagate original (non-aggregated) exception
2722             }
2723
2724             /// <summary>Completes the target.</summary>
2725             void IObserver<TInput>.OnCompleted()
2726             {
2727                 _target.Complete();
2728             }
2729
2730             /// <summary>Forwards the error to the target.</summary>
2731             /// <param name="error">The exception to forward.</param>
2732             void IObserver<TInput>.OnError(Exception error)
2733             {
2734                 _target.Fault(error);
2735             }
2736
2737             /// <summary>Sends a value to the underlying target asynchronously.</summary>
2738             /// <param name="value">The value to send.</param>
2739             /// <returns>A Task{bool} to wait on.</returns>
2740             internal Task<bool> SendAsyncToTarget(TInput value)
2741             {
2742                 return _target.SendAsync(value);
2743             }
2744
2745             /// <summary>The data to display in the debugger display attribute.</summary>
2746             [SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
2747             private object DebuggerDisplayContent
2748             {
2749                 get
2750                 {
2751                     var displayTarget = _target as IDebuggerDisplay;
2752                     return string.Format("Block=\"{0}\"",
2753                         displayTarget != null ? displayTarget.Content : _target);
2754                 }
2755             }
2756             /// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
2757             object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } }
2758         }
2759         #endregion
2760
2761         #region NullTarget
2762         /// <summary>
2763         /// Gets a target block that synchronously accepts all messages offered to it and drops them.
2764         /// </summary>
2765         /// <typeparam name="TInput">The type of the messages this block can accept.</typeparam>
2766         /// <returns>A <see cref="T:System.Threading.Tasks.Dataflow.ITargetBlock`1"/> that accepts and subsequently drops all offered messages.</returns>
2767         public static ITargetBlock<TInput> NullTarget<TInput>()
2768         {
2769             return new NullTargetBlock<TInput>();
2770         }
2771
2772         /// <summary>
2773         /// Target block that synchronously accepts all messages offered to it and drops them.
2774         /// </summary>
2775         /// <typeparam name="TInput">The type of the messages this block can accept.</typeparam>
2776         private class NullTargetBlock<TInput> : ITargetBlock<TInput>
2777         {
2778             private Task _completion;
2779
2780             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="OfferMessage"]/*' />
2781             DataflowMessageStatus ITargetBlock<TInput>.OfferMessage(DataflowMessageHeader messageHeader, TInput messageValue, ISourceBlock<TInput> source, Boolean consumeToAccept)
2782             {
2783                 if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, "messageHeader");
2784                 Contract.EndContractBlock();
2785
2786                 // If the source requires an explicit synchronous consumption, do it
2787                 if (consumeToAccept)
2788                 {
2789                     if (source == null) throw new ArgumentException(SR.Argument_CantConsumeFromANullSource, "consumeToAccept");
2790                     bool messageConsumed;
2791
2792                     // If the source throws during this call, let the exception propagate back to the source
2793                     source.ConsumeMessage(messageHeader, this, out messageConsumed);
2794                     if (!messageConsumed) return DataflowMessageStatus.NotAvailable;
2795                 }
2796
2797                 // Always tell the source the message has been accepted
2798                 return DataflowMessageStatus.Accepted;
2799             }
2800
2801             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
2802             void IDataflowBlock.Complete() { } // No-op
2803             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
2804             void IDataflowBlock.Fault(Exception exception) { } // No-op
2805             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
2806             Task IDataflowBlock.Completion
2807             {
2808                 get { return LazyInitializer.EnsureInitialized(ref _completion, () => new TaskCompletionSource<VoidResult>().Task); }
2809             }
2810         }
2811         #endregion
2812     }
2813 }