1 // Copyright (c) Microsoft. All rights reserved.
2 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
4 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
9 // A target block that executes an action for each message.
11 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
13 using System.Collections.Generic;
14 using System.Diagnostics;
15 using System.Diagnostics.CodeAnalysis;
16 using System.Diagnostics.Contracts;
17 using System.Runtime.CompilerServices;
18 using System.Threading.Tasks.Dataflow.Internal;
20 namespace System.Threading.Tasks.Dataflow
22 /// <summary>Provides a dataflow block that invokes a provided <see cref="System.Action{T}"/> delegate for every data element received.</summary>
23 /// <typeparam name="TInput">Specifies the type of data operated on by this <see cref="ActionBlock{T}"/>.</typeparam>
24 [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
25 [DebuggerTypeProxy(typeof(ActionBlock<>.DebugView))]
26 public sealed class ActionBlock<TInput> : ITargetBlock<TInput>, IDebuggerDisplay
28 /// <summary>The core implementation of this message block when in default mode.</summary>
29 private readonly TargetCore<TInput> _defaultTarget;
30 /// <summary>The core implementation of this message block when in SPSC mode.</summary>
31 private readonly SpscTargetCore<TInput> _spscTarget;
/// <summary>
/// Initializes the <see cref="ActionBlock{T}"/> with a synchronous <see cref="System.Action{T}"/>,
/// configured with the default <see cref="ExecutionDataflowBlockOptions"/>.
/// </summary>
/// <param name="action">The delegate to run for each data element received.</param>
/// <exception cref="System.ArgumentNullException">The <paramref name="action"/> is null (Nothing in Visual Basic).</exception>
public ActionBlock(Action<TInput> action)
    : this((Delegate)action, ExecutionDataflowBlockOptions.Default)
{
    // The cast to Delegate routes the call to the private constructor that handles both delegate flavors.
}
/// <summary>
/// Initializes the <see cref="ActionBlock{T}"/> with a synchronous <see cref="System.Action{T}"/>
/// and the supplied <see cref="ExecutionDataflowBlockOptions"/>.
/// </summary>
/// <param name="action">The delegate to run for each data element received.</param>
/// <param name="dataflowBlockOptions">The options used to configure this <see cref="ActionBlock{T}"/>.</param>
/// <exception cref="System.ArgumentNullException">The <paramref name="action"/> is null (Nothing in Visual Basic).</exception>
/// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
public ActionBlock(Action<TInput> action, ExecutionDataflowBlockOptions dataflowBlockOptions)
    : this((Delegate)action, dataflowBlockOptions)
{
    // The cast to Delegate routes the call to the private constructor that handles both delegate flavors.
}
/// <summary>
/// Initializes the <see cref="ActionBlock{T}"/> with a task-returning <see cref="System.Func{T,Task}"/>,
/// configured with the default <see cref="ExecutionDataflowBlockOptions"/>.
/// </summary>
/// <param name="action">The delegate to run for each data element received.</param>
/// <exception cref="System.ArgumentNullException">The <paramref name="action"/> is null (Nothing in Visual Basic).</exception>
public ActionBlock(Func<TInput, Task> action)
    : this((Delegate)action, ExecutionDataflowBlockOptions.Default)
{
    // The cast to Delegate routes the call to the private constructor that handles both delegate flavors.
}
/// <summary>
/// Initializes the <see cref="ActionBlock{T}"/> with a task-returning <see cref="System.Func{T,Task}"/>
/// and the supplied <see cref="ExecutionDataflowBlockOptions"/>.
/// </summary>
/// <param name="action">The delegate to run for each data element received.</param>
/// <param name="dataflowBlockOptions">The options used to configure this <see cref="ActionBlock{T}"/>.</param>
/// <exception cref="System.ArgumentNullException">The <paramref name="action"/> is null (Nothing in Visual Basic).</exception>
/// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
public ActionBlock(Func<TInput, Task> action, ExecutionDataflowBlockOptions dataflowBlockOptions)
    : this((Delegate)action, dataflowBlockOptions)
{
    // The cast to Delegate routes the call to the private constructor that handles both delegate flavors.
}
/// <summary>Initializes the <see cref="ActionBlock{T}"/> with the specified delegate and options.</summary>
/// <param name="action">
/// The action to invoke with each data element received. It is expected to be either an
/// <see cref="System.Action{T}"/> (synchronous) or a <see cref="System.Func{T,Task}"/> (asynchronous).
/// </param>
/// <param name="dataflowBlockOptions">The options with which to configure this <see cref="ActionBlock{T}"/>.</param>
/// <exception cref="System.ArgumentNullException">The <paramref name="action"/> is null (Nothing in Visual Basic).</exception>
/// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
private ActionBlock(Delegate action, ExecutionDataflowBlockOptions dataflowBlockOptions)
{
    // Validate arguments
    if (action == null) throw new ArgumentNullException("action");
    if (dataflowBlockOptions == null) throw new ArgumentNullException("dataflowBlockOptions");
    Contract.Ensures((_spscTarget != null) ^ (_defaultTarget != null), "One and only one of the two targets must be non-null after construction");
    Contract.EndContractBlock();

    // Ensure we have options that can't be changed by the caller
    dataflowBlockOptions = dataflowBlockOptions.DefaultOrClone();

    // Based on the mode, initialize the target. If the user specifies SingleProducerConstrained,
    // we'll try to employ an optimized mode under a limited set of circumstances.
    var syncAction = action as Action<TInput>;
    if (syncAction != null &&
        dataflowBlockOptions.SingleProducerConstrained &&
        dataflowBlockOptions.MaxDegreeOfParallelism == 1 &&
        !dataflowBlockOptions.CancellationToken.CanBeCanceled &&
        dataflowBlockOptions.BoundedCapacity == DataflowBlockOptions.Unbounded)
    {
        // Initialize the SPSC fast target to handle the bulk of the processing.
        // The SpscTargetCore is only supported when BoundedCapacity, CancellationToken,
        // and MaxDOP are all their default values. It's also only supported for sync
        // delegates and not for async delegates.
        _spscTarget = new SpscTargetCore<TInput>(this, syncAction, dataflowBlockOptions);
    }
    else
    {
        // Initialize the TargetCore which handles the bulk of the processing.
        // The default target core can handle all options and delegate flavors.
        if (syncAction != null) // sync
        {
            _defaultTarget = new TargetCore<TInput>(this,
                messageWithId => ProcessMessage(syncAction, messageWithId),
                null, dataflowBlockOptions, TargetCoreOptions.RepresentsBlockCompletion);
        }
        else // async
        {
            var asyncAction = action as Func<TInput, Task>;
            Debug.Assert(asyncAction != null, "action is of incorrect delegate type");
            _defaultTarget = new TargetCore<TInput>(this,
                messageWithId => ProcessMessageWithTask(asyncAction, messageWithId),
                null, dataflowBlockOptions, TargetCoreOptions.RepresentsBlockCompletion | TargetCoreOptions.UsesAsyncCompletion);
        }

        // Handle async cancellation requests by declining on the target.
        // This is wired only for the default target: the SPSC path is chosen only when
        // the token cannot be canceled, and _defaultTarget is null in that mode.
        Common.WireCancellationToComplete(
            dataflowBlockOptions.CancellationToken, Completion, state => ((TargetCore<TInput>)state).Complete(exception: null, dropPendingMessages: true), _defaultTarget);
    }

    // Report the creation of this block to the ETW provider when a listener is enabled.
    // NOTE(review): confirm whether this tracing block should sit behind a conditional-compilation symbol.
    DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
    if (etwLog.IsEnabled())
    {
        etwLog.DataflowBlockCreated(this, dataflowBlockOptions);
    }
}
/// <summary>Processes the message with a user-provided action.</summary>
/// <param name="action">The action to use to process the message.</param>
/// <param name="messageWithId">The message to be processed; only its <c>Key</c> (the payload) is passed to the action.</param>
private void ProcessMessage(Action<TInput> action, KeyValuePair<TInput, long> messageWithId)
{
    try
    {
        action(messageWithId.Key);
    }
    catch (Exception exc)
    {
        // If this exception represents cancellation, swallow it rather than shutting down the block.
        if (!Common.IsCooperativeCancellation(exc)) throw;
    }
    finally
    {
        // We're done synchronously processing an element, so reduce the bounding count
        // that was incremented when this element was enqueued. This lives in a finally
        // block so the count is released even when the exception above is rethrown.
        if (_defaultTarget.IsBounded) _defaultTarget.ChangeBoundingCount(-1);
    }
}
/// <summary>Processes the message with a user-provided action that returns a task.</summary>
/// <param name="action">The action to use to process the message.</param>
/// <param name="messageWithId">The message to be processed; only its <c>Key</c> (the payload) is passed to the action.</param>
[SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
private void ProcessMessageWithTask(Func<TInput, Task> action, KeyValuePair<TInput, long> messageWithId)
{
    Contract.Requires(action != null, "action needed for processing");

    // Run the action to get the task that represents the operation's completion
    Task task = null;
    Exception caughtException = null;
    try
    {
        task = action(messageWithId.Key);
    }
    catch (Exception exc) { caughtException = exc; }

    // If no task is available, we're done.
    if (task == null)
    {
        // If we didn't get a task because an exception occurred,
        // store it (if the exception was cancellation, just ignore it).
        if (caughtException != null && !Common.IsCooperativeCancellation(caughtException))
        {
            Common.StoreDataflowMessageValueIntoExceptionData(caughtException, messageWithId.Key);
            _defaultTarget.Complete(caughtException, dropPendingMessages: true, storeExceptionEvenIfAlreadyCompleting: true, unwrapInnerExceptions: false);
        }

        // Signal that we're done with this async operation.
        _defaultTarget.SignalOneAsyncMessageCompleted(boundingCountChange: -1);
        return;
    }
    else if (task.IsCompleted)
    {
        // The task already finished, so complete the message inline without scheduling a continuation.
        AsyncCompleteProcessMessageWithTask(task);
    }
    else
    {
        // Otherwise, join with the asynchronous operation when it completes.
        // The block is passed as state so the lambda does not need to capture 'this'.
        task.ContinueWith((completed, state) =>
        {
            ((ActionBlock<TInput>)state).AsyncCompleteProcessMessageWithTask(completed);
        }, this, CancellationToken.None, Common.GetContinuationOptions(TaskContinuationOptions.ExecuteSynchronously), TaskScheduler.Default);
    }
}
/// <summary>Finishes the processing of a message whose asynchronous operation has completed.</summary>
/// <param name="completed">The task produced by the user delegate; it must already be completed.</param>
private void AsyncCompleteProcessMessageWithTask(Task completed)
{
    Contract.Requires(completed != null, "Need completed task for processing");
    Contract.Requires(completed.IsCompleted, "The task to be processed must be completed by now.");

    TargetCore<TInput> target = _defaultTarget;

    // A faulted task has its errors recorded on the target before completion is signaled
    // below, because the exception is part of the operation's outcome.
    if (completed.IsFaulted)
    {
        AggregateException failure = completed.Exception;
        target.Complete(failure, dropPendingMessages: true, storeExceptionEvenIfAlreadyCompleting: true, unwrapInnerExceptions: true);
    }

    // Faulted or not, this message is finished. An action block produces no outputs,
    // so the bounding count is always decremented here; per the original note, the
    // callee checks whether the block is actually bounded.
    target.SignalOneAsyncMessageCompleted(boundingCountChange: -1);
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
public void Complete()
{
    // Exactly one of the two target cores is non-null after construction,
    // so forward the request to whichever one backs this block.
    if (_defaultTarget != null)
    {
        // No exception is supplied and pending messages are not dropped.
        _defaultTarget.Complete(exception: null, dropPendingMessages: false);
    }
    else
    {
        _spscTarget.Complete(exception: null);
    }
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
void IDataflowBlock.Fault(Exception exception)
{
    if (exception == null) throw new ArgumentNullException("exception");
    Contract.EndContractBlock();

    // Exactly one of the two target cores is non-null after construction,
    // so forward the fault to whichever one backs this block.
    if (_defaultTarget != null)
    {
        // The exception is passed along and pending messages are dropped.
        _defaultTarget.Complete(exception, dropPendingMessages: true);
    }
    else
    {
        _spscTarget.Complete(exception);
    }
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
public Task Completion
{
    get
    {
        // Report the completion task of whichever target core backs this block.
        if (_defaultTarget != null)
        {
            return _defaultTarget.Completion;
        }

        return _spscTarget.Completion;
    }
}
/// <summary>Posts an item to the <see cref="T:System.Threading.Tasks.Dataflow.ITargetBlock`1"/>.</summary>
/// <param name="item">The item being offered to the target.</param>
/// <returns>true if the item was accepted by the target block; otherwise, false.</returns>
/// <remarks>
/// This method returns once the target block has decided to accept or decline the item.
/// Unless the target block's own semantics dictate otherwise, it does not wait for the item
/// to actually be processed (for example, <see cref="T:System.Threading.Tasks.Dataflow.ActionBlock`1"/>
/// returns from Post as soon as it has stored the posted item into its input queue). From the
/// perspective of the block's processing, Post is asynchronous. For target blocks that support
/// postponing offered messages, or for blocks that may do more processing in their Post
/// implementation, consider using
/// <see cref="T:System.Threading.Tasks.Dataflow.DataflowBlock.SendAsync">SendAsync</see>,
/// which returns immediately and enables the target to postpone the posted message and
/// consume it later, after SendAsync returns.
/// </remarks>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool Post(TInput item)
{
    // The same functionality exists as an extension method on ITargetBlock, but that route
    // goes through an interface call, which shows up as noticeable overhead on certain
    // architectures in very high-throughput scenarios. Offering the method directly on
    // ActionBlock eliminates that call for direct usage.
    if (_defaultTarget == null)
    {
        return _spscTarget.Post(item);
    }

    DataflowMessageStatus status = _defaultTarget.OfferMessage(Common.SingleMessageHeader, item, null, false);
    return status == DataflowMessageStatus.Accepted;
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="OfferMessage"]/*' />
DataflowMessageStatus ITargetBlock<TInput>.OfferMessage(DataflowMessageHeader messageHeader, TInput messageValue, ISourceBlock<TInput> source, Boolean consumeToAccept)
{
    // Hand the offer, unchanged, to whichever target core backs this block.
    if (_defaultTarget != null)
    {
        return _defaultTarget.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
    }
    else
    {
        return _spscTarget.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
    }
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="InputCount"]/*' />
public int InputCount
{
    get
    {
        // Read the count from whichever target core backs this block.
        if (_defaultTarget == null)
        {
            return _spscTarget.InputCount;
        }

        return _defaultTarget.InputCount;
    }
}
/// <summary>Gets the number of messages waiting to be processed. This must only be used from the debugger.</summary>
private int InputCountForDebugger
{
    get
    {
        // The default target's count is read through its debugging information;
        // the SPSC target's InputCount is read directly.
        if (_defaultTarget != null)
        {
            return _defaultTarget.GetDebuggingInformation().InputCount;
        }
        else
        {
            return _spscTarget.InputCount;
        }
    }
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="ToString"]/*' />
public override string ToString()
{
    // The name is built from the options of whichever target core backs this block.
    var activeOptions = _defaultTarget != null ? _defaultTarget.DataflowBlockOptions : _spscTarget.DataflowBlockOptions;
    return Common.GetNameForDebugger(this, activeOptions);
}
/// <summary>The data to display in the debugger display attribute: the block's debugger name followed by its input count.</summary>
[SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
private object DebuggerDisplayContent
{
    get
    {
        // Resolve the name first, then the count, matching the argument order of the format call.
        var activeOptions = _defaultTarget != null ? _defaultTarget.DataflowBlockOptions : _spscTarget.DataflowBlockOptions;
        var blockName = Common.GetNameForDebugger(this, activeOptions);
        return string.Format("{0}, InputCount={1}", blockName, InputCountForDebugger);
    }
}
/// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
object IDebuggerDisplay.Content
{
    get
    {
        // Exposes the same content that the DebuggerDisplay attribute on this class shows.
        return DebuggerDisplayContent;
    }
}
/// <summary>Provides a debugger type proxy for the <see cref="ActionBlock{T}"/>.</summary>
private sealed class DebugView
{
    /// <summary>The action block being viewed.</summary>
    private readonly ActionBlock<TInput> _actionBlock;
    /// <summary>The action block's default target being viewed; left null when the block uses its SPSC target.</summary>
    private readonly TargetCore<TInput>.DebuggingInformation _defaultDebugInfo;
    /// <summary>The action block's SPSC target being viewed; left null when the block uses its default target.</summary>
    private readonly SpscTargetCore<TInput>.DebuggingInformation _spscDebugInfo;

    /// <summary>Initializes the debug view.</summary>
    /// <param name="actionBlock">The target being debugged.</param>
    public DebugView(ActionBlock<TInput> actionBlock)
    {
        Contract.Requires(actionBlock != null, "Need a block with which to construct the debug view.");
        _actionBlock = actionBlock;

        // Capture debugging information only from the target core that backs the block;
        // the other core is null and must not be dereferenced.
        if (_actionBlock._defaultTarget != null)
        {
            _defaultDebugInfo = actionBlock._defaultTarget.GetDebuggingInformation();
        }
        else
        {
            _spscDebugInfo = actionBlock._spscTarget.GetDebuggingInformation();
        }
    }

    /// <summary>Gets the messages waiting to be processed.</summary>
    public IEnumerable<TInput> InputQueue
    {
        get { return _defaultDebugInfo != null ? _defaultDebugInfo.InputQueue : _spscDebugInfo.InputQueue; }
    }

    /// <summary>Gets any postponed messages, or null when the block uses its SPSC target.</summary>
    public QueuedMap<ISourceBlock<TInput>, DataflowMessageHeader> PostponedMessages
    {
        get { return _defaultDebugInfo != null ? _defaultDebugInfo.PostponedMessages : null; }
    }

    /// <summary>Gets the number of outstanding input operations.</summary>
    public Int32 CurrentDegreeOfParallelism
    {
        get { return _defaultDebugInfo != null ? _defaultDebugInfo.CurrentDegreeOfParallelism : _spscDebugInfo.CurrentDegreeOfParallelism; }
    }

    /// <summary>Gets the ExecutionDataflowBlockOptions used to configure this block.</summary>
    public ExecutionDataflowBlockOptions DataflowBlockOptions
    {
        get { return _defaultDebugInfo != null ? _defaultDebugInfo.DataflowBlockOptions : _spscDebugInfo.DataflowBlockOptions; }
    }

    /// <summary>Gets whether the block is declining further messages.</summary>
    public bool IsDecliningPermanently
    {
        get { return _defaultDebugInfo != null ? _defaultDebugInfo.IsDecliningPermanently : _spscDebugInfo.IsDecliningPermanently; }
    }

    /// <summary>Gets whether the block is completed.</summary>
    public bool IsCompleted
    {
        get { return _defaultDebugInfo != null ? _defaultDebugInfo.IsCompleted : _spscDebugInfo.IsCompleted; }
    }

    /// <summary>Gets the block's Id.</summary>
    public int Id { get { return Common.GetBlockId(_actionBlock); } }
}