1 // Copyright (c) Microsoft. All rights reserved.
2 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
4 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
9 // Common functionality for ITargetBlock, ISourceBlock, and IPropagatorBlock.
11 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
13 using System.Collections.Generic;
14 using System.Diagnostics;
15 using System.Diagnostics.CodeAnalysis;
16 using System.Diagnostics.Contracts;
18 using System.Runtime.CompilerServices;
19 using System.Runtime.ExceptionServices;
20 using System.Security;
21 using System.Threading.Tasks.Dataflow.Internal;
22 using System.Threading.Tasks.Dataflow.Internal.Threading;
24 namespace System.Threading.Tasks.Dataflow
27 /// Provides a set of static (Shared in Visual Basic) methods for working with dataflow blocks.
29 public static class DataflowBlock
32 /// <summary>Links the <see cref="ISourceBlock{TOutput}"/> to the specified <see cref="ITargetBlock{TOutput}"/>.</summary>
33 /// <param name="source">The source from which to link.</param>
34 /// <param name="target">The <see cref="ITargetBlock{TOutput}"/> to which to connect the source.</param>
35 /// <returns>An IDisposable that, upon calling Dispose, will unlink the source from the target.</returns>
36 /// <exception cref="System.ArgumentNullException">The <paramref name="source"/> is null (Nothing in Visual Basic).</exception>
37 /// <exception cref="System.ArgumentNullException">The <paramref name="target"/> is null (Nothing in Visual Basic).</exception>
public static IDisposable LinkTo<TOutput>(
    this ISourceBlock<TOutput> source,
    ITargetBlock<TOutput> target)
{
    // Guard clauses: both endpoints of the link are mandatory.
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    if (target == null)
    {
        throw new ArgumentNullException("target");
    }
    Contract.EndContractBlock();

    // Convenience overload for the common case: forward to the source's own
    // LinkTo, supplying the default link options on the caller's behalf.
    IDisposable unlinker = source.LinkTo(target, DataflowLinkOptions.Default);
    return unlinker;
}
52 /// <summary>Links the <see cref="ISourceBlock{TOutput}"/> to the specified <see cref="ITargetBlock{TOutput}"/> using the specified filter.</summary>
53 /// <param name="source">The source from which to link.</param>
54 /// <param name="target">The <see cref="ITargetBlock{TOutput}"/> to which to connect the source.</param>
55 /// <param name="predicate">The filter a message must pass in order for it to propagate from the source to the target.</param>
56 /// <returns>An IDisposable that, upon calling Dispose, will unlink the source from the target.</returns>
57 /// <exception cref="System.ArgumentNullException">The <paramref name="source"/> is null (Nothing in Visual Basic).</exception>
58 /// <exception cref="System.ArgumentNullException">The <paramref name="target"/> is null (Nothing in Visual Basic).</exception>
59 /// <exception cref="System.ArgumentNullException">The <paramref name="predicate"/> is null (Nothing in Visual Basic).</exception>
public static IDisposable LinkTo<TOutput>(
    this ISourceBlock<TOutput> source,
    ITargetBlock<TOutput> target,
    Predicate<TOutput> predicate)
{
    // No checks here: the overload taking link options validates
    // source, target and predicate before doing any work.
    DataflowLinkOptions defaultOptions = DataflowLinkOptions.Default;
    return LinkTo(source, target, defaultOptions, predicate);
}
69 /// <summary>Links the <see cref="ISourceBlock{TOutput}"/> to the specified <see cref="ITargetBlock{TOutput}"/> using the specified filter.</summary>
70 /// <param name="source">The source from which to link.</param>
71 /// <param name="target">The <see cref="ITargetBlock{TOutput}"/> to which to connect the source.</param>
72 /// <param name="predicate">The filter a message must pass in order for it to propagate from the source to the target.</param>
73 /// <param name="linkOptions">The options to use to configure the link.</param>
74 /// <returns>An IDisposable that, upon calling Dispose, will unlink the source from the target.</returns>
75 /// <exception cref="System.ArgumentNullException">The <paramref name="source"/> is null (Nothing in Visual Basic).</exception>
76 /// <exception cref="System.ArgumentNullException">The <paramref name="target"/> is null (Nothing in Visual Basic).</exception>
77 /// <exception cref="System.ArgumentNullException">The <paramref name="linkOptions"/> is null (Nothing in Visual Basic).</exception>
78 /// <exception cref="System.ArgumentNullException">The <paramref name="predicate"/> is null (Nothing in Visual Basic).</exception>
public static IDisposable LinkTo<TOutput>(
    this ISourceBlock<TOutput> source,
    ITargetBlock<TOutput> target,
    DataflowLinkOptions linkOptions,
    Predicate<TOutput> predicate)
{
    // Every argument is required; reject nulls in declaration order.
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    if (target == null)
    {
        throw new ArgumentNullException("target");
    }
    if (linkOptions == null)
    {
        throw new ArgumentNullException("linkOptions");
    }
    if (predicate == null)
    {
        throw new ArgumentNullException("predicate");
    }
    Contract.EndContractBlock();

    // Interpose a filtering propagator between the two blocks: the source is
    // linked to the filter, and the filter forwards matching messages to the
    // caller's target.
    ITargetBlock<TOutput> filteringTarget = new FilteredLinkPropagator<TOutput>(source, target, predicate);
    return source.LinkTo(filteringTarget, linkOptions);
}
98 /// <summary>Provides a synchronous filter for use in filtered LinkTos.</summary>
99 /// <typeparam name="T">Specifies the type of data being filtered.</typeparam>
100 [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
101 [DebuggerTypeProxy(typeof(FilteredLinkPropagator<>.DebugView))]
private sealed class FilteredLinkPropagator<T> : IPropagatorBlock<T, T>, IDebuggerDisplay
{
    /// <summary>The source connected with this filter.</summary>
    private readonly ISourceBlock<T> _source;
    /// <summary>The target with which this block is associated.</summary>
    private readonly ITargetBlock<T> _target;
    /// <summary>The predicate provided by the user.</summary>
    private readonly Predicate<T> _userProvidedPredicate;

    /// <summary>Initializes the filter passthrough.</summary>
    /// <param name="source">The source connected to this filter.</param>
    /// <param name="target">The target to which filtered messages should be passed.</param>
    /// <param name="predicate">The predicate to run for each message.</param>
    internal FilteredLinkPropagator(ISourceBlock<T> source, ITargetBlock<T> target, Predicate<T> predicate)
    {
        Contract.Requires(source != null, "Filtered link requires a source to filter on.");
        Contract.Requires(target != null, "Filtered link requires a target to filter to.");
        Contract.Requires(predicate != null, "Filtered link requires a predicate to filter with.");

        // Store the arguments. All three are dereferenced by the members below,
        // so each one must be captured here.
        _source = source;
        _target = target;
        _userProvidedPredicate = predicate;
    }

    /// <summary>Runs the user-provided predicate over an item.</summary>
    /// <param name="item">The item to evaluate.</param>
    /// <returns>true if the item passed the filter; otherwise, false.</returns>
    private bool RunPredicate(T item)
    {
        Contract.Requires(_userProvidedPredicate != null, "User-provided predicate is required.");

        return _userProvidedPredicate(item); // avoid state object allocation if execution context isn't needed
    }

    /// <summary>Manually closes over state necessary in FilteredLinkPropagator.</summary>
    private sealed class PredicateContextState
    {
        /// <summary>The input to be filtered.</summary>
        internal readonly T Input;
        /// <summary>The predicate function.</summary>
        internal readonly Predicate<T> Predicate;
        /// <summary>The result of the filtering operation.</summary>
        internal bool Output;

        /// <summary>Initializes the predicate state.</summary>
        /// <param name="input">The input to be filtered.</param>
        /// <param name="predicate">The predicate function.</param>
        internal PredicateContextState(T input, Predicate<T> predicate)
        {
            Contract.Requires(predicate != null, "A predicate with which to filter is required.");
            this.Input = input;
            this.Predicate = predicate;
        }

        /// <summary>Runs the predicate function over the input and stores the result into the output.</summary>
        internal void Run()
        {
            Contract.Requires(Predicate != null, "Non-null predicate required");
            Output = Predicate(Input);
        }
    }

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="OfferMessage"]/*' />
    DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, Boolean consumeToAccept)
    {
        // Validate arguments. Some targets may have a null source, but FilteredLinkPropagator
        // is an internal target that should only ever have source non-null.
        if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, "messageHeader");
        if (source == null) throw new ArgumentNullException("source");
        Contract.EndContractBlock();

        // Run the filter.
        bool passedFilter = RunPredicate(messageValue);

        // If the predicate matched, pass the message along to the real target,
        // presenting this filter (not the real source) as the offering source so that
        // consume/reserve/release callbacks are routed back through this instance.
        if (passedFilter)
        {
            return _target.OfferMessage(messageHeader, messageValue, this, consumeToAccept);
        }

        // Otherwise, decline.
        return DataflowMessageStatus.Declined;
    }

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ConsumeMessage"]/*' />
    T ISourceBlock<T>.ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target, out Boolean messageConsumed)
    {
        // This message should have only made it to the target if it passes the filter, so we shouldn't need to check again.
        // The real source will also be doing verifications, so we don't need to validate args here.
        Debug.Assert(messageHeader.IsValid, "Only valid messages may be consumed.");
        return _source.ConsumeMessage(messageHeader, this, out messageConsumed);
    }

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReserveMessage"]/*' />
    bool ISourceBlock<T>.ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
    {
        // This message should have only made it to the target if it passes the filter, so we shouldn't need to check again.
        // The real source will also be doing verifications, so we don't need to validate args here.
        Debug.Assert(messageHeader.IsValid, "Only valid messages may be reserved.");
        return _source.ReserveMessage(messageHeader, this);
    }

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReleaseReservation"]/*' />
    void ISourceBlock<T>.ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
    {
        // This message should have only made it to the target if it passes the filter, so we shouldn't need to check again.
        // The real source will also be doing verifications, so we don't need to validate args here.
        Debug.Assert(messageHeader.IsValid, "Only valid messages may be released.");
        _source.ReleaseReservation(messageHeader, this);
    }

    // Completion is reported from the source side of the link, while Complete and
    // Fault requests are forwarded to the target side.
    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
    Task IDataflowBlock.Completion { get { return _source.Completion; } }
    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
    void IDataflowBlock.Complete() { _target.Complete(); }
    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
    void IDataflowBlock.Fault(Exception exception) { _target.Fault(exception); }

    // Linking further targets off the filter itself is not supported; this member always throws.
    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="LinkTo"]/*' />
    IDisposable ISourceBlock<T>.LinkTo(ITargetBlock<T> target, DataflowLinkOptions linkOptions) { throw new NotSupportedException(SR.NotSupported_MemberNotNeeded); }

    /// <summary>The data to display in the debugger display attribute.</summary>
    [SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
    private object DebuggerDisplayContent
    {
        get
        {
            // Prefer each endpoint's own debugger content when it exposes one;
            // otherwise fall back to the block object itself.
            var displaySource = _source as IDebuggerDisplay;
            var displayTarget = _target as IDebuggerDisplay;
            return string.Format("{0} Source=\"{1}\", Target=\"{2}\"",
                Common.GetNameForDebugger(this),
                displaySource != null ? displaySource.Content : _source,
                displayTarget != null ? displayTarget.Content : _target);
        }
    }

    /// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
    object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } }

    /// <summary>Provides a debugger type proxy for a filter.</summary>
    private sealed class DebugView
    {
        /// <summary>The filter.</summary>
        private readonly FilteredLinkPropagator<T> _filter;

        /// <summary>Initializes the debug view.</summary>
        /// <param name="filter">The filter to view.</param>
        public DebugView(FilteredLinkPropagator<T> filter)
        {
            Contract.Requires(filter != null, "Need a filter with which to construct the debug view.");
            _filter = filter;
        }

        /// <summary>The linked target for this filter.</summary>
        public ITargetBlock<T> LinkedTarget { get { return _filter._target; } }
    }
}
260 #region Post and SendAsync
261 /// <summary>Posts an item to the <see cref="T:System.Threading.Tasks.Dataflow.ITargetBlock`1"/>.</summary>
262 /// <typeparam name="TInput">Specifies the type of data accepted by the target block.</typeparam>
263 /// <param name="target">The target block.</param>
264 /// <param name="item">The item being offered to the target.</param>
265 /// <returns>true if the item was accepted by the target block; otherwise, false.</returns>
267 /// This method will return once the target block has decided to accept or decline the item,
268 /// but unless otherwise dictated by special semantics of the target block, it does not wait
269 /// for the item to actually be processed (for example, <see cref="T:System.Threading.Tasks.Dataflow.ActionBlock`1"/>
270 /// will return from Post as soon as it has stored the posted item into its input queue). From the perspective
271 /// of the block's processing, Post is asynchronous. For target blocks that support postponing offered messages,
272 /// or for blocks that may do more processing in their Post implementation, consider using
273 /// <see cref="T:System.Threading.Tasks.Dataflow.DataflowBlock.SendAsync">SendAsync</see>,
274 /// which will return immediately and will enable the target to postpone the posted message and later consume it
275 /// after SendAsync returns.
public static Boolean Post<TInput>(this ITargetBlock<TInput> target, TInput item)
{
    if (target == null)
    {
        throw new ArgumentNullException("target");
    }

    // Offer the item once with no source and without consume-to-accept;
    // the post counts as successful only when the target reports Accepted.
    DataflowMessageStatus offerResult = target.OfferMessage(
        Common.SingleMessageHeader, item, source: null, consumeToAccept: false);
    return offerResult == DataflowMessageStatus.Accepted;
}
283 /// <summary>Asynchronously offers a message to the target message block, allowing for postponement.</summary>
284 /// <typeparam name="TInput">Specifies the type of the data to post to the target.</typeparam>
285 /// <param name="target">The target to which to post the data.</param>
286 /// <param name="item">The item being offered to the target.</param>
288 /// A <see cref="System.Threading.Tasks.Task{Boolean}"/> that represents the asynchronous send. If the target
289 /// accepts and consumes the offered element during the call to SendAsync, upon return
290 /// from the call the resulting <see cref="System.Threading.Tasks.Task{Boolean}"/> will be completed and its <see cref="System.Threading.Tasks.Task{Boolean}.Result">Result</see>
291 /// property will return true. If the target declines the offered element during the call, upon return from the call the resulting <see cref="System.Threading.Tasks.Task{Boolean}"/> will
292 /// be completed and its <see cref="System.Threading.Tasks.Task{Boolean}.Result">Result</see> property will return false. If the target
293 /// postpones the offered element, the element will be buffered until such time that the target consumes or releases it, at which
294 /// point the Task will complete, with its <see cref="System.Threading.Tasks.Task{Boolean}.Result"/> indicating whether the message was consumed. If the target
295 /// never attempts to consume or release the message, the returned task will never complete.
297 /// <exception cref="System.ArgumentNullException">The <paramref name="target"/> is null (Nothing in Visual Basic).</exception>
public static Task<Boolean> SendAsync<TInput>(this ITargetBlock<TInput> target, TInput item)
{
    // Delegate to the cancelable overload with a token that can never be canceled;
    // that overload performs the null check on target.
    CancellationToken neverCanceled = CancellationToken.None;
    return SendAsync<TInput>(target, item, neverCanceled);
}
303 /// <summary>Asynchronously offers a message to the target message block, allowing for postponement.</summary>
304 /// <typeparam name="TInput">Specifies the type of the data to post to the target.</typeparam>
305 /// <param name="target">The target to which to post the data.</param>
306 /// <param name="item">The item being offered to the target.</param>
307 /// <param name="cancellationToken">The cancellation token with which to request cancellation of the send operation.</param>
310 /// A <see cref="System.Threading.Tasks.Task{Boolean}"/> that represents the asynchronous send. If the target
311 /// accepts and consumes the offered element during the call to SendAsync, upon return
312 /// from the call the resulting <see cref="System.Threading.Tasks.Task{Boolean}"/> will be completed and its <see cref="System.Threading.Tasks.Task{Boolean}.Result">Result</see>
313 /// property will return true. If the target declines the offered element during the call, upon return from the call the resulting <see cref="System.Threading.Tasks.Task{Boolean}"/> will
314 /// be completed and its <see cref="System.Threading.Tasks.Task{Boolean}.Result">Result</see> property will return false. If the target
315 /// postpones the offered element, the element will be buffered until such time that the target consumes or releases it, at which
316 /// point the Task will complete, with its <see cref="System.Threading.Tasks.Task{Boolean}.Result"/> indicating whether the message was consumed. If the target
317 /// never attempts to consume or release the message, the returned task will never complete.
320 /// If cancellation is requested before the target has successfully consumed the sent data,
321 /// the returned task will complete in the Canceled state and the data will no longer be available to the target.
324 /// <exception cref="System.ArgumentNullException">The <paramref name="target"/> is null (Nothing in Visual Basic).</exception>
325 [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
public static Task<Boolean> SendAsync<TInput>(this ITargetBlock<TInput> target, TInput item, CancellationToken cancellationToken)
{
    // Validate arguments. No validation necessary for item.
    if (target == null) throw new ArgumentNullException("target");
    Contract.EndContractBlock();

    // Fast path check for cancellation
    if (cancellationToken.IsCancellationRequested)
        return Common.CreateTaskFromCancellation<Boolean>(cancellationToken);

    SendAsyncSource<TInput> source;

    // Fast path: try to offer the item synchronously. This first try is done
    // without any form of cancellation, and thus consumeToAccept can be the better-performing "false".
    try
    {
        switch (target.OfferMessage(Common.SingleMessageHeader, item, source: null, consumeToAccept: false))
        {
            // If the message is immediately accepted, return a cached completed task with a true result
            case DataflowMessageStatus.Accepted:
                return Common.CompletedTaskWithTrueResult;

            // If the target is declining permanently, return a cached completed task with a false result
            case DataflowMessageStatus.DecliningPermanently:
                return Common.CompletedTaskWithFalseResult;

            // The two statuses below are unexpected for a source-less offer. They are only
            // asserted on and then fall out of the switch into the slow path.
            case DataflowMessageStatus.Postponed:
                Debug.Assert(false, "A message should never be postponed when no source has been provided");
                break;

            case DataflowMessageStatus.NotAvailable:
                Debug.Assert(false, "The message should never be missed, as it's offered to only this one target");
                break;
        }

        // Slow path: the target did not accept the synchronous post, nor did it decline it permanently.
        // Create a source for the send, launch the offering, and return the representative task.
        // This ctor attempts to register a cancellation notification which would throw if the
        // underlying CTS has been disposed of. Therefore, keep it inside the try/catch block.
        source = new SendAsyncSource<TInput>(target, item, cancellationToken);
    }
    catch (Exception exc)
    {
        // If the target throws from OfferMessage (or the source could not be constructed),
        // return a faulted task instead of throwing to the caller.
        Common.StoreDataflowMessageValueIntoExceptionData(exc, item);
        return Common.CreateTaskFromException<Boolean>(exc);
    }

    Debug.Assert(source != null, "The SendAsyncSource instance must have been constructed.");
    source.OfferToTarget(); // synchronous to preserve message ordering

    // The source is itself the TaskCompletionSource; its task represents the pending send.
    return source.Task;
}
382 /// Provides a source used by SendAsync that will buffer a single message and signal when it's been accepted or declined.
384 /// <remarks>This source must only be passed to a single target, and must only be used once.</remarks>
385 [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
386 [DebuggerTypeProxy(typeof(SendAsyncSource<>.DebugView))]
387 private sealed class SendAsyncSource<TOutput> : TaskCompletionSource<Boolean>, ISourceBlock<TOutput>, IDebuggerDisplay
389 /// <summary>The target to offer to.</summary>
390 private readonly ITargetBlock<TOutput> _target;
391 /// <summary>The buffered message.</summary>
392 private readonly TOutput _messageValue;
394 /// <summary>CancellationToken used to cancel the send.</summary>
395 private CancellationToken _cancellationToken;
396 /// <summary>Registration with the cancellation token.</summary>
397 private CancellationTokenRegistration _cancellationRegistration;
398 /// <summary>The cancellation/completion state of the source.</summary>
399 private int _cancellationState; // one of the CANCELLATION_STATE_* constant values, defaulting to NONE
401 // Cancellation states:
402 // _cancellationState starts out as NONE, and will remain that way unless a CancellationToken
403 // is provided in the initial OfferToTarget call. As such, unless a token is provided,
404 // all synchronization related to cancellation will be avoided. Once a token is provided,
405 // the state transitions to REGISTERED. If cancellation then is requested or if the target
406 // calls back to consume the message, the state will transition to COMPLETING prior to
407 // actually committing the action; if it can't transition to COMPLETING, then the action doesn't
408 // take effect (e.g. if cancellation raced with the target consuming, such that the cancellation
409 // action was able to transition to COMPLETING but the consumption wasn't, then ConsumeMessage
410 // would return false indicating that the message could not be consumed). The only additional
411 // complication here is around reservations. If a target reserves a message, _cancellationState
412 // transitions to RESERVED. A subsequent ConsumeMessage call can successfully transition from
413 // RESERVED to COMPLETING, but cancellation can't; cancellation can only transition from REGISTERED
414 // to COMPLETING. If the reservation on the message is instead released, _cancellationState
415 // will transition back to REGISTERED.
417 /// <summary>No cancellation registration is used.</summary>
418 private const int CANCELLATION_STATE_NONE = 0;
419 /// <summary>A cancellation token has been registered.</summary>
420 private const int CANCELLATION_STATE_REGISTERED = 1;
421 /// <summary>The message has been reserved. Only used if a cancellation token is in play.</summary>
422 private const int CANCELLATION_STATE_RESERVED = 2;
423 /// <summary>Completion is now in progress. Only used if a cancellation token is in play.</summary>
424 private const int CANCELLATION_STATE_COMPLETING = 3;
426 /// <summary>Initializes the source.</summary>
427 /// <param name="target">The target to offer to.</param>
428 /// <param name="messageValue">The message to offer and buffer.</param>
429 /// <param name="cancellationToken">The cancellation token with which to cancel the send.</param>
internal SendAsyncSource(ITargetBlock<TOutput> target, TOutput messageValue, CancellationToken cancellationToken)
{
    Contract.Requires(target != null, "A valid target to send to is required.");

    // Capture the target and the single buffered message; OfferToTarget reads both.
    _target = target;
    _messageValue = messageValue;

    // If a cancelable CancellationToken is used, update our cancellation state
    // and register with the token. Only if CanBeCanceled is true do we want
    // to pay the subsequent costs around synchronization between cancellation
    // requests and the target coming back to consume the message.
    if (cancellationToken.CanBeCanceled)
    {
        _cancellationToken = cancellationToken;
        _cancellationState = CANCELLATION_STATE_REGISTERED;

        try
        {
            // The callback receives only a weak reference to this source.
            _cancellationRegistration = cancellationToken.Register(
                _cancellationCallback, new WeakReference<SendAsyncSource<TOutput>>(this));
        }
        catch
        {
            // Suppress finalization. Finalization is only required if the target drops a reference
            // to the source before the source has completed, and we'll never offer to the target.
            GC.SuppressFinalize(this);

            // Propagate the exception
            throw;
        }
    }
}
462 /// <summary>Finalizer that completes the returned task if all references to this source are dropped.</summary>
~SendAsyncSource()
{
    // CompleteAsDeclined uses synchronization, which is dangerous for a finalizer
    // during shutdown or appdomain unload.
    if (!Environment.HasShutdownStarted)
    {
        // Nothing references this source any more, so the send can never be accepted:
        // complete the task as declined, asynchronously so that no continuation
        // runs on the thread executing the finalizer.
        CompleteAsDeclined(runAsync: true);
    }
}
473 /// <summary>Completes the source in an "Accepted" state.</summary>
474 /// <param name="runAsync">true to accept asynchronously; false to accept synchronously.</param>
private void CompleteAsAccepted(bool runAsync)
{
    // The delegate captures nothing: the source is passed through the state argument.
    RunCompletionAction(state =>
    {
        // The try/catch for ObjectDisposedException handles the case where the
        // user disposes of the returned task before we're done with it.
        try { ((SendAsyncSource<TOutput>)state).TrySetResult(true); }
        catch (ObjectDisposedException) { }
    }, this, runAsync);
}
484 /// <summary>Completes the source in a "Declined" state.</summary>
485 /// <param name="runAsync">true to decline asynchronously; false to decline synchronously.</param>
private void CompleteAsDeclined(bool runAsync)
{
    // The delegate captures nothing: the source is passed through the state argument.
    RunCompletionAction(state =>
    {
        // The try/catch for ObjectDisposedException handles the case where the
        // user disposes of the returned task before we're done with it.
        try { ((SendAsyncSource<TOutput>)state).TrySetResult(false); }
        catch (ObjectDisposedException) { }
    }, this, runAsync);
}
497 /// <summary>Completes the source in faulted state.</summary>
498 /// <param name="exception">The exception with which to fault.</param>
499 /// <param name="runAsync">true to fault asynchronously; false to fault synchronously.</param>
private void CompleteAsFaulted(Exception exception, bool runAsync)
{
    // Bundle this source with the exception so the completion delegate can
    // receive both through its single state argument instead of a closure.
    Tuple<SendAsyncSource<TOutput>, Exception> sourceAndError =
        Tuple.Create<SendAsyncSource<TOutput>, Exception>(this, exception);

    Action<object> faultAction = boxedState =>
    {
        var pair = (Tuple<SendAsyncSource<TOutput>, Exception>)boxedState;
        try
        {
            pair.Item1.TrySetException(pair.Item2);
        }
        catch (ObjectDisposedException)
        {
            // Deliberately ignored, matching the other completion helpers.
        }
    };

    RunCompletionAction(faultAction, sourceAndError, runAsync);
}
510 /// <summary>Completes the source in canceled state.</summary>
511 /// <param name="runAsync">true to cancel asynchronously; false to cancel synchronously.</param>
private void CompleteAsCanceled(bool runAsync)
{
    // The delegate captures nothing: the source is passed through the state argument.
    RunCompletionAction(state =>
    {
        // The try/catch for ObjectDisposedException handles the case where the
        // user disposes of the returned task before we're done with it.
        try { ((SendAsyncSource<TOutput>)state).TrySetCanceled(); }
        catch (ObjectDisposedException) { }
    }, this, runAsync);
}
521 /// <summary>Executes a completion action.</summary>
522 /// <param name="completionAction">The action to execute, passed the state.</param>
523 /// <param name="completionActionState">The state to pass into the delegate.</param>
524 /// <param name="runAsync">true to execute the action asynchronously; false to execute it synchronously.</param>
526 /// async should be true if this is being called on a path that has the target on the stack, e.g.
527 /// the target is calling to ConsumeMessage. We don't want to block the target indefinitely
528 /// with any synchronous continuations off of the returned send async task.
530 [SuppressMessage("Microsoft.Usage", "CA1816:CallGCSuppressFinalizeCorrectly")]
private void RunCompletionAction(Action<object> completionAction, object completionActionState, bool runAsync)
{
    Contract.Requires(completionAction != null, "Completion action to run is required.");

    // Suppress finalization. Finalization is only required if the target drops a reference
    // to the source before the source has completed, and here we're completing the source.
    GC.SuppressFinalize(this);

    // Dispose of the cancellation registration if there is one
    if (_cancellationState != CANCELLATION_STATE_NONE)
    {
        Debug.Assert(_cancellationRegistration != default(CancellationTokenRegistration),
            "If we're not in NONE, we must have a cancellation token we've registered with.");
        _cancellationRegistration.Dispose();
    }

    // The action must run exactly once: either queued as a task or invoked inline.
    if (runAsync)
    {
        // If we're meant to run asynchronously, launch a task.
        // Task is fully qualified because, inside this TaskCompletionSource-derived
        // class, the simple name Task binds to the inherited Task property.
        System.Threading.Tasks.Task.Factory.StartNew(
            completionAction, completionActionState,
            CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
    }
    else
    {
        // Otherwise, execute directly.
        completionAction(completionActionState);
    }
}
561 /// <summary>Offers the message to the target asynchronously.</summary>
private void OfferToTargetAsync()
{
    // Stateless delegate: the source travels through the task's state
    // argument rather than being captured in a closure.
    Action<object> offerAction = boxedSource => ((SendAsyncSource<TOutput>)boxedSource).OfferToTarget();

    // Queue the offer as a task on the default scheduler.
    System.Threading.Tasks.Task.Factory.StartNew(
        offerAction,
        this,
        CancellationToken.None,
        Common.GetCreationOptionsForTask(),
        TaskScheduler.Default);
}
569 /// <summary>Cached delegate used to cancel a send in response to a cancellation request.</summary>
570 private readonly static Action<object> _cancellationCallback = CancellationHandler;
572 /// <summary>Attempts to cancel the source passed as state in response to a cancellation request.</summary>
573 /// <param name="state">
574 /// A weak reference to the SendAsyncSource. A weak reference is used to prevent the source
575 /// from being rooted in a long-lived token.
private static void CancellationHandler(object state)
{
    SendAsyncSource<TOutput> source = Common.UnwrapWeakReference<SendAsyncSource<TOutput>>(state);

    // The token holds only a weak reference, so the source may no longer exist when
    // cancellation fires; in that case there is nothing to cancel.
    // NOTE(review): assumes UnwrapWeakReference yields null for a dead reference - confirm against Common.
    if (source == null)
    {
        return;
    }

    Debug.Assert(source._cancellationState != CANCELLATION_STATE_NONE,
        "If cancellation is in play, we must have already moved out of the NONE state.");

    // Try to reserve completion, and if we can, complete as canceled. Note that we can only
    // achieve cancellation when in the REGISTERED state, and not when in the RESERVED state,
    // as if a target has reserved the message, we must allow the message to be consumed successfully.
    if (source._cancellationState == CANCELLATION_STATE_REGISTERED && // fast check to avoid the interlocked if we can
        Interlocked.CompareExchange(ref source._cancellationState, CANCELLATION_STATE_COMPLETING, CANCELLATION_STATE_REGISTERED) == CANCELLATION_STATE_REGISTERED)
    {
        // We've reserved completion, so proceed to cancel the task.
        source.CompleteAsCanceled(true);
    }
}
597 /// <summary>Offers the message to the target synchronously.</summary>
598 [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
internal void OfferToTarget()
{
    try
    {
        // Offer the message to the target. If there's no cancellation in play, we can just allow the target
        // to accept the message directly. But if a CancellationToken is in use, the target needs to come
        // back to us to get the data; that way, we can ensure we don't race between returning a canceled task but
        // successfully completing the send.
        bool consumeToAccept = _cancellationState != CANCELLATION_STATE_NONE;

        switch (_target.OfferMessage(
            Common.SingleMessageHeader, _messageValue, this, consumeToAccept: consumeToAccept))
        {
            // If the message is immediately accepted, complete the task as accepted
            case DataflowMessageStatus.Accepted:
                if (!consumeToAccept)
                {
                    // Cancellation wasn't in use, and the target accepted the message directly,
                    // so complete the task as accepted.
                    CompleteAsAccepted(runAsync: false);
                }
                else
                {
                    // If cancellation is in use, then since the target accepted,
                    // our state better reflect that we're completing.
                    Debug.Assert(_cancellationState == CANCELLATION_STATE_COMPLETING,
                        "The message was accepted, so we should have started completion.");
                }
                break;

            // If the message is immediately declined, complete the task as declined
            case DataflowMessageStatus.Declined:
            case DataflowMessageStatus.DecliningPermanently:
                CompleteAsDeclined(runAsync: false);
                break;

            case DataflowMessageStatus.NotAvailable:
                Debug.Assert(false, "The message should never be missed, as it's offered to only this one target");
                break;

            // If the message was postponed, the source may or may not be complete yet. Nothing to validate.
            // Treat an improper DataflowMessageStatus as postponed and do nothing.
        }
    }
    // A faulty target might throw from OfferMessage. If that happens,
    // we'll try to fault the returned task. A really faulty target might
    // both throw from OfferMessage and call ConsumeMessage,
    // in which case it's possible we might not be able to propagate the exception
    // out to the caller through the task if ConsumeMessage wins the race,
    // which is likely if the exception doesn't occur until after ConsumeMessage is
    // called. If that happens, we just eat the exception.
    catch (Exception exc)
    {
        Common.StoreDataflowMessageValueIntoExceptionData(exc, _messageValue);
        CompleteAsFaulted(exc, runAsync: false);
    }
}
/// <summary>Invoked by the target to take ownership of the single buffered message.</summary>
/// <param name="messageHeader">The header of the message the target wants to consume.</param>
/// <param name="target">The target requesting the message.</param>
/// <param name="messageConsumed">Set to true when the message was handed over; otherwise, false.</param>
/// <returns>The buffered value when consumed; otherwise, the default value of <typeparamref name="TOutput"/>.</returns>
TOutput ISourceBlock<TOutput>.ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target, out Boolean messageConsumed)
{
    // Validate arguments
    if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, "messageHeader");
    if (target == null) throw new ArgumentNullException("target");
    Contract.EndContractBlock();

    // Two things rule out consumption up front: the task already finished (for example
    // because cancellation completed it), or the header does not identify our one message.
    if (!Task.IsCompleted && messageHeader.Id == Common.SINGLE_MESSAGE_ID)
    {
        int observedState = _cancellationState;
        Debug.Assert(
            observedState == CANCELLATION_STATE_NONE || observedState == CANCELLATION_STATE_REGISTERED ||
            observedState == CANCELLATION_STATE_RESERVED || observedState == CANCELLATION_STATE_COMPLETING,
            "The current cancellation state is not valid.");

        // With no cancellation in play there is nothing to synchronize. Otherwise we must
        // win the transition from the observed state (REGISTERED or RESERVED) to COMPLETING;
        // if the state is already COMPLETING someone else owns completion.
        bool ownsCompletion =
            observedState == CANCELLATION_STATE_NONE ||
            (observedState != CANCELLATION_STATE_COMPLETING && // skip the interlocked operation when it cannot succeed
             Interlocked.CompareExchange(ref _cancellationState, CANCELLATION_STATE_COMPLETING, observedState) == observedState);

        if (ownsCompletion)
        {
            CompleteAsAccepted(runAsync: true);
            messageConsumed = true;
            return _messageValue;
        }
    }

    // Consumption failed
    messageConsumed = false;
    return default(TOutput);
}
/// <summary>Invoked by the target to place a reservation on the buffered message.</summary>
/// <param name="messageHeader">The header of the message the target wants to reserve.</param>
/// <param name="target">The target requesting the reservation.</param>
/// <returns>true if the message was reserved; otherwise, false.</returns>
bool ISourceBlock<TOutput>.ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
{
    // Validate arguments
    if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, "messageHeader");
    if (target == null) throw new ArgumentNullException("target");
    Contract.EndContractBlock();

    // Nothing can be reserved once the task has completed (e.g. through cancellation).
    if (Task.IsCompleted)
    {
        return false;
    }

    // Only the one message we hold can be reserved.
    if (messageHeader.Id != Common.SINGLE_MESSAGE_ID)
    {
        return false;
    }

    // No cancellation in play: no state transition is needed.
    if (_cancellationState == CANCELLATION_STATE_NONE)
    {
        return true;
    }

    // Otherwise the reservation succeeds only if we move REGISTERED -> RESERVED.
    int priorState = Interlocked.CompareExchange(ref _cancellationState, CANCELLATION_STATE_RESERVED, CANCELLATION_STATE_REGISTERED);
    return priorState == CANCELLATION_STATE_REGISTERED;
}
/// <summary>Invoked by the target to give up a reservation on the buffered message.</summary>
/// <param name="messageHeader">The header of the previously reserved message.</param>
/// <param name="target">The target releasing the reservation.</param>
/// <exception cref="System.InvalidOperationException">
/// The header does not identify the buffered message, or cancellation is in use and the message was not in the reserved state.
/// </exception>
void ISourceBlock<TOutput>.ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
{
    // Validate arguments
    if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, "messageHeader");
    if (target == null) throw new ArgumentNullException("target");
    Contract.EndContractBlock();

    // A header for any message other than ours cannot have been reserved here.
    if (messageHeader.Id != Common.SINGLE_MESSAGE_ID)
    {
        throw new InvalidOperationException(SR.InvalidOperation_MessageNotReservedByTarget);
    }

    // A completed task has no reservation left to release.
    if (Task.IsCompleted)
    {
        return;
    }

    bool cancellationInPlay = _cancellationState != CANCELLATION_STATE_NONE;
    if (cancellationInPlay)
    {
        // Move RESERVED back to REGISTERED; any other prior state means there was no reservation.
        int priorState = Interlocked.CompareExchange(ref _cancellationState, CANCELLATION_STATE_REGISTERED, CANCELLATION_STATE_RESERVED);
        if (priorState != CANCELLATION_STATE_RESERVED)
        {
            throw new InvalidOperationException(SR.InvalidOperation_MessageNotReservedByTarget);
        }

        // Cancellation may have been requested while the reservation was held (when the
        // handler could not act), so run the same handler now that is registered with the token.
        if (_cancellationToken.IsCancellationRequested)
        {
            CancellationHandler(new WeakReference<SendAsyncSource<TOutput>>(this));
        }
    }

    // Start the process over by reoffering the message asynchronously.
    OfferToTargetAsync();
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
Task IDataflowBlock.Completion
{
    // The completion of this source is the task it exposes.
    get { return Task; }
}

/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="LinkTo"]/*' />
IDisposable ISourceBlock<TOutput>.LinkTo(ITargetBlock<TOutput> target, DataflowLinkOptions linkOptions)
{
    // Not supported on this source; always throws.
    throw new NotSupportedException(SR.NotSupported_MemberNotNeeded);
}

/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
void IDataflowBlock.Complete()
{
    // Not supported on this source; always throws.
    throw new NotSupportedException(SR.NotSupported_MemberNotNeeded);
}

/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
void IDataflowBlock.Fault(Exception exception)
{
    // Not supported on this source; always throws.
    throw new NotSupportedException(SR.NotSupported_MemberNotNeeded);
}
/// <summary>Builds the content shown by the debugger display attribute: the block name, the buffered message, and the target.</summary>
[SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
private object DebuggerDisplayContent
{
    get
    {
        // Prefer the target's own debugger content when it provides one; otherwise show the target itself.
        object targetContent;
        var targetWithDisplay = _target as IDebuggerDisplay;
        if (targetWithDisplay != null)
        {
            targetContent = targetWithDisplay.Content;
        }
        else
        {
            targetContent = _target;
        }

        string blockName = Common.GetNameForDebugger(this);
        return string.Format("{0} Message={1}, Target=\"{2}\"", blockName, _messageValue, targetContent);
    }
}
/// <summary>Exposes <see cref="DebuggerDisplayContent"/> through <see cref="IDebuggerDisplay"/> for this instance.</summary>
object IDebuggerDisplay.Content
{
    get
    {
        return DebuggerDisplayContent;
    }
}
/// <summary>Debugger type proxy exposing the interesting state of a <see cref="SendAsyncSource{TOutput}"/>.</summary>
private sealed class DebugView
{
    /// <summary>The source being inspected.</summary>
    private readonly SendAsyncSource<TOutput> _source;

    /// <summary>Initializes the debug view.</summary>
    /// <param name="source">The source to view.</param>
    public DebugView(SendAsyncSource<TOutput> source)
    {
        Contract.Requires(source != null, "Need a source with which to construct the debug view.");
        _source = source;
    }

    /// <summary>The target to which the source offers its message.</summary>
    public ITargetBlock<TOutput> Target
    {
        get { return _source._target; }
    }

    /// <summary>The message buffered by the source.</summary>
    public TOutput Message
    {
        get { return _source._messageValue; }
    }

    /// <summary>The task representing the posting of the message.</summary>
    public Task<bool> Completion
    {
        get { return _source.Task; }
    }
}
801 #region TryReceive, ReceiveAsync, and Receive
/// <summary>
/// Attempts to synchronously receive an item from the <see cref="IReceivableSourceBlock{TOutput}"/>.
/// </summary>
/// <typeparam name="TOutput">Specifies the type of data contained in the source.</typeparam>
/// <param name="source">The source from which to receive.</param>
/// <param name="item">The item received from the source.</param>
/// <returns>true if an item could be received; otherwise, false.</returns>
/// <remarks>
/// This method does not wait for the source to have an item available; it only reports
/// whether one could be received. It forwards to the source's own TryReceive with no filter.
/// </remarks>
/// <exception cref="System.ArgumentNullException">The <paramref name="source"/> is null (Nothing in Visual Basic).</exception>
public static bool TryReceive<TOutput>(this IReceivableSourceBlock<TOutput> source, out TOutput item)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    Contract.EndContractBlock();

    // A null filter means no filtering predicate is supplied.
    bool received = source.TryReceive(null, out item);
    return received;
}
/// <summary>Asynchronously receives a value from the specified source, with no timeout and no cancellation.</summary>
/// <typeparam name="TOutput">Specifies the type of data contained in the source.</typeparam>
/// <param name="source">The source from which to asynchronously receive.</param>
/// <returns>
/// A <see cref="System.Threading.Tasks.Task{TOutput}"/> representing the asynchronous receive. When an item is received from the source,
/// the task completes and its <see cref="System.Threading.Tasks.Task{TOutput}.Result">Result</see> is that item. If no item can be retrieved
/// because the source is empty and completed, the returned task will be canceled.
/// </returns>
/// <exception cref="System.ArgumentNullException">The <paramref name="source"/> is null (Nothing in Visual Basic).</exception>
public static Task<TOutput> ReceiveAsync<TOutput>(
    this ISourceBlock<TOutput> source)
{
    // The full overload performs the argument validation.
    TimeSpan waitIndefinitely = Common.InfiniteTimeSpan;
    return ReceiveAsync<TOutput>(source, waitIndefinitely, CancellationToken.None);
}
/// <summary>Asynchronously receives a value from the specified source, observing a cancellation token.</summary>
/// <typeparam name="TOutput">Specifies the type of data contained in the source.</typeparam>
/// <param name="source">The source from which to asynchronously receive.</param>
/// <param name="cancellationToken">The <see cref="System.Threading.CancellationToken"/> which may be used to cancel the receive operation.</param>
/// <returns>
/// A <see cref="System.Threading.Tasks.Task{TOutput}"/> representing the asynchronous receive. When an item is received from the source,
/// the task completes and its <see cref="System.Threading.Tasks.Task{TOutput}.Result">Result</see> is that item. If no item can be retrieved,
/// either because cancellation is requested or the source is empty and completed, the returned task will be canceled.
/// </returns>
/// <exception cref="System.ArgumentNullException">The <paramref name="source"/> is null (Nothing in Visual Basic).</exception>
public static Task<TOutput> ReceiveAsync<TOutput>(
    this ISourceBlock<TOutput> source, CancellationToken cancellationToken)
{
    // The full overload performs the argument validation.
    TimeSpan waitIndefinitely = Common.InfiniteTimeSpan;
    return ReceiveAsync<TOutput>(source, waitIndefinitely, cancellationToken);
}
/// <summary>Asynchronously receives a value from the specified source, giving up after a timeout.</summary>
/// <typeparam name="TOutput">Specifies the type of data contained in the source.</typeparam>
/// <param name="source">The source from which to asynchronously receive.</param>
/// <param name="timeout">A <see cref="System.TimeSpan"/> that represents the number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely.</param>
/// <returns>
/// A <see cref="System.Threading.Tasks.Task{TOutput}"/> representing the asynchronous receive. When an item is received from the source,
/// the task completes and its <see cref="System.Threading.Tasks.Task{TOutput}.Result">Result</see> is that item. If no item can be retrieved,
/// either because the timeout expires or the source is empty and completed, the returned task will be canceled.
/// </returns>
/// <exception cref="System.ArgumentNullException">The <paramref name="source"/> is null (Nothing in Visual Basic).</exception>
/// <exception cref="System.ArgumentOutOfRangeException">
/// timeout is a negative number other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than <see cref="System.Int32.MaxValue"/>.
/// </exception>
public static Task<TOutput> ReceiveAsync<TOutput>(
    this ISourceBlock<TOutput> source, TimeSpan timeout)
{
    // The full overload performs the argument validation.
    CancellationToken notCancelable = CancellationToken.None;
    return ReceiveAsync<TOutput>(source, timeout, notCancelable);
}
/// <summary>Asynchronously receives a value from the specified source, observing both a timeout and a cancellation token.</summary>
/// <typeparam name="TOutput">Specifies the type of data contained in the source.</typeparam>
/// <param name="source">The source from which to asynchronously receive.</param>
/// <param name="timeout">A <see cref="System.TimeSpan"/> that represents the number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely.</param>
/// <param name="cancellationToken">The <see cref="System.Threading.CancellationToken"/> which may be used to cancel the receive operation.</param>
/// <returns>
/// A <see cref="System.Threading.Tasks.Task{TOutput}"/> representing the asynchronous receive. When an item is received from the source,
/// the task completes and its <see cref="System.Threading.Tasks.Task{TOutput}.Result">Result</see> is that item. If no item can be retrieved,
/// either because the timeout expires, cancellation is requested, or the source is empty and completed, the returned task will be canceled.
/// </returns>
/// <exception cref="System.ArgumentNullException">The <paramref name="source"/> is null (Nothing in Visual Basic).</exception>
/// <exception cref="System.ArgumentOutOfRangeException">
/// timeout is a negative number other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than <see cref="System.Int32.MaxValue"/>.
/// </exception>
public static Task<TOutput> ReceiveAsync<TOutput>(
    this ISourceBlock<TOutput> source, TimeSpan timeout, CancellationToken cancellationToken)
{
    // Validate arguments
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    if (!Common.IsValidTimeout(timeout))
    {
        throw new ArgumentOutOfRangeException("timeout", SR.ArgumentOutOfRange_NeedNonNegOrNegative1);
    }

    // Hand off to the shared core, letting it first try an immediate TryReceive on the source.
    return ReceiveCore(source, attemptTryReceive: true, timeout: timeout, cancellationToken: cancellationToken);
}
/// <summary>Synchronously receives an item from the source, waiting indefinitely and without cancellation.</summary>
/// <typeparam name="TOutput">Specifies the type of data contained in the source.</typeparam>
/// <param name="source">The source from which to receive.</param>
/// <returns>The received item.</returns>
/// <exception cref="System.ArgumentNullException">The <paramref name="source"/> is null (Nothing in Visual Basic).</exception>
/// <exception cref="System.InvalidOperationException">No item could be received from the source.</exception>
public static TOutput Receive<TOutput>(
    this ISourceBlock<TOutput> source)
{
    // The full overload performs the argument validation.
    TimeSpan waitIndefinitely = Common.InfiniteTimeSpan;
    return Receive<TOutput>(source, waitIndefinitely, CancellationToken.None);
}
/// <summary>Synchronously receives an item from the source, observing a cancellation token.</summary>
/// <typeparam name="TOutput">Specifies the type of data contained in the source.</typeparam>
/// <param name="source">The source from which to receive.</param>
/// <param name="cancellationToken">The <see cref="System.Threading.CancellationToken"/> which may be used to cancel the receive operation.</param>
/// <returns>The received item.</returns>
/// <exception cref="System.ArgumentNullException">The <paramref name="source"/> is null (Nothing in Visual Basic).</exception>
/// <exception cref="System.InvalidOperationException">No item could be received from the source.</exception>
/// <exception cref="System.OperationCanceledException">The operation was canceled before an item was received from the source.</exception>
/// <remarks>
/// An item that the source successfully offered and that was received by this operation is returned even if cancellation is requested concurrently.
/// </remarks>
public static TOutput Receive<TOutput>(
    this ISourceBlock<TOutput> source, CancellationToken cancellationToken)
{
    // The full overload performs the argument validation.
    TimeSpan waitIndefinitely = Common.InfiniteTimeSpan;
    return Receive<TOutput>(source, waitIndefinitely, cancellationToken);
}
/// <summary>Synchronously receives an item from the source, giving up after a timeout.</summary>
/// <typeparam name="TOutput">Specifies the type of data contained in the source.</typeparam>
/// <param name="source">The source from which to receive.</param>
/// <param name="timeout">A <see cref="System.TimeSpan"/> that represents the number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely.</param>
/// <returns>The received item.</returns>
/// <exception cref="System.ArgumentOutOfRangeException">
/// timeout is a negative number other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than <see cref="System.Int32.MaxValue"/>.
/// </exception>
/// <exception cref="System.ArgumentNullException">The <paramref name="source"/> is null (Nothing in Visual Basic).</exception>
/// <exception cref="System.InvalidOperationException">No item could be received from the source.</exception>
/// <exception cref="System.TimeoutException">The specified timeout expired before an item was received from the source.</exception>
/// <remarks>
/// An item that the source successfully offered and that was received by this operation is returned even if the timeout elapses concurrently.
/// </remarks>
public static TOutput Receive<TOutput>(
    this ISourceBlock<TOutput> source, TimeSpan timeout)
{
    // The full overload performs the argument validation.
    CancellationToken notCancelable = CancellationToken.None;
    return Receive<TOutput>(source, timeout, notCancelable);
}
/// <summary>Synchronously receives an item from the source, observing both a timeout and a cancellation token.</summary>
/// <typeparam name="TOutput">Specifies the type of data contained in the source.</typeparam>
/// <param name="source">The source from which to receive.</param>
/// <param name="timeout">A <see cref="System.TimeSpan"/> that represents the number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely.</param>
/// <param name="cancellationToken">The <see cref="System.Threading.CancellationToken"/> which may be used to cancel the receive operation.</param>
/// <returns>The received item.</returns>
/// <exception cref="System.ArgumentNullException">The <paramref name="source"/> is null (Nothing in Visual Basic).</exception>
/// <exception cref="System.ArgumentOutOfRangeException">
/// timeout is a negative number other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than <see cref="System.Int32.MaxValue"/>.
/// </exception>
/// <exception cref="System.InvalidOperationException">No item could be received from the source.</exception>
/// <exception cref="System.TimeoutException">The specified timeout expired before an item was received from the source.</exception>
/// <exception cref="System.OperationCanceledException">The operation was canceled before an item was received from the source.</exception>
/// <remarks>
/// An item that the source successfully offered and that was received by this operation is returned even if a timeout or cancellation request occurs concurrently.
/// </remarks>
[SuppressMessage("Microsoft.Usage", "CA2200:RethrowToPreserveStackDetails")]
public static TOutput Receive<TOutput>(
    this ISourceBlock<TOutput> source, TimeSpan timeout, CancellationToken cancellationToken)
{
    // Validate arguments
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    if (!Common.IsValidTimeout(timeout))
    {
        throw new ArgumentOutOfRangeException("timeout", SR.ArgumentOutOfRange_NeedNonNegOrNegative1);
    }

    // Fast paths: bail out if already canceled, then see whether the source can hand over an item right now.
    cancellationToken.ThrowIfCancellationRequested();
    var receivable = source as IReceivableSourceBlock<TOutput>;
    if (receivable != null)
    {
        TOutput immediateItem;
        if (receivable.TryReceive(null, out immediateItem))
        {
            return immediateItem;
        }
    }

    // Slow path: obtain a task for the receive (the TryReceive attempt was already made above)
    // and block on it. GetResult surfaces the original exception, which may be an
    // OperationCanceledException.
    Task<TOutput> receiveTask = ReceiveCore(source, false, timeout, cancellationToken);
    try
    {
        return receiveTask.GetAwaiter().GetResult(); // block until the result is available
    }
    catch
    {
        // When the task was canceled, prefer throwing through the caller's token so the
        // resulting exception carries that token; the exception caught here does not.
        if (receiveTask.IsCanceled)
        {
            cancellationToken.ThrowIfCancellationRequested();
        }

        // Otherwise propagate the original exception unchanged.
        throw;
    }
}
1012 #region Shared by Receive and ReceiveAsync
/// <summary>Core receive logic shared by Receive and ReceiveAsync.</summary>
/// <typeparam name="TOutput">Specifies the type of data contained in the source.</typeparam>
/// <param name="source">The source from which to receive.</param>
/// <param name="attemptTryReceive">Whether to first attempt using TryReceive to get a value from the source.</param>
/// <param name="timeout">A <see cref="System.TimeSpan"/> that represents the number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely.</param>
/// <param name="cancellationToken">The <see cref="System.Threading.CancellationToken"/> which may be used to cancel the receive operation.</param>
/// <returns>A Task for the receive operation.</returns>
[SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
private static Task<TOutput> ReceiveCore<TOutput>(
    this ISourceBlock<TOutput> source, bool attemptTryReceive, TimeSpan timeout, CancellationToken cancellationToken)
{
    Contract.Requires(source != null, "Need a source from which to receive.");

    // Already canceled: report that without touching the source.
    if (cancellationToken.IsCancellationRequested)
    {
        return Common.CreateTaskFromCancellation<TOutput>(cancellationToken);
    }

    // Optional fast path: a receivable source may be able to hand over an item immediately.
    IReceivableSourceBlock<TOutput> receivable =
        attemptTryReceive ? source as IReceivableSourceBlock<TOutput> : null;
    if (receivable != null)
    {
        try
        {
            TOutput immediateItem;
            if (receivable.TryReceive(null, out immediateItem))
            {
                return Task.FromResult<TOutput>(immediateItem);
            }
        }
        catch (Exception tryReceiveError)
        {
            // An exception from the source's TryReceive is reported through the returned task.
            return Common.CreateTaskFromException<TOutput>(tryReceiveError);
        }
    }

    // A zero timeout leaves no time to wait, so report the timeout right away.
    int millisecondsTimeout = (int)timeout.TotalMilliseconds;
    if (millisecondsTimeout == 0)
    {
        Exception timeoutError = ReceiveTarget<TOutput>.CreateExceptionForTimeout();
        return Common.CreateTaskFromException<TOutput>(timeoutError);
    }

    // Otherwise link a temporary target to the source and wait for an item.
    return ReceiveCoreByLinking<TOutput>(source, millisecondsTimeout, cancellationToken);
}
/// <summary>Identifies why a ReceiveCoreByLinking operation is being cleaned up and completed.</summary>
private enum ReceiveCoreByLinkingCleanupReason
{
    /// <summary>A value was successfully obtained from the source.</summary>
    Success = 0,

    /// <summary>The timer expired before a value could be received.</summary>
    Timer = 1,

    /// <summary>Cancellation was requested on the cancellation token before a value could be received.</summary>
    Cancellation = 2,

    /// <summary>The source completed before a value could be received.</summary>
    SourceCompletion = 3,

    /// <summary>An error occurred while linking up the target.</summary>
    SourceProtocolError = 4,

    /// <summary>An error occurred during cleanup after completing for another reason.</summary>
    ErrorDuringCleanup = 5
}
/// <summary>Cached delegate that cancels the <see cref="CancellationTokenSource"/> supplied as its state argument.</summary>
private static readonly Action<object> _cancelCts = state =>
{
    var sourceToCancel = (CancellationTokenSource)state;
    sourceToCancel.Cancel();
};
/// <summary>Receives an item from the source by linking a temporary target to it.</summary>
/// <typeparam name="TOutput">Specifies the type of data contained in the source.</typeparam>
/// <param name="source">The source from which to receive.</param>
/// <param name="millisecondsTimeout">The number of milliseconds to wait, or -1 to wait indefinitely.</param>
/// <param name="cancellationToken">The <see cref="System.Threading.CancellationToken"/> which may be used to cancel the receive operation.</param>
/// <returns>The task exposed by the temporary target.</returns>
[SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
private static Task<TOutput> ReceiveCoreByLinking<TOutput>(ISourceBlock<TOutput> source, int millisecondsTimeout, CancellationToken cancellationToken)
{
    // The temporary target doubles as the completion source for the returned task.
    var target = new ReceiveTarget<TOutput>();

    // Everything below, including the cancellation registrations, stays inside the try:
    // registering can throw if the underlying CancellationTokenSource has been disposed.
    try
    {
        // Forward cancellation of the caller's token to the target's internal source.
        if (cancellationToken.CanBeCanceled)
        {
            target._externalCancellationToken = cancellationToken;
            target._regFromExternalCancellationToken = cancellationToken.Register(_cancelCts, target._cts);
        }

        // Cleanup is needed when any of the following happens:
        // - the target completes successfully because it received data;
        // - the caller's timeout elapses;
        // - cancellation is requested;
        // - the source completes, since no more data will arrive.
        // The timer callback can run before the _timer field is assigned. That is fine:
        // cleanup then sees a null timer and has nothing to dispose (and nothing needs
        // disposing, since the timer is what fired). Timer.Dispose may also be called
        // concurrently more than once.
        if (millisecondsTimeout > 0)
        {
            target._timer = new Timer(
                ReceiveTarget<TOutput>.CachedLinkingTimerCallback, target,
                millisecondsTimeout, Timeout.Infinite);
        }

        if (target._cts.Token.CanBeCanceled)
        {
            // This registration is not cleaned up explicitly because the source is short-lived.
            target._cts.Token.Register(
                ReceiveTarget<TOutput>.CachedLinkingCancellationCallback, target);
        }

        // Link the target to the source and publish the unlinker for the cleanup code.
        IDisposable linkHandle = source.LinkTo(target, DataflowLinkOptions.UnlinkAfterOneAndPropagateCompletion);
        target._unlink = linkHandle;

        // Completion may have started after linking but before the unlinker was published,
        // in which case cleanup could not see it and we must dispose it here. If cleanup did
        // see it, it will dispose it itself, so whoever swaps the field to null does the disposal.
        if (Volatile.Read(ref target._cleanupReserved))
        {
            IDisposable pendingUnlink = Interlocked.CompareExchange(ref target._unlink, null, linkHandle);
            if (pendingUnlink != null)
            {
                pendingUnlink.Dispose();
            }
        }
    }
    catch (Exception linkError)
    {
        target._receivedException = linkError;
        target.TryCleanupAndComplete(ReceiveCoreByLinkingCleanupReason.SourceProtocolError);
        // If another path already reserved cleanup, this exception may end up being swallowed.
    }

    return target.Task;
}
1151 /// <summary>Provides a TaskCompletionSource that is also a dataflow target for use in ReceiveCore.</summary>
1152 /// <typeparam name="T">Specifies the type of data offered to the target.</typeparam>
1153 [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")]
1154 [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
1155 private sealed class ReceiveTarget<T> : TaskCompletionSource<T>, ITargetBlock<T>, IDebuggerDisplay
/// <summary>Cached timer callback used by ReceiveCoreByLinking. The state argument is the ReceiveTarget to complete with the Timer reason.</summary>
/// <remarks>Per the original note, the C# compiler will not cache this delegate by default due to it being a generic method on a non-generic class.</remarks>
internal static readonly TimerCallback CachedLinkingTimerCallback =
    state => ((ReceiveTarget<T>)state).TryCleanupAndComplete(ReceiveCoreByLinkingCleanupReason.Timer);

/// <summary>Cached cancellation callback used by ReceiveCoreByLinking. The state argument is the ReceiveTarget to complete with the Cancellation reason.</summary>
/// <remarks>Per the original note, the C# compiler will not cache this delegate by default due to it being a generic method on a non-generic class.</remarks>
internal static readonly Action<object> CachedLinkingCancellationCallback =
    state => ((ReceiveTarget<T>)state).TryCleanupAndComplete(ReceiveCoreByLinkingCleanupReason.Cancellation);
/// <summary>The value accepted from the source, if any.</summary>
private T _receivedValue;

/// <summary>The internal cancellation source; the external token, when cancelable, is forwarded to it.</summary>
internal readonly CancellationTokenSource _cts = new CancellationTokenSource();

/// <summary>true once some code path has reserved the right to clean up and complete the target; it only ever goes from false to true.</summary>
/// <remarks>
/// OfferMessage sets this while holding <see cref="IncomingLock"/>. It is also read without the lock,
/// through Volatile.Read, as a fast-path check.
/// </remarks>
internal bool _cleanupReserved;

/// <summary>The caller-supplied token whose cancellation cancels the internal source.</summary>
internal CancellationToken _externalCancellationToken;

/// <summary>The registration on the external token that cancels the internal source.</summary>
internal CancellationTokenRegistration _regFromExternalCancellationToken;

/// <summary>The timer that fires when the timeout has been exceeded.</summary>
internal Timer _timer;

/// <summary>The unlinker that removes this target from the source from which we're receiving.</summary>
internal IDisposable _unlink;

/// <summary>The exception captured if an error occurred.</summary>
internal Exception _receivedException;

/// <summary>Gets the object used to synchronize all activity on this target (the internal cancellation source instance).</summary>
internal object IncomingLock
{
    get { return _cts; }
}

/// <summary>Initializes the target.</summary>
internal ReceiveTarget()
{
}
/// <summary>Accepts an offered message, if possible, and uses it to complete the TaskCompletionSource.</summary>
/// <param name="messageHeader">The header of the offered message.</param>
/// <param name="messageValue">The offered value.</param>
/// <param name="source">The source offering the message; required when <paramref name="consumeToAccept"/> is true.</param>
/// <param name="consumeToAccept">true if the value must be obtained by calling back into the source's ConsumeMessage.</param>
/// <returns>The status describing what happened to the offered message.</returns>
[SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, Boolean consumeToAccept)
{
    // Validate arguments
    if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, "messageHeader");
    if (source == null && consumeToAccept) throw new ArgumentException(SR.Argument_CantConsumeFromANullSource, "consumeToAccept");
    Contract.EndContractBlock();

    DataflowMessageStatus status = DataflowMessageStatus.NotAvailable;

    // If we're already on our way to being done, don't accept anything. This is a cheap
    // check before taking the incoming lock; _cleanupReserved only ever goes from false to true.
    if (Volatile.Read(ref _cleanupReserved))
    {
        return DataflowMessageStatus.DecliningPermanently;
    }

    lock (IncomingLock)
    {
        // Re-check now that the lock is held.
        if (_cleanupReserved)
        {
            return DataflowMessageStatus.DecliningPermanently;
        }

        try
        {
            // Take the value directly, or consume it from the source when required.
            bool consumed = true;
            T acceptedValue = messageValue;
            if (consumeToAccept)
            {
                acceptedValue = source.ConsumeMessage(messageHeader, this, out consumed);
            }

            if (consumed)
            {
                status = DataflowMessageStatus.Accepted;
                _receivedValue = acceptedValue;
                _cleanupReserved = true;
            }
        }
        catch (Exception consumeError)
        {
            // An error occurred. Take ourselves out of the game.
            status = DataflowMessageStatus.DecliningPermanently;
            Common.StoreDataflowMessageValueIntoExceptionData(consumeError, messageValue);
            _receivedException = consumeError;
            _cleanupReserved = true;
        }
    }

    // Cleanup runs outside the lock; the right to clean up was reserved above in both cases.
    switch (status)
    {
        case DataflowMessageStatus.Accepted:
            CleanupAndComplete(ReceiveCoreByLinkingCleanupReason.Success);
            break;

        case DataflowMessageStatus.DecliningPermanently: // only reachable here when an error occurred
            CleanupAndComplete(ReceiveCoreByLinkingCleanupReason.SourceProtocolError);
            break;
    }

    return status;
}
/// <summary>
/// Tries to become the single code path that cleans up and completes this target and,
/// when that succeeds, performs the cleanup and completion.
/// </summary>
/// <param name="reason">The reason we're completing and cleaning up.</param>
/// <returns>true if this call reserved and ran the cleanup; false if cleanup had already been reserved.</returns>
internal bool TryCleanupAndComplete(ReceiveCoreByLinkingCleanupReason reason)
{
    bool reservedByThisCall = false;

    // Cheap unlocked look first; only take the lock when the reservation still seems available.
    if (!Volatile.Read(ref _cleanupReserved))
    {
        lock (IncomingLock)
        {
            if (!_cleanupReserved)
            {
                _cleanupReserved = true;
                reservedByThisCall = true;
            }
        }
    }

    if (!reservedByThisCall)
    {
        return false;
    }

    // The reservation is ours, so carry out the cleanup (outside of the lock).
    CleanupAndComplete(reason);
    return true;
}
/// <summary>
/// Tears this target down (unlinking, timer disposal, cancellation bookkeeping) and then
/// queues the completion of the underlying task in the state implied by <paramref name="reason"/>.
/// </summary>
/// <param name="reason">The reason we're completing and cleaning up.</param>
/// <remarks>
/// This method must only be called once on this instance, by whichever code path set
/// _cleanupReserved, and must not be called while holding IncomingLock.
/// </remarks>
[SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
[SuppressMessage("Microsoft.Usage", "CA2201:DoNotRaiseReservedExceptionTypes")]
private void CleanupAndComplete(ReceiveCoreByLinkingCleanupReason reason)
{
    Common.ContractAssertMonitorStatus(IncomingLock, held: false);
    Debug.Assert(Volatile.Read(ref _cleanupReserved), "Should only be called once by whomever reserved the right.");

    // Detach from the source. This is skipped when the source itself completed, since the source
    // is expected to be clearing out its targets already. The linking code may be racing with us,
    // so ownership of the unlinker is claimed with an interlocked exchange.
    IDisposable observedUnlinker = _unlink;
    if (observedUnlinker != null && reason != ReceiveCoreByLinkingCleanupReason.SourceCompletion)
    {
        IDisposable ownedUnlinker = Interlocked.CompareExchange(ref _unlink, null, observedUnlinker);
        if (ownedUnlinker != null)
        {
            try
            {
                ownedUnlinker.Dispose(); // must not be holding IncomingLock, or could deadlock
            }
            catch (Exception unlinkError)
            {
                // Unlinking failed: keep cleaning up, but fault the task instead of
                // completing it the way that was originally requested.
                _receivedException = unlinkError;
                reason = ReceiveCoreByLinkingCleanupReason.SourceProtocolError;
            }
        }
    }

    // Dispose of the timer eagerly, even when the timer firing is what brought us here.
    if (_timer != null)
    {
        _timer.Dispose();
    }

    // Signal the internal token (unless cancellation is already why we're here) and drop the
    // registration on the external token so that it doesn't leak. The cts itself is intentionally
    // not disposed: registration code may be racing with us, its WaitHandle is never used, and it
    // was not created as a linked source, so leaving it alive costs nothing and is safer.
    bool arrivedViaCancellation = reason == ReceiveCoreByLinkingCleanupReason.Cancellation;
    if (!arrivedViaCancellation)
    {
        // A source that completed without handing over a value gets one last cancellation check.
        bool cancellationObserved =
            reason == ReceiveCoreByLinkingCleanupReason.SourceCompletion &&
            (_externalCancellationToken.IsCancellationRequested || _cts.IsCancellationRequested);
        if (cancellationObserved)
        {
            reason = ReceiveCoreByLinkingCleanupReason.Cancellation;
        }

        _cts.Cancel();
    }
    _regFromExternalCancellationToken.Dispose();

    // Finally complete the task, asynchronously, in the state that matches the reason.
    switch (reason)
    {
        // Task final state: RanToCompletion
        case ReceiveCoreByLinkingCleanupReason.Success:
            QueueCompletion(state =>
            {
                var self = (ReceiveTarget<T>)state;
                try { self.TrySetResult(self._receivedValue); }
                catch (ObjectDisposedException) { /* benign race if returned task is already disposed */ }
            });
            break;

        // Task final state: Faulted
        case ReceiveCoreByLinkingCleanupReason.SourceCompletion:
        case ReceiveCoreByLinkingCleanupReason.Timer:
        case ReceiveCoreByLinkingCleanupReason.SourceProtocolError:
        case ReceiveCoreByLinkingCleanupReason.ErrorDuringCleanup:
            if (_receivedException == null)
            {
                // Only these two reasons come with a default exception of their own.
                if (reason == ReceiveCoreByLinkingCleanupReason.SourceCompletion)
                {
                    _receivedException = CreateExceptionForSourceCompletion();
                }
                else if (reason == ReceiveCoreByLinkingCleanupReason.Timer)
                {
                    _receivedException = CreateExceptionForTimeout();
                }
            }
            Debug.Assert(_receivedException != null, "We need an exception with which to fault the task.");
            QueueCompletion(state =>
            {
                var self = (ReceiveTarget<T>)state;
                try { self.TrySetException(self._receivedException ?? new Exception()); }
                catch (ObjectDisposedException) { /* benign race if returned task is already disposed */ }
            });
            break;

        // Task final state: Canceled (unknown reasons are also treated as cancellation)
        case ReceiveCoreByLinkingCleanupReason.Cancellation:
        default:
            Debug.Assert(reason == ReceiveCoreByLinkingCleanupReason.Cancellation, "Invalid linking cleanup reason specified.");
            QueueCompletion(state =>
            {
                var self = (ReceiveTarget<T>)state;
                try { self.TrySetCanceled(); }
                catch (ObjectDisposedException) { /* benign race if returned task is already disposed */ }
            });
            break;
    }
}

/// <summary>Queues <paramref name="completer"/> to the default scheduler, passing this target as its state.</summary>
/// <param name="completer">The delegate that completes the task; it receives this instance as its argument.</param>
private void QueueCompletion(Action<object> completer)
{
    System.Threading.Tasks.Task.Factory.StartNew(
        completer, this, CancellationToken.None, TaskCreationOptions.None, TaskScheduler.Default);
}
/// <summary>Builds the exception used to fault a receive whose source completed before a value arrived.</summary>
/// <returns>The initialized exception.</returns>
internal static Exception CreateExceptionForSourceCompletion()
{
    var noDataException = new InvalidOperationException(SR.InvalidOperation_DataNotAvailableForReceive);
    return Common.InitializeStackTrace(noDataException);
}
/// <summary>Builds the exception used to fault a receive that timed out before a value arrived.</summary>
/// <returns>The initialized exception.</returns>
internal static Exception CreateExceptionForTimeout()
{
    var timeoutException = new TimeoutException();
    return Common.InitializeStackTrace(timeoutException);
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
void IDataflowBlock.Complete()
{
    // Completion requested through the block interface is handled as the source having completed.
    // The result is deliberately ignored: if cleanup was already reserved there is nothing to do.
    const ReceiveCoreByLinkingCleanupReason completionReason = ReceiveCoreByLinkingCleanupReason.SourceCompletion;
    TryCleanupAndComplete(completionReason);
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
void IDataflowBlock.Fault(Exception exception)
{
    // The supplied exception is not used; a fault is handled exactly like Complete().
    IDataflowBlock self = this;
    self.Complete();
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
Task IDataflowBlock.Completion
{
    get
    {
        // This target does not expose a completion task of its own.
        throw new NotSupportedException(SR.NotSupported_MemberNotNeeded);
    }
}
/// <summary>Builds the text shown for this target by the debugger display attribute.</summary>
[SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
private object DebuggerDisplayContent
{
    get
    {
        // Debugger name of this instance followed by whether the underlying task has completed
        object displayName = Common.GetNameForDebugger(this);
        bool taskCompleted = base.Task.IsCompleted;
        return string.Format("{0} IsCompleted={1}", displayName, taskCompleted);
    }
}
/// <summary>Gets the debugger display data for this instance (same value as DebuggerDisplayContent).</summary>
object IDebuggerDisplay.Content
{
    get
    {
        return DebuggerDisplayContent;
    }
}
1432 #region OutputAvailableAsync
/// <summary>
/// Returns a <see cref="System.Threading.Tasks.Task{TResult}"/> that asynchronously
/// watches the source for available output, without support for cancellation.
/// </summary>
/// <typeparam name="TOutput">Specifies the type of data contained in the source.</typeparam>
/// <param name="source">The source to monitor.</param>
/// <returns>
/// A <see cref="System.Threading.Tasks.Task{Boolean}"/> that informs of whether and when
/// more output is available. A result of true means output is available in the source (though
/// another consumer of the source may retrieve the data). A result of false means output is not
/// and will never be available, because the source completed before any output became available.
/// </returns>
public static Task<bool> OutputAvailableAsync<TOutput>(this ISourceBlock<TOutput> source)
{
    // Forward to the cancelable overload with a token that can never be canceled.
    CancellationToken neverCanceled = CancellationToken.None;
    return OutputAvailableAsync<TOutput>(source, neverCanceled);
}
/// <summary>
/// Returns a <see cref="System.Threading.Tasks.Task{TResult}"/> that asynchronously
/// watches the source for available output and can be canceled.
/// </summary>
/// <typeparam name="TOutput">Specifies the type of data contained in the source.</typeparam>
/// <param name="source">The source to monitor.</param>
/// <param name="cancellationToken">The cancellation token with which to cancel the asynchronous operation.</param>
/// <returns>
/// A <see cref="System.Threading.Tasks.Task{Boolean}"/> that informs of whether and when
/// more output is available. A result of true means output is available in the source (though
/// another consumer of the source may retrieve the data). A result of false means output is not
/// and will never be available, because the source completed before any output became available.
/// </returns>
/// <exception cref="System.ArgumentNullException">The <paramref name="source"/> is null (Nothing in Visual Basic).</exception>
[SuppressMessage("Microsoft.Design", "CA1011:ConsiderPassingBaseTypesAsParameters")]
[SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
[SuppressMessage("Microsoft.Reliability", "CA2000:DisposeObjectsBeforeLosingScope")]
public static Task<bool> OutputAvailableAsync<TOutput>(
    this ISourceBlock<TOutput> source, CancellationToken cancellationToken)
{
    // Argument validation
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    Contract.EndContractBlock();

    // Nothing to monitor if cancellation was requested before we even started.
    if (cancellationToken.IsCancellationRequested)
    {
        return Common.CreateTaskFromCancellation<bool>(cancellationToken);
    }

    // There is intentionally no shortcut based on source.Completion.IsCompleted: some blocks that
    // have completed can still offer data (WriteOnceBlock completes once it has its value and keeps
    // that value forever). Linking is therefore always required, so that the source gets a chance
    // to push data at us, after which we can unlink right away.

    // The target completes its task as soon as it is offered a message, without accepting it.
    var waiter = new OutputAvailableAsyncTarget<TOutput>();
    try
    {
        waiter._unlinker = source.LinkTo(waiter, DataflowLinkOptions.UnlinkAfterOneAndPropagateCompletion);

        // The source may have offered a message (or an error may have occurred) during LinkTo or
        // shortly after it; in that case the task is already done and can be handed out as is.
        Task<bool> pending = waiter.Task;
        if (pending.IsCompleted)
        {
            return pending;
        }

        // With a cancelable token, arrange for a cancellation request to unlink and cancel the target.
        if (cancellationToken.CanBeCanceled)
        {
            waiter._ctr = cancellationToken.Register(OutputAvailableAsyncTarget<TOutput>.s_cancelAndUnlink, waiter);
        }

        // The target's task is completed synchronously by the source block, so returning it directly
        // would let synchronous continuations run inside the source's call. A continuation task is
        // returned instead. Cancellation needs no such care: it completes the target asynchronously,
        // and NotOnCanceled makes the continuation get canceled as soon as the antecedent is, which
        // therefore also happens asynchronously with respect to the token source's Cancel call.
        var continuationOptions = Common.GetContinuationOptions() | TaskContinuationOptions.NotOnCanceled;
        return waiter.Task.ContinueWith(
            OutputAvailableAsyncTarget<TOutput>.s_handleCompletion, waiter,
            CancellationToken.None, continuationOptions, TaskScheduler.Default);
    }
    catch (Exception setupError)
    {
        // source.LinkTo may throw, and so may cancellationToken.Register when cancellation was already
        // requested and the synchronously invoked callback ends up in an unlinker that throws.
        waiter.TrySetException(setupError);

        // Remove the link from the source to the target again
        waiter.AttemptThreadSafeUnlink();

        // Hand back the task, which is faulted by now
        return waiter.Task;
    }
}
/// <summary>Provides a target used in OutputAvailableAsync operations.</summary>
/// <typeparam name="T">Specifies the type of data in the data source being checked.</typeparam>
/// <remarks>
/// The task's result is set to true when a message is offered (the message itself is declined),
/// and to false when the target is told to complete or fault.
/// </remarks>
[DebuggerDisplay("{DebuggerDisplayContent,nq}")]
private sealed class OutputAvailableAsyncTarget<T> : TaskCompletionSource<bool>, ITargetBlock<T>, IDebuggerDisplay
{
    /// <summary>
    /// Cached continuation delegate that unregisters from cancellation and
    /// marshals the antecedent's result to the return value.
    /// Expects an OutputAvailableAsyncTarget as the state argument.
    /// </summary>
    internal static readonly Func<Task<bool>, object, bool> s_handleCompletion = (antecedent, state) =>
    {
        // Direct cast rather than 'as': a state object of the wrong type then fails right here
        // with an InvalidCastException instead of a NullReferenceException on the next use.
        var target = (OutputAvailableAsyncTarget<T>)state;
        Debug.Assert(target != null, "Expected non-null target");
        target._ctr.Dispose();
        return antecedent.GetAwaiter().GetResult();
    };

    /// <summary>
    /// Cached delegate that cancels the target and unlinks the target from the source.
    /// Expects an OutputAvailableAsyncTarget as the state argument.
    /// </summary>
    internal static readonly Action<object> s_cancelAndUnlink = CancelAndUnlink;

    /// <summary>Cancels the target and unlinks the target from the source.</summary>
    /// <param name="state">An OutputAvailableAsyncTarget.</param>
    private static void CancelAndUnlink(object state)
    {
        var target = (OutputAvailableAsyncTarget<T>)state;
        Debug.Assert(target != null, "Expected a non-null target");

        // Cancel asynchronously so that we're not completing the task as part of the cts.Cancel() call,
        // since synchronous continuations off that task would then run as part of Cancel.
        // Take advantage of this task and unlink from there to avoid doing the interlocked operation synchronously.
        System.Threading.Tasks.Task.Factory.StartNew(tgt =>
            {
                var thisTarget = (OutputAvailableAsyncTarget<T>)tgt;
                thisTarget.TrySetCanceled();
                thisTarget.AttemptThreadSafeUnlink();
            },
            target, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
    }

    /// <summary>Disposes of _unlinker if the target has been linked.</summary>
    internal void AttemptThreadSafeUnlink()
    {
        // A race is possible. Therefore use an interlocked operation so that
        // only the caller that swaps the field to null disposes the unlinker.
        IDisposable cachedUnlinker = _unlinker;
        if (cachedUnlinker != null && Interlocked.CompareExchange(ref _unlinker, null, cachedUnlinker) == cachedUnlinker)
        {
            cachedUnlinker.Dispose();
        }
    }

    /// <summary>The IDisposable used to unlink this target from its source.</summary>
    internal IDisposable _unlinker;
    /// <summary>The registration used to unregister this target from the cancellation token.</summary>
    internal CancellationTokenRegistration _ctr;

    /// <summary>Completes the task when offered a message (but doesn't consume the message).</summary>
    /// <returns>Always DecliningPermanently; the offered message is never accepted.</returns>
    /// <exception cref="System.ArgumentException">The <paramref name="messageHeader"/> is not valid.</exception>
    /// <exception cref="System.ArgumentNullException">The <paramref name="source"/> is null.</exception>
    DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, Boolean consumeToAccept)
    {
        if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, "messageHeader");
        if (source == null) throw new ArgumentNullException("source");
        Contract.EndContractBlock();

        // Being offered anything at all is the signal that output is available.
        TrySetResult(true);
        return DataflowMessageStatus.DecliningPermanently;
    }

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
    void IDataflowBlock.Complete()
    {
        // Completion without a prior offer: no output is available.
        TrySetResult(false);
    }

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
    void IDataflowBlock.Fault(Exception exception)
    {
        if (exception == null) throw new ArgumentNullException("exception");
        Contract.EndContractBlock();

        // A fault is reported the same way as completion; the exception itself is not surfaced.
        TrySetResult(false);
    }

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
    Task IDataflowBlock.Completion { get { throw new NotSupportedException(SR.NotSupported_MemberNotNeeded); } }

    /// <summary>The data to display in the debugger display attribute.</summary>
    [SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
    private object DebuggerDisplayContent
    {
        get
        {
            return string.Format("{0} IsCompleted={1}",
                Common.GetNameForDebugger(this), base.Task.IsCompleted);
        }
    }

    /// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
    object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } }
}
/// <summary>Wraps a target and a source so that together they appear as a single propagator.</summary>
/// <typeparam name="TInput">Specifies the type of input expected by the target.</typeparam>
/// <typeparam name="TOutput">Specifies the type of output produced by the source.</typeparam>
/// <param name="target">The target to encapsulate.</param>
/// <param name="source">The source to encapsulate.</param>
/// <returns>The encapsulated target and source.</returns>
/// <remarks>
/// This method does not in any way connect the target to the source. It creates a
/// propagator block whose target methods delegate to the specified target and whose
/// source methods delegate to the specified source. Any connection between the target
/// and the source is left for the developer to explicitly provide. Of the propagator's
/// <see cref="IDataflowBlock"/> members, Completion is taken from the specified source,
/// while Complete and Fault are forwarded to the specified target.
/// </remarks>
/// <exception cref="System.ArgumentNullException">The <paramref name="target"/> is null (Nothing in Visual Basic).</exception>
/// <exception cref="System.ArgumentNullException">The <paramref name="source"/> is null (Nothing in Visual Basic).</exception>
public static IPropagatorBlock<TInput, TOutput> Encapsulate<TInput, TOutput>(
    ITargetBlock<TInput> target, ISourceBlock<TOutput> source)
{
    // Argument validation (target is checked before source)
    if (target == null)
    {
        throw new ArgumentNullException("target");
    }
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    Contract.EndContractBlock();

    var propagator = new EncapsulatingPropagator<TInput, TOutput>(target, source);
    return propagator;
}
/// <summary>Provides a dataflow block that encapsulates a target and a source to form a single propagator.</summary>
/// <typeparam name="TInput">Specifies the type of input expected by the target.</typeparam>
/// <typeparam name="TOutput">Specifies the type of output produced by the source.</typeparam>
/// <remarks>
/// Target members, Complete and Fault are forwarded to the wrapped target; source members and
/// Completion are forwarded to the wrapped source. The two halves are not connected by this type.
/// </remarks>
[DebuggerDisplay("{DebuggerDisplayContent,nq}")]
[DebuggerTypeProxy(typeof(EncapsulatingPropagator<,>.DebugView))]
private sealed class EncapsulatingPropagator<TInput, TOutput> : IPropagatorBlock<TInput, TOutput>, IReceivableSourceBlock<TOutput>, IDebuggerDisplay
{
    /// <summary>The target half.</summary>
    private readonly ITargetBlock<TInput> _target;
    /// <summary>The source half.</summary>
    private readonly ISourceBlock<TOutput> _source;

    /// <summary>Initializes the propagator with the two halves it forwards to.</summary>
    /// <param name="target">The target half; must not be null.</param>
    /// <param name="source">The source half; must not be null.</param>
    public EncapsulatingPropagator(ITargetBlock<TInput> target, ISourceBlock<TOutput> source)
    {
        Contract.Requires(target != null, "The target should never be null; this should be checked by all internal usage.");
        Contract.Requires(source != null, "The source should never be null; this should be checked by all internal usage.");
        _target = target;
        _source = source;
    }

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
    public void Complete()
    {
        _target.Complete();
    }

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
    void IDataflowBlock.Fault(Exception exception)
    {
        if (exception == null) throw new ArgumentNullException("exception");
        Contract.EndContractBlock();

        _target.Fault(exception);
    }

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="OfferMessage"]/*' />
    public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, TInput messageValue, ISourceBlock<TInput> source, bool consumeToAccept)
    {
        return _target.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
    }

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
    public Task Completion { get { return _source.Completion; } }

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="LinkTo"]/*' />
    public IDisposable LinkTo(ITargetBlock<TOutput> target, DataflowLinkOptions linkOptions)
    {
        return _source.LinkTo(target, linkOptions);
    }

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceive"]/*' />
    public bool TryReceive(Predicate<TOutput> filter, out TOutput item)
    {
        // Only sources that are themselves receivable can satisfy this request.
        var receivableSource = _source as IReceivableSourceBlock<TOutput>;
        if (receivableSource != null) return receivableSource.TryReceive(filter, out item);

        item = default(TOutput);
        return false;
    }

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceiveAll"]/*' />
    public bool TryReceiveAll(out IList<TOutput> items)
    {
        // Only sources that are themselves receivable can satisfy this request.
        var receivableSource = _source as IReceivableSourceBlock<TOutput>;
        if (receivableSource != null) return receivableSource.TryReceiveAll(out items);

        items = default(IList<TOutput>);
        return false;
    }

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ConsumeMessage"]/*' />
    public TOutput ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target, out Boolean messageConsumed)
    {
        return _source.ConsumeMessage(messageHeader, target, out messageConsumed);
    }

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReserveMessage"]/*' />
    public bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
    {
        return _source.ReserveMessage(messageHeader, target);
    }

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReleaseReservation"]/*' />
    public void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
    {
        _source.ReleaseReservation(messageHeader, target);
    }

    /// <summary>The data to display in the debugger display attribute.</summary>
    [SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
    private object DebuggerDisplayContent
    {
        get
        {
            // Use each half's own debugger content when it provides one, otherwise the half itself.
            var displayTarget = _target as IDebuggerDisplay;
            var displaySource = _source as IDebuggerDisplay;
            return string.Format("{0} Target=\"{1}\", Source=\"{2}\"",
                Common.GetNameForDebugger(this),
                displayTarget != null ? displayTarget.Content : _target,
                displaySource != null ? displaySource.Content : _source);
        }
    }

    /// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
    object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } }

    /// <summary>A debug view for the propagator.</summary>
    private sealed class DebugView
    {
        /// <summary>The propagator being debugged.</summary>
        private readonly EncapsulatingPropagator<TInput, TOutput> _propagator;

        /// <summary>Initializes the debug view.</summary>
        /// <param name="propagator">The propagator being debugged.</param>
        public DebugView(EncapsulatingPropagator<TInput, TOutput> propagator)
        {
            Contract.Requires(propagator != null, "Need a block with which to construct the debug view.");
            _propagator = propagator;
        }

        /// <summary>The target.</summary>
        public ITargetBlock<TInput> Target { get { return _propagator._target; } }
        /// <summary>The source.</summary>
        public ISourceBlock<TOutput> Source { get { return _propagator._source; } }
    }
}
1780 #region Choose<T1,T2>
/// <summary>Watches two dataflow sources and runs the handler that belongs to whichever source makes data available first.</summary>
/// <typeparam name="T1">Specifies type of data contained in the first source.</typeparam>
/// <typeparam name="T2">Specifies type of data contained in the second source.</typeparam>
/// <param name="source1">The first source.</param>
/// <param name="action1">The handler to execute on data from the first source.</param>
/// <param name="source2">The second source.</param>
/// <param name="action2">The handler to execute on data from the second source.</param>
/// <returns>
/// <para>
/// A <see cref="System.Threading.Tasks.Task{Int32}"/> that represents the asynchronous choice.
/// If both sources are completed prior to the choice completing,
/// the resulting task will be canceled. When one of the sources has data available and successfully propagates
/// it to the choice, the resulting task will complete when the handler completes: if the handler throws an exception,
/// the task will end in the <see cref="System.Threading.Tasks.TaskStatus.Faulted"/> state containing the unhandled exception, otherwise the task
/// will end with its <see cref="System.Threading.Tasks.Task{Int32}.Result"/> set to either 0 or 1 to
/// represent the first or second source, respectively.
/// </para>
/// <para>
/// This method will only consume an element from one of the two data sources, never both.
/// </para>
/// </returns>
/// <exception cref="System.ArgumentNullException">The <paramref name="source1"/> is null (Nothing in Visual Basic).</exception>
/// <exception cref="System.ArgumentNullException">The <paramref name="action1"/> is null (Nothing in Visual Basic).</exception>
/// <exception cref="System.ArgumentNullException">The <paramref name="source2"/> is null (Nothing in Visual Basic).</exception>
/// <exception cref="System.ArgumentNullException">The <paramref name="action2"/> is null (Nothing in Visual Basic).</exception>
public static Task<Int32> Choose<T1, T2>(
    ISourceBlock<T1> source1, Action<T1> action1,
    ISourceBlock<T2> source2, Action<T2> action2)
{
    // The overload taking options performs all of the argument validation.
    DataflowBlockOptions defaultOptions = DataflowBlockOptions.Default;
    return Choose(source1, action1, source2, action2, defaultOptions);
}
/// <summary>
/// Watches two dataflow sources and runs the handler that belongs to whichever source makes
/// data available first, configured by the supplied options.
/// </summary>
/// <typeparam name="T1">Specifies type of data contained in the first source.</typeparam>
/// <typeparam name="T2">Specifies type of data contained in the second source.</typeparam>
/// <param name="source1">The first source.</param>
/// <param name="action1">The handler to execute on data from the first source.</param>
/// <param name="source2">The second source.</param>
/// <param name="action2">The handler to execute on data from the second source.</param>
/// <param name="dataflowBlockOptions">The options with which to configure this choice.</param>
/// <returns>
/// <para>
/// A <see cref="System.Threading.Tasks.Task{Int32}"/> that represents the asynchronous choice.
/// If both sources are completed prior to the choice completing, or if the CancellationToken
/// provided as part of <paramref name="dataflowBlockOptions"/> is canceled prior to the choice completing,
/// the resulting task will be canceled. When one of the sources has data available and successfully propagates
/// it to the choice, the resulting task will complete when the handler completes: if the handler throws an exception,
/// the task will end in the <see cref="System.Threading.Tasks.TaskStatus.Faulted"/> state containing the unhandled exception, otherwise the task
/// will end with its <see cref="System.Threading.Tasks.Task{Int32}.Result"/> set to either 0 or 1 to
/// represent the first or second source, respectively.
/// </para>
/// <para>
/// This method will only consume an element from one of the two data sources, never both.
/// If cancellation is requested after an element has been received, the cancellation request will be ignored,
/// and the relevant handler will be allowed to execute.
/// </para>
/// </returns>
/// <exception cref="System.ArgumentNullException">The <paramref name="source1"/> is null (Nothing in Visual Basic).</exception>
/// <exception cref="System.ArgumentNullException">The <paramref name="action1"/> is null (Nothing in Visual Basic).</exception>
/// <exception cref="System.ArgumentNullException">The <paramref name="source2"/> is null (Nothing in Visual Basic).</exception>
/// <exception cref="System.ArgumentNullException">The <paramref name="action2"/> is null (Nothing in Visual Basic).</exception>
/// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
[SuppressMessage("Microsoft.Reliability", "CA2000:DisposeObjectsBeforeLosingScope")]
public static Task<Int32> Choose<T1, T2>(
    ISourceBlock<T1> source1, Action<T1> action1,
    ISourceBlock<T2> source2, Action<T2> action2,
    DataflowBlockOptions dataflowBlockOptions)
{
    // Argument validation, in parameter order
    if (source1 == null)
    {
        throw new ArgumentNullException("source1");
    }
    if (action1 == null)
    {
        throw new ArgumentNullException("action1");
    }
    if (source2 == null)
    {
        throw new ArgumentNullException("source2");
    }
    if (action2 == null)
    {
        throw new ArgumentNullException("action2");
    }
    if (dataflowBlockOptions == null)
    {
        throw new ArgumentNullException("dataflowBlockOptions");
    }

    // The shared implementation also handles three sources; the third source/handler pair is left empty.
    return ChooseCore<T1, T2, VoidResult>(
        source1, action1,
        source2, action2,
        null, null,
        dataflowBlockOptions);
}
1862 #region Choose<T1,T2,T3>
/// <summary>
/// Watches three dataflow sources and runs the handler paired with whichever source
/// makes data available first.
/// </summary>
/// <typeparam name="T1">The type of data held by the first source.</typeparam>
/// <typeparam name="T2">The type of data held by the second source.</typeparam>
/// <typeparam name="T3">The type of data held by the third source.</typeparam>
/// <param name="source1">The first source.</param>
/// <param name="action1">The handler to run on data taken from the first source.</param>
/// <param name="source2">The second source.</param>
/// <param name="action2">The handler to run on data taken from the second source.</param>
/// <param name="source3">The third source.</param>
/// <param name="action3">The handler to run on data taken from the third source.</param>
/// <returns>
/// A <see cref="System.Threading.Tasks.Task{Int32}"/> that represents the asynchronous choice.
/// The task is canceled if every source completes before the choice does. Once a source has
/// propagated an item to the choice, the task completes when the matching handler completes:
/// it ends in the <see cref="System.Threading.Tasks.TaskStatus.Faulted"/> state with the unhandled
/// exception if the handler throws; otherwise its <see cref="System.Threading.Tasks.Task{Int32}.Result"/>
/// is the 0-based index of the source that supplied the item.
/// </returns>
/// <remarks>
/// At most one element is consumed, and only from one of the sources.
/// </remarks>
/// <exception cref="System.ArgumentNullException">The <paramref name="source1"/> is null (Nothing in Visual Basic).</exception>
/// <exception cref="System.ArgumentNullException">The <paramref name="action1"/> is null (Nothing in Visual Basic).</exception>
/// <exception cref="System.ArgumentNullException">The <paramref name="source2"/> is null (Nothing in Visual Basic).</exception>
/// <exception cref="System.ArgumentNullException">The <paramref name="action2"/> is null (Nothing in Visual Basic).</exception>
/// <exception cref="System.ArgumentNullException">The <paramref name="source3"/> is null (Nothing in Visual Basic).</exception>
/// <exception cref="System.ArgumentNullException">The <paramref name="action3"/> is null (Nothing in Visual Basic).</exception>
public static Task<Int32> Choose<T1, T2, T3>(
    ISourceBlock<T1> source1, Action<T1> action1,
    ISourceBlock<T2> source2, Action<T2> action2,
    ISourceBlock<T3> source3, Action<T3> action3)
{
    // No validation here: the options-taking overload checks every argument.
    return Choose<T1, T2, T3>(
        source1, action1,
        source2, action2,
        source3, action3,
        DataflowBlockOptions.Default);
}
/// <summary>
/// Watches three dataflow sources and runs the handler paired with whichever source
/// makes data available first, configured by the supplied options.
/// </summary>
/// <typeparam name="T1">The type of data held by the first source.</typeparam>
/// <typeparam name="T2">The type of data held by the second source.</typeparam>
/// <typeparam name="T3">The type of data held by the third source.</typeparam>
/// <param name="source1">The first source.</param>
/// <param name="action1">The handler to run on data taken from the first source.</param>
/// <param name="source2">The second source.</param>
/// <param name="action2">The handler to run on data taken from the second source.</param>
/// <param name="source3">The third source.</param>
/// <param name="action3">The handler to run on data taken from the third source.</param>
/// <param name="dataflowBlockOptions">The options with which to configure this choice.</param>
/// <returns>
/// A <see cref="System.Threading.Tasks.Task{Int32}"/> that represents the asynchronous choice.
/// The task is canceled if every source completes before the choice does, or if the CancellationToken
/// carried by <paramref name="dataflowBlockOptions"/> is canceled before the choice completes.
/// Once a source has propagated an item to the choice, the task completes when the matching handler
/// completes: it ends in the <see cref="System.Threading.Tasks.TaskStatus.Faulted"/> state with the
/// unhandled exception if the handler throws; otherwise its
/// <see cref="System.Threading.Tasks.Task{Int32}.Result"/> is the 0-based index of the source that
/// supplied the item.
/// </returns>
/// <remarks>
/// At most one element is consumed, and only from one of the sources.
/// A cancellation request that arrives after an element has been received is ignored,
/// and the relevant handler is still allowed to execute.
/// </remarks>
/// <exception cref="System.ArgumentNullException">The <paramref name="source1"/> is null (Nothing in Visual Basic).</exception>
/// <exception cref="System.ArgumentNullException">The <paramref name="action1"/> is null (Nothing in Visual Basic).</exception>
/// <exception cref="System.ArgumentNullException">The <paramref name="source2"/> is null (Nothing in Visual Basic).</exception>
/// <exception cref="System.ArgumentNullException">The <paramref name="action2"/> is null (Nothing in Visual Basic).</exception>
/// <exception cref="System.ArgumentNullException">The <paramref name="source3"/> is null (Nothing in Visual Basic).</exception>
/// <exception cref="System.ArgumentNullException">The <paramref name="action3"/> is null (Nothing in Visual Basic).</exception>
/// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
[SuppressMessage("Microsoft.Reliability", "CA2000:DisposeObjectsBeforeLosingScope")]
public static Task<Int32> Choose<T1, T2, T3>(
    ISourceBlock<T1> source1, Action<T1> action1,
    ISourceBlock<T2> source2, Action<T2> action2,
    ISourceBlock<T3> source3, Action<T3> action3,
    DataflowBlockOptions dataflowBlockOptions)
{
    // Reject null arguments in declaration order so the exception names the first offender.
    if (source1 == null)
    {
        throw new ArgumentNullException("source1");
    }
    if (action1 == null)
    {
        throw new ArgumentNullException("action1");
    }
    if (source2 == null)
    {
        throw new ArgumentNullException("source2");
    }
    if (action2 == null)
    {
        throw new ArgumentNullException("action2");
    }
    if (source3 == null)
    {
        throw new ArgumentNullException("source3");
    }
    if (action3 == null)
    {
        throw new ArgumentNullException("action3");
    }
    if (dataflowBlockOptions == null)
    {
        throw new ArgumentNullException("dataflowBlockOptions");
    }

    // All inputs are known to be non-null; hand off to the shared implementation.
    return ChooseCore<T1, T2, T3>(source1, action1, source2, action2, source3, action3, dataflowBlockOptions);
}
1956 #region Choose Shared
/// <summary>
/// Shared implementation behind the Choose overloads: first tries to take an item
/// synchronously from one of the sources, and otherwise falls back to linking to all of them.
/// </summary>
/// <typeparam name="T1">The type of data held by the first source.</typeparam>
/// <typeparam name="T2">The type of data held by the second source.</typeparam>
/// <typeparam name="T3">The type of data held by the third source.</typeparam>
/// <param name="source1">The first source.</param>
/// <param name="action1">The handler to run on data taken from the first source.</param>
/// <param name="source2">The second source.</param>
/// <param name="action2">The handler to run on data taken from the second source.</param>
/// <param name="source3">The third source; null when only two sources take part.</param>
/// <param name="action3">The handler for the third source; null exactly when <paramref name="source3"/> is null.</param>
/// <param name="dataflowBlockOptions">The options with which to configure this choice.</param>
/// <returns>The task representing the choice.</returns>
[SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
[SuppressMessage("Microsoft.Reliability", "CA2000:DisposeObjectsBeforeLosingScope")]
private static Task<Int32> ChooseCore<T1, T2, T3>(
    ISourceBlock<T1> source1, Action<T1> action1,
    ISourceBlock<T2> source2, Action<T2> action2,
    ISourceBlock<T3> source3, Action<T3> action3,
    DataflowBlockOptions dataflowBlockOptions)
{
    Contract.Requires(source1 != null && action1 != null, "The first source and action should not be null.");
    Contract.Requires(source2 != null && action2 != null, "The second source and action should not be null.");
    Contract.Requires((source3 == null) == (action3 == null), "The third action should be null iff the third source is null.");
    Contract.Requires(dataflowBlockOptions != null, "Options are required.");

    // If cancellation was requested before we even start, answer with a canceled task.
    CancellationToken cancellationToken = dataflowBlockOptions.CancellationToken;
    if (cancellationToken.IsCancellationRequested)
    {
        return Common.CreateTaskFromCancellation<Int32>(cancellationToken);
    }

    // Fast path: probe the sources in order (0, 1, then 2 if present) and stop at the
    // first one that can hand over an item immediately.
    try
    {
        TaskScheduler scheduler = dataflowBlockOptions.TaskScheduler;
        Task<int> fastPathTask;

        if (TryChooseFromSource(source1, action1, 0, scheduler, out fastPathTask))
        {
            return fastPathTask;
        }
        if (TryChooseFromSource(source2, action2, 1, scheduler, out fastPathTask))
        {
            return fastPathTask;
        }
        // The third branch only exists for the three-source overloads. Higher arities
        // could be supported by adding further checks of this shape.
        if (source3 != null && TryChooseFromSource(source3, action3, 2, scheduler, out fastPathTask))
        {
            return fastPathTask;
        }
    }
    catch (Exception exc)
    {
        // A source's TryReceive is not expected to throw; if it does, surface the failure
        // through the returned task rather than synchronously.
        return Common.CreateTaskFromException<int>(exc);
    }

    // Slow path: nothing was immediately available, so link to every source.
    // Kept in a separate method so the fast path does not pay for a closure.
    return ChooseCoreByLinking(source1, action1, source2, action2, source3, action3, dataflowBlockOptions);
}
/// <summary>
/// Tries to remove data from a receivable source and schedule an action to process that received item.
/// </summary>
/// <typeparam name="T">Specifies the type of data to process.</typeparam>
/// <param name="source">The source from which to receive the data.</param>
/// <param name="action">The action to run for the received data.</param>
/// <param name="branchId">The 0-based branch ID associated with this source/action pair.</param>
/// <param name="scheduler">The scheduler to use to process the action.</param>
/// <param name="task">
/// The task created for processing the received item, or null when nothing could be received.
/// </param>
/// <returns>true if this try attempt satisfies the choose operation; otherwise, false.</returns>
private static bool TryChooseFromSource<T>(
    ISourceBlock<T> source, Action<T> action, int branchId, TaskScheduler scheduler,
    out Task<int> task)
{
    // Validate arguments
    Contract.Requires(source != null, "Expected a non-null source");
    Contract.Requires(action != null, "Expected a non-null action");
    // Branch IDs are 0-based, so 0 is valid; the message states the same bound as the condition.
    Contract.Requires(branchId >= 0, "Expected a valid branch ID (>= 0)");
    Contract.Requires(scheduler != null, "Expected a non-null scheduler");

    // Try to receive from the source. Only sources that implement IReceivableSourceBlock<T>
    // can be polled synchronously; for anything else, or when no item is ready, bail.
    T result;
    var receivableSource = source as IReceivableSourceBlock<T>;
    if (receivableSource == null || !receivableSource.TryReceive(out result))
    {
        task = null;
        return false;
    }

    // We successfully received an item. Launch a task to process it. The action, the item
    // and the branch ID travel as task state so the cached delegate needs no closure.
    task = Task.Factory.StartNew(ChooseTarget<T>.s_processBranchFunction,
        Tuple.Create<Action<T>, T, int>(action, result, branchId),
        CancellationToken.None, Common.GetCreationOptionsForTask(), scheduler);
    return true;
}
/// <summary>
/// Slow path of Choose: links a hidden target to every source and completes the returned
/// task once all branches have finished.
/// </summary>
/// <typeparam name="T1">The type of data held by the first source.</typeparam>
/// <typeparam name="T2">The type of data held by the second source.</typeparam>
/// <typeparam name="T3">The type of data held by the third source.</typeparam>
/// <param name="source1">The first source.</param>
/// <param name="action1">The handler to run on data taken from the first source.</param>
/// <param name="source2">The second source.</param>
/// <param name="action2">The handler to run on data taken from the second source.</param>
/// <param name="source3">The third source; null when only two sources take part.</param>
/// <param name="action3">The handler for the third source; null exactly when <paramref name="source3"/> is null.</param>
/// <param name="dataflowBlockOptions">The options with which to configure this choice.</param>
/// <returns>
/// A task that faults if any branch faulted, yields the winning branch ID if one branch
/// succeeded, and is canceled otherwise.
/// </returns>
[SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
[SuppressMessage("Microsoft.Reliability", "CA2000:DisposeObjectsBeforeLosingScope")]
private static Task<Int32> ChooseCoreByLinking<T1, T2, T3>(
    ISourceBlock<T1> source1, Action<T1> action1,
    ISourceBlock<T2> source2, Action<T2> action2,
    ISourceBlock<T3> source3, Action<T3> action3,
    DataflowBlockOptions dataflowBlockOptions)
{
    Contract.Requires(source1 != null && action1 != null, "The first source and action should not be null.");
    Contract.Requires(source2 != null && action2 != null, "The second source and action should not be null.");
    Contract.Requires((source3 == null) == (action3 == null), "The third action should be null iff the third source is null.");
    Contract.Requires(dataflowBlockOptions != null, "Options are required.");

    // One shared box serves as both the "some branch has won" marker and the lock
    // object for all of the branch targets.
    var boxedCompleted = new StrongBox<Task>();

    // Teardown cancellation: this source is canceled either because the caller's token
    // fired or because one branch completed and the remaining ones must be torn down.
    CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(dataflowBlockOptions.CancellationToken, CancellationToken.None);

    // Create one branch per participating source. A third branch exists only when a third
    // source was supplied; higher arities would add further slots in the same way.
    TaskScheduler scheduler = dataflowBlockOptions.TaskScheduler;
    int branchCount = source3 != null ? 3 : 2;
    var branchTasks = new Task<int>[branchCount];
    branchTasks[0] = CreateChooseBranch(boxedCompleted, cts, scheduler, 0, source1, action1);
    branchTasks[1] = CreateChooseBranch(boxedCompleted, cts, scheduler, 1, source2, action2);
    if (branchCount == 3)
    {
        branchTasks[2] = CreateChooseBranch(boxedCompleted, cts, scheduler, 2, source3, action3);
    }

    // The caller receives this task; it is completed only after every branch has finished.
    var chooseCompletion = new TaskCompletionSource<int>();
    Task.Factory.ContinueWhenAll(branchTasks, finishedBranches =>
    {
        // Examine each branch. At most one reports a non-negative branch ID (the winner).
        // Faulted branches contribute their exceptions even if another branch won.
        // Canceled branches, and branches that ran but report -1, are ignored.
        List<Exception> failures = null;
        int winningBranchId = -1;
        for (int i = 0; i < finishedBranches.Length; i++)
        {
            Task<int> branch = finishedBranches[i];
            TaskStatus status = branch.Status;

            if (status == TaskStatus.Faulted)
            {
                Common.AddException(ref failures, branch.Exception, unwrapInnerExceptions: true);
            }
            else if (status == TaskStatus.RanToCompletion)
            {
                int reportedId = branch.Result;
                if (reportedId >= 0)
                {
                    Debug.Assert(reportedId < finishedBranches.Length, "Expected a valid branch ID");
                    Debug.Assert(winningBranchId == -1, "There should be at most one successful branch.");
                    winningBranchId = reportedId;
                }
                else
                {
                    Debug.Assert(reportedId == -1, "Expected -1 as a signal of a non-successful branch");
                }
            }
        }

        // Precedence of outcomes: any failure faults the choice; otherwise a winner
        // supplies the result; otherwise the choice is canceled.
        if (failures != null)
        {
            chooseCompletion.TrySetException(failures);
        }
        else if (winningBranchId >= 0)
        {
            chooseCompletion.TrySetResult(winningBranchId);
        }
        else
        {
            chooseCompletion.TrySetCanceled();
        }

        // Every branch has completed, so nothing can use the CancellationTokenSource any more.
        cts.Dispose();
    }, CancellationToken.None, Common.GetContinuationOptions(), TaskScheduler.Default);

    return chooseCompletion.Task;
}
/// <summary>Creates the hidden target and continuation that make up one branch of a Choose.</summary>
/// <typeparam name="T">Specifies the type of data coming through this branch.</typeparam>
/// <param name="boxedCompleted">A strong box around the completed Task from any target. Also the sync object shared by the targets.</param>
/// <param name="cts">The CancellationTokenSource used to issue tear down / cancellation requests.</param>
/// <param name="scheduler">The TaskScheduler on which to schedule the branch's continuation.</param>
/// <param name="branchId">The ID of this branch; it becomes the branch task's result when this branch receives the item.</param>
/// <param name="source">The source with which this branch is associated.</param>
/// <param name="action">The action to run for a single element received from the source.</param>
/// <returns>
/// A task representing the branch: its result is <paramref name="branchId"/> when this branch
/// received an item and ran <paramref name="action"/>, and -1 when the branch's target did not
/// run to completion.
/// </returns>
[SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
private static Task<int> CreateChooseBranch<T>(
    StrongBox<Task> boxedCompleted, CancellationTokenSource cts,
    TaskScheduler scheduler,
    int branchId, ISourceBlock<T> source, Action<T> action)
{
    // With cancellation already requested there is no point in creating and linking a
    // target; return a canceled task directly.
    if (cts.IsCancellationRequested)
    {
        return Common.CreateTaskFromCancellation<int>(cts.Token);
    }

    // Create the hidden target and link it to the source. Linking could throw if the
    // block is faulty; in that case cancel everything and report the failure via the task.
    var target = new ChooseTarget<T>(boxedCompleted, cts.Token);
    IDisposable unlink;
    try
    {
        unlink = source.LinkTo(target, DataflowLinkOptions.UnlinkAfterOneAndPropagateCompletion);
    }
    catch (Exception exc)
    {
        cts.Cancel();
        return Common.CreateTaskFromException<int>(exc);
    }

    // The continuation below implicitly captures the right execution context, because
    // CreateChooseBranch is called synchronously from Choose; no explicit ExecutionContext
    // capture and marshaling is needed.
    return target.Task.ContinueWith(completed =>
    {
        try
        {
            // A target that did not run to completion never got a message: this branch lost.
            if (completed.Status != TaskStatus.RanToCompletion)
            {
                return -1;
            }

            // This branch got the message. Cancel the cts so the other branches complete,
            // run the user callback, then report our branch ID as the winner.
            cts.Cancel();
            action(completed.Result);
            return branchId;
        }
        finally
        {
            // Unlink from the source. This could throw if the block is faulty, in which case
            // the branch task faults; such an exception propagates instead of any exception
            // thrown by the action.
            unlink.Dispose();
        }
    }, CancellationToken.None, Common.GetContinuationOptions(), scheduler);
}
/// <summary>Provides a dataflow target used by Choose to receive data from a single source.</summary>
/// <typeparam name="T">Specifies the type of data offered to this target.</typeparam>
[DebuggerDisplay("{DebuggerDisplayContent,nq}")]
private sealed class ChooseTarget<T> : TaskCompletionSource<T>, ITargetBlock<T>, IDebuggerDisplay
{
    /// <summary>
    /// Delegate used to invoke the action for a branch when that branch is activated
    /// on the fast path. The state object is a tuple of (action, received item, branch ID);
    /// the delegate runs the action on the item and returns the branch ID.
    /// </summary>
    internal static readonly Func<object, int> s_processBranchFunction = state =>
    {
        Tuple<Action<T>, T, int> actionResultBranch = (Tuple<Action<T>, T, int>)state;
        actionResultBranch.Item1(actionResultBranch.Item2);
        return actionResultBranch.Item3;
    };

    /// <summary>
    /// A wrapper for the task that represents the completed branch of this choice.
    /// The wrapper is also the sync object used to protect all choice branches' access to shared state,
    /// so the reference is read-only: every lock must be taken on the same instance.
    /// </summary>
    private readonly StrongBox<Task> _completed;

    /// <summary>Initializes the target.</summary>
    /// <param name="completed">The completed wrapper shared between all choice branches.</param>
    /// <param name="cancellationToken">The cancellation token used to cancel this target.</param>
    internal ChooseTarget(StrongBox<Task> completed, CancellationToken cancellationToken)
    {
        Contract.Requires(completed != null, "Requires a shared target to complete.");
        _completed = completed;

        // Handle async cancellation by canceling the target without storing it into _completed.
        // _completed must only be set to a RanToCompletion task for a successful branch.
        Common.WireCancellationToComplete(cancellationToken, base.Task,
            state =>
            {
                var thisChooseTarget = (ChooseTarget<T>)state;
                lock (thisChooseTarget._completed) thisChooseTarget.TrySetCanceled();
            }, this);
    }

    /// <summary>Called when this choice branch is being offered a message.</summary>
    /// <param name="messageHeader">The header of the message being offered.</param>
    /// <param name="messageValue">The value of the message being offered.</param>
    /// <param name="source">The source offering the message; required when <paramref name="consumeToAccept"/> is true.</param>
    /// <param name="consumeToAccept">true if the message must be consumed from the source before it can be accepted.</param>
    /// <returns>
    /// DecliningPermanently if this or another branch has already completed, NotAvailable if the
    /// message could not be consumed from the source, and Accepted once the value has been stored.
    /// </returns>
    /// <exception cref="System.ArgumentException">
    /// The header is invalid, or <paramref name="consumeToAccept"/> is true while <paramref name="source"/> is null.
    /// </exception>
    public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, Boolean consumeToAccept)
    {
        // Validate arguments
        if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, "messageHeader");
        if (source == null && consumeToAccept) throw new ArgumentException(SR.Argument_CantConsumeFromANullSource, "consumeToAccept");
        Contract.EndContractBlock();

        lock (_completed)
        {
            // If we or another participating choice has already completed, we're done.
            if (_completed.Value != null || base.Task.IsCompleted) return DataflowMessageStatus.DecliningPermanently;

            // Consume the message from the source if necessary
            if (consumeToAccept)
            {
                bool consumed;
                messageValue = source.ConsumeMessage(messageHeader, this, out consumed);
                if (!consumed) return DataflowMessageStatus.NotAvailable;
            }

            // Store the result and signal our success. Publishing our task into the shared
            // box is what makes every other branch decline from now on.
            TrySetResult(messageValue);
            _completed.Value = Task;
            return DataflowMessageStatus.Accepted;
        }
    }

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
    void IDataflowBlock.Complete()
    {
        // Completion without a message means this branch lost; cancel under the shared lock.
        lock (_completed) TrySetCanceled();
    }

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
    void IDataflowBlock.Fault(Exception exception) { ((IDataflowBlock)this).Complete(); }

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
    Task IDataflowBlock.Completion { get { throw new NotSupportedException(SR.NotSupported_MemberNotNeeded); } }

    /// <summary>The data to display in the debugger display attribute.</summary>
    [SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
    private object DebuggerDisplayContent
    {
        get
        {
            return string.Format("{0} IsCompleted={1}",
                Common.GetNameForDebugger(this), base.Task.IsCompleted);
        }
    }

    /// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
    object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } }
}
2308 #region AsObservable
/// <summary>
/// Exposes the <see cref="ISourceBlock{TOutput}"/> through a new <see cref="System.IObservable{TOutput}"/> abstraction.
/// </summary>
/// <typeparam name="TOutput">Specifies the type of data contained in the source.</typeparam>
/// <param name="source">The source to wrap.</param>
/// <returns>An IObservable{TOutput} that enables observers to be subscribed to the source.</returns>
/// <exception cref="System.ArgumentNullException">The <paramref name="source"/> is null (Nothing in Visual Basic).</exception>
public static IObservable<TOutput> AsObservable<TOutput>(this ISourceBlock<TOutput> source)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    Contract.EndContractBlock();

    // The observable for a given source is obtained from SourceObservable<TOutput>.From.
    IObservable<TOutput> observable = SourceObservable<TOutput>.From(source);
    return observable;
}

/// <summary>Cached options for non-greedy processing: execution options with a bounded capacity of 1.</summary>
private static readonly ExecutionDataflowBlockOptions _nonGreedyExecutionOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1 };
2324 /// <summary>Provides an IObservable veneer over a source block.</summary>
2325 [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
2326 [DebuggerTypeProxy(typeof(SourceObservable<>.DebugView))]
2327 private sealed class SourceObservable<TOutput> : IObservable<TOutput>, IDebuggerDisplay
/// <summary>The table that maps each source to its cached observable.</summary>
/// <remarks>
/// ConditionalWeakTable synchronizes only the publication of a value, not its creation.
/// If two callers race to create an observable for the same source, more than one
/// SourceObservable instance may be constructed, but only one is published.
/// Worst case, the source's completion task ends up with a few extra continuations.
/// </remarks>
private static readonly ConditionalWeakTable<ISourceBlock<TOutput>, SourceObservable<TOutput>> _table =
    new ConditionalWeakTable<ISourceBlock<TOutput>, SourceObservable<TOutput>>();

/// <summary>Gets the observable that represents the given source block, creating it on first use.</summary>
/// <param name="source">The source.</param>
/// <returns>The observable.</returns>
internal static IObservable<TOutput> From(ISourceBlock<TOutput> source)
{
    Contract.Requires(source != null, "Requires a source for which to retrieve the observable.");

    // GetValue returns the entry already stored for this source, or runs the factory to add one.
    SourceObservable<TOutput> cached = _table.GetValue(source, s => new SourceObservable<TOutput>(s));
    return cached;
}
/// <summary>Lock object guarding every subscription, unsubscription, and propagation.</summary>
private readonly object _SubscriptionLock = new object();

/// <summary>The source block wrapped by this observable.</summary>
private readonly ISourceBlock<TOutput> _source;

/// <summary>
/// The state for the current target. The same target is reused until the number of
/// subscribers drops to 0, at which point a fresh state (and target) is substituted.
/// </summary>
private ObserversState _observersState;

/// <summary>Initializes the SourceObservable around the given source.</summary>
/// <param name="source">The source to wrap.</param>
internal SourceObservable(ISourceBlock<TOutput> source)
{
    Contract.Requires(source != null, "The observable requires a source to wrap.");

    // Store the source first, then create the initial observers state for this observable.
    _source = source;
    _observersState = new ObserversState(this);
}
/// <summary>Gets any exceptions from the source block.</summary>
/// <returns>
/// The aggregate exception held by the source's completion task when that task is faulted;
/// null when the completion task is unavailable or not faulted.
/// </returns>
private AggregateException GetCompletionError()
{
    Task completion = Common.GetPotentiallyNotSupportedCompletionTask(_source);

    // The helper may yield null (no completion task); a non-faulted task carries no error.
    if (completion == null || !completion.IsFaulted)
    {
        return null;
    }

    return completion.Exception;
}
/// <summary>Subscribes the observer to the source.</summary>
/// <param name="observer">The observer to subscribe.</param>
/// <returns>
/// An IDisposable that may be used to unsubscribe the observer. When the source and the
/// current target have both already completed, the observer is completed (or faulted)
/// immediately and a no-op disposable is returned. Null is returned if linking the source
/// to the target yields a null unlinker.
/// </returns>
/// <exception cref="System.ArgumentNullException">The <paramref name="observer"/> is null.</exception>
[SuppressMessage("Microsoft.Reliability", "CA2000:DisposeObjectsBeforeLosingScope")]
IDisposable IObservable<TOutput>.Subscribe(IObserver<TOutput> observer)
{
    // Validate arguments
    if (observer == null) throw new ArgumentNullException("observer");
    Contract.EndContractBlock();
    Common.ContractAssertMonitorStatus(_SubscriptionLock, held: false);

    Task sourceCompletion = Common.GetPotentiallyNotSupportedCompletionTask(_source);

    // All observers of this source are synchronized on the subscription lock.
    Exception completionError;
    lock (_SubscriptionLock)
    {
        // "Done" requires both that the source has completed and that the target has
        // finished propagating data to all observers.
        bool everythingDone =
            sourceCompletion != null && sourceCompletion.IsCompleted &&
            _observersState.Target.Completion.IsCompleted;

        if (!everythingDone)
        {
            // Register the observer. The first observer also triggers linking the source to the target.
            _observersState.Observers = _observersState.Observers.Add(observer);
            if (_observersState.Observers.Count == 1)
            {
                Debug.Assert(_observersState.Unlinker == null, "The source should not be linked to the target.");
                _observersState.Unlinker = _source.LinkTo(_observersState.Target);
                if (_observersState.Unlinker == null)
                {
                    // Linking produced no unlinker: undo the registration and report that with null.
                    _observersState.Observers = ImmutableList<IObserver<TOutput>>.Empty;
                    return null;
                }
            }

            // The returned disposable unsubscribes this observer; Unsubscribe resets the
            // observer state when the last observer goes away.
            return Disposables.Create((s, o) => s.Unsubscribe(o), this, observer);
        }

        // Already done: capture any error here and notify the observer outside the lock.
        completionError = GetCompletionError();
    }

    // Complete the observer.
    if (completionError == null)
    {
        observer.OnCompleted();
    }
    else
    {
        observer.OnError(completionError);
    }
    return Disposables.Nop;
}
/// <summary>Unsubscribes the observer; does nothing if the observer is not currently registered.</summary>
/// <param name="observer">The observer being unsubscribed.</param>
private void Unsubscribe(IObserver<TOutput> observer)
{
    Contract.Requires(observer != null, "Expected an observer.");
    Common.ContractAssertMonitorStatus(_SubscriptionLock, held: false);

    lock (_SubscriptionLock)
    {
        ObserversState state = _observersState;
        Debug.Assert(state != null, "Observer state should never be null.");

        // Already unsubscribed, or otherwise no longer in the list: nothing to do.
        if (!state.Observers.Contains(observer))
        {
            return;
        }

        bool removingLastObserver = state.Observers.Count == 1;
        if (removingLastObserver)
        {
            // Swap in a fresh state for future subscribers. The observer is deliberately left
            // in the old state's list: if the old target has already taken data from the
            // source, that data cannot be put back, so it is still sent along to the observer.
            ResetObserverState();
        }
        else
        {
            state.Observers = state.Observers.Remove(observer);
        }
    }
}
/// <summary>Resets the observer state to the original, inactive state. Must be called with the subscription lock held.</summary>
/// <returns>The list of active observers prior to the reset.</returns>
private ImmutableList<IObserver<TOutput>> ResetObserverState()
{
    Common.ContractAssertMonitorStatus(_SubscriptionLock, held: true);

    ObserversState previousState = _observersState;
    Debug.Assert(previousState != null, "Observer state should never be null.");
    Debug.Assert(previousState.Unlinker != null, "The target should be linked.");
    Debug.Assert(previousState.Canceler != null, "The target should have set up continuations.");

    // Capture the outgoing observers, install a clean state, and only then tear down
    // the previous one by unlinking it from the source and canceling its continuations.
    ImmutableList<IObserver<TOutput>> previousObservers = previousState.Observers;
    _observersState = new ObserversState(this);
    previousState.Unlinker.Dispose();
    previousState.Canceler.Cancel();

    return previousObservers;
}
/// <summary>The data to display in the debugger display attribute: the observer count and a description of the wrapped block.</summary>
[SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
private object DebuggerDisplayContent
{
    get
    {
        IDebuggerDisplay displayable = _source as IDebuggerDisplay;
        int observerCount = _observersState.Observers.Count;

        // Use the source's own debugger content when it provides one; otherwise show the source itself.
        object blockDescription = displayable != null ? displayable.Content : _source;

        return string.Format("Observers={0}, Block=\"{1}\"", observerCount, blockDescription);
    }
}

/// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
object IDebuggerDisplay.Content
{
    get { return DebuggerDisplayContent; }
}
/// <summary>Debugger type proxy that exposes the observable's current observers.</summary>
private sealed class DebugView
{
    /// <summary>The observable being debugged.</summary>
    private readonly SourceObservable<TOutput> _observable;

    /// <summary>Initializes the debug view.</summary>
    /// <param name="observable">The observable being debugged.</param>
    public DebugView(SourceObservable<TOutput> observable)
    {
        Contract.Requires(observable != null, "Need a block with which to construct the debug view.");
        _observable = observable;
    }

    /// <summary>Gets a snapshot array of the observers in the observable's current state.</summary>
    [DebuggerBrowsable(DebuggerBrowsableState.RootHidden)]
    public IObserver<TOutput>[] Observers
    {
        get
        {
            ObserversState state = _observable._observersState;
            return state.Observers.ToArray();
        }
    }
}
/// <summary>
/// State associated with the current target for propagating data to observers.
/// Each instance owns one <see cref="ActionBlock{TOutput}"/> target, the set of observers
/// it forwards to, and the bookkeeping needed to unlink from the source and cancel continuations.
/// </summary>
[SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")]
private sealed class ObserversState
{
    /// <summary>The owning SourceObservable.</summary>
    internal readonly SourceObservable<TOutput> Observable;
    /// <summary>The ActionBlock that consumes data from a source and offers it to targets.</summary>
    internal readonly ActionBlock<TOutput> Target;
    /// <summary>Used to cancel continuations when they're no longer necessary.</summary>
    internal readonly CancellationTokenSource Canceler = new CancellationTokenSource();
    /// <summary>
    /// A list of the observers currently registered with this target. The list is immutable
    /// to enable iteration through the list while the set of observers may be changing.
    /// </summary>
    internal ImmutableList<IObserver<TOutput>> Observers = ImmutableList<IObserver<TOutput>>.Empty;
    /// <summary>Used to unlink the source from this target when the last observer is unsubscribed.</summary>
    internal IDisposable Unlinker;
    /// <summary>
    /// Temporary list to keep track of SendAsync tasks to TargetObservers with back pressure.
    /// This field gets instantiated on demand. It gets populated and cleared within an offering cycle.
    /// </summary>
    private List<Task<bool>> _tempSendAsyncTaskList;

    /// <summary>Initializes the target instance.</summary>
    /// <param name="observable">The owning observable.</param>
    internal ObserversState(SourceObservable<TOutput> observable)
    {
        Contract.Requires(observable != null, "Observe state must be mapped to a source observable.");

        // Set up the target block
        Observable = observable;
        Target = new ActionBlock<TOutput>((Func<TOutput, Task>)ProcessItemAsync, DataflowBlock._nonGreedyExecutionOptions);

        // If the target block fails due to an unexpected exception (e.g. it calls back to the source and the source throws an error),
        // we fault currently registered observers and reset the observable.
        Target.Completion.ContinueWith(
            (t, state) => ((ObserversState)state).NotifyObserversOfCompletion(t.Exception), this,
            CancellationToken.None,
            Common.GetContinuationOptions(TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously),
            TaskScheduler.Default);

        // When the source completes, complete the target. Then when the target completes,
        // send completion messages to any observers still registered.
        // This continuation is tied to Canceler so that it can be dropped once this state is retired.
        Task sourceCompletionTask = Common.GetPotentiallyNotSupportedCompletionTask(Observable._source);
        if (sourceCompletionTask != null)
        {
            sourceCompletionTask.ContinueWith((_1, state1) =>
            {
                var ti = (ObserversState)state1;
                ti.Target.Complete();
                ti.Target.Completion.ContinueWith(
                    (_2, state2) => ((ObserversState)state2).NotifyObserversOfCompletion(), state1,
                    CancellationToken.None,
                    Common.GetContinuationOptions(TaskContinuationOptions.NotOnFaulted | TaskContinuationOptions.ExecuteSynchronously),
                    TaskScheduler.Default);
            }, this, Canceler.Token, Common.GetContinuationOptions(TaskContinuationOptions.ExecuteSynchronously), TaskScheduler.Default);
        }
    }

    /// <summary>Forwards an item to all currently subscribed observers.</summary>
    /// <param name="item">The item to forward.</param>
    /// <returns>
    /// A task that completes when all pending asynchronous sends have completed, a faulted task
    /// if forwarding threw, or an already-completed task when nothing needs to be awaited.
    /// </returns>
    [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
    private Task ProcessItemAsync(TOutput item)
    {
        Common.ContractAssertMonitorStatus(Observable._SubscriptionLock, held: false);

        // Take a snapshot of the observers under the lock; the list itself is immutable,
        // so it is safe to iterate it after the lock has been released.
        ImmutableList<IObserver<TOutput>> currentObservers;
        lock (Observable._SubscriptionLock) currentObservers = Observers;

        try
        {
            foreach (IObserver<TOutput> observer in currentObservers)
            {
                // If the observer is our own TargetObserver, we SendAsync() to it
                // rather than going through IObserver.OnNext() which allows us to
                // continue offering to the remaining observers without blocking.
                var targetObserver = observer as TargetObserver<TOutput>;
                if (targetObserver != null)
                {
                    Task<bool> sendAsyncTask = targetObserver.SendAsyncToTarget(item);
                    if (sendAsyncTask.Status != TaskStatus.RanToCompletion)
                    {
                        // Ensure the SendAsyncTaskList is instantiated
                        if (_tempSendAsyncTaskList == null) _tempSendAsyncTaskList = new List<Task<bool>>();

                        // Add the task to the list
                        _tempSendAsyncTaskList.Add(sendAsyncTask);
                    }
                }
                else
                {
                    observer.OnNext(item);
                }
            }

            // If there are SendAsync tasks to wait on...
            if (_tempSendAsyncTaskList != null && _tempSendAsyncTaskList.Count > 0)
            {
                // Consolidate all SendAsync tasks into one
                Task<bool[]> allSendAsyncTasksConsolidated = Task.WhenAll(_tempSendAsyncTaskList);

                // Clear the temp SendAsync task list
                _tempSendAsyncTaskList.Clear();

                // Return the consolidated task
                return allSendAsyncTasksConsolidated;
            }
        }
        catch (Exception exc)
        {
            // Return a faulted task
            return Common.CreateTaskFromException<VoidResult>(exc);
        }

        // All observers accepted normally.
        // Return a completed task.
        return Common.CompletedTaskWithTrueResult;
    }

    /// <summary>Notifies all currently registered observers that they should complete.</summary>
    /// <param name="targetException">
    /// Non-null when an unexpected exception occurs during processing. Faults
    /// all subscribed observers and, if this state is still the observable's current state,
    /// resets the observable back to its original condition.
    /// </param>
    [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
    private void NotifyObserversOfCompletion(Exception targetException = null)
    {
        Contract.Requires(Target.Completion.IsCompleted, "The target must have already completed in order to notify of completion.");
        Common.ContractAssertMonitorStatus(Observable._SubscriptionLock, held: false);

        // Send completion notification to all observers.
        ImmutableList<IObserver<TOutput>> currentObservers;
        lock (Observable._SubscriptionLock)
        {
            // Get the currently registered set of observers. Then, if we're being called due to the target
            // block failing from an unexpected exception, reset the observer state so that subsequent
            // subscribed observers will get a new target block. Finally clear out our observer list.
            currentObservers = Observers;

            // Only reset when this instance is still the observable's current state. This state may
            // already have been retired by an earlier reset while its target was still processing an
            // item; in that case ResetObserverState would operate on the replacement state instead,
            // whose Unlinker is null until it is linked and whose observers do not belong to this target.
            if (targetException != null && object.ReferenceEquals(Observable._observersState, this))
            {
                Observable.ResetObserverState();
            }

            Observers = ImmutableList<IObserver<TOutput>>.Empty;
        }

        // If there are any observers to complete...
        if (currentObservers.Count > 0)
        {
            // Determine if we should fault or complete the observers
            Exception error = targetException ?? Observable.GetCompletionError();
            try
            {
                if (error != null)
                {
                    foreach (IObserver<TOutput> observer in currentObservers) observer.OnError(error);
                }
                else
                {
                    foreach (IObserver<TOutput> observer in currentObservers) observer.OnCompleted();
                }
            }
            catch (Exception exc)
            {
                // If an observer throws an exception at this point (which it shouldn't do),
                // we have little recourse but to let that exception propagate. Since allowing it to
                // propagate here would just result in it getting eaten by the owning task,
                // we instead have it propagate on the thread pool.
                Common.ThrowAsync(exc);
            }
        }
    }
}
/// <summary>Wraps an <see cref="ITargetBlock{TInput}"/> so that it can be used as a <see cref="System.IObserver{TInput}"/>.</summary>
/// <typeparam name="TInput">Specifies the type of input accepted by the target block.</typeparam>
/// <param name="target">The target block to expose as an observer.</param>
/// <returns>An observer that forwards notifications to the wrapped target block.</returns>
/// <exception cref="System.ArgumentNullException">The <paramref name="target"/> is null (Nothing in Visual Basic).</exception>
public static IObserver<TInput> AsObserver<TInput>(this ITargetBlock<TInput> target)
{
    if (target == null) throw new ArgumentNullException("target");
    Contract.EndContractBlock();

    IObserver<TInput> wrapper = new TargetObserver<TInput>(target);
    return wrapper;
}
/// <summary>An <see cref="IObserver{TInput}"/> that forwards every notification to a wrapped target block.</summary>
[DebuggerDisplay("{DebuggerDisplayContent,nq}")]
private sealed class TargetObserver<TInput> : IObserver<TInput>, IDebuggerDisplay
{
    /// <summary>The target block receiving the forwarded notifications.</summary>
    private readonly ITargetBlock<TInput> _target;

    /// <summary>Creates an observer over the given target.</summary>
    /// <param name="target">The target block to wrap.</param>
    internal TargetObserver(ITargetBlock<TInput> target)
    {
        Contract.Requires(target != null, "A target to observe is required.");
        _target = target;
    }

    /// <summary>Sends the value to the target and blocks until the send has been resolved.</summary>
    /// <param name="value">The value to send.</param>
    void IObserver<TInput>.OnNext(TInput value)
    {
        // GetAwaiter().GetResult() is used rather than Wait()/Result so that the
        // original (non-aggregated) exception propagates to the caller.
        SendAsyncToTarget(value).GetAwaiter().GetResult();
    }

    /// <summary>Signals completion to the target.</summary>
    void IObserver<TInput>.OnCompleted()
    {
        _target.Complete();
    }

    /// <summary>Faults the target with the supplied error.</summary>
    /// <param name="error">The exception to forward.</param>
    void IObserver<TInput>.OnError(Exception error)
    {
        _target.Fault(error);
    }

    /// <summary>Starts an asynchronous send of the value to the wrapped target.</summary>
    /// <param name="value">The value to send.</param>
    /// <returns>The task returned by SendAsync for this value.</returns>
    internal Task<bool> SendAsyncToTarget(TInput value)
    {
        return _target.SendAsync(value);
    }

    /// <summary>Builds the text shown for this observer by the debugger display attribute.</summary>
    [SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
    private object DebuggerDisplayContent
    {
        get
        {
            // Prefer the target's own debugger content when it provides one.
            object blockDescription = _target;
            var displayTarget = _target as IDebuggerDisplay;
            if (displayTarget != null)
            {
                blockDescription = displayTarget.Content;
            }

            return string.Format("Block=\"{0}\"", blockDescription);
        }
    }

    /// <summary>Exposes <see cref="DebuggerDisplayContent"/> through <see cref="IDebuggerDisplay"/>.</summary>
    object IDebuggerDisplay.Content
    {
        get { return DebuggerDisplayContent; }
    }
}
/// <summary>
/// Creates a target block that synchronously accepts every message offered to it and then discards it.
/// </summary>
/// <typeparam name="TInput">The type of the messages this block can accept.</typeparam>
/// <returns>A <see cref="T:System.Threading.Tasks.Dataflow.ITargetBlock`1"/> that accepts and subsequently drops all offered messages.</returns>
public static ITargetBlock<TInput> NullTarget<TInput>()
{
    ITargetBlock<TInput> sink = new NullTargetBlock<TInput>();
    return sink;
}
/// <summary>
/// A target block that synchronously accepts every message offered to it and then discards it.
/// </summary>
/// <typeparam name="TInput">The type of the messages this block can accept.</typeparam>
private class NullTargetBlock<TInput> : ITargetBlock<TInput>
{
    /// <summary>Lazily created task returned from <see cref="IDataflowBlock.Completion"/>.</summary>
    private Task _completion;

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="OfferMessage"]/*' />
    DataflowMessageStatus ITargetBlock<TInput>.OfferMessage(DataflowMessageHeader messageHeader, TInput messageValue, ISourceBlock<TInput> source, Boolean consumeToAccept)
    {
        if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, "messageHeader");
        Contract.EndContractBlock();

        // When no explicit consumption is required, the message is simply accepted (and dropped).
        if (!consumeToAccept)
        {
            return DataflowMessageStatus.Accepted;
        }

        // The source requires an explicit synchronous consumption, so there must be a source to consume from.
        if (source == null) throw new ArgumentException(SR.Argument_CantConsumeFromANullSource, "consumeToAccept");

        // If the source throws during this call, let the exception propagate back to the source.
        bool wasConsumed;
        source.ConsumeMessage(messageHeader, this, out wasConsumed);

        return wasConsumed
            ? DataflowMessageStatus.Accepted
            : DataflowMessageStatus.NotAvailable;
    }

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
    void IDataflowBlock.Complete()
    {
        // Intentionally a no-op.
    }

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
    void IDataflowBlock.Fault(Exception exception)
    {
        // Intentionally a no-op.
    }

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
    Task IDataflowBlock.Completion
    {
        get
        {
            // The task comes from a TaskCompletionSource that is never signaled here.
            return LazyInitializer.EnsureInitialized(
                ref _completion,
                () => new TaskCompletionSource<VoidResult>().Task);
        }
    }
}