Merge pull request #2236 from akoeplinger/add-dataflow
[mono.git] / mcs / class / System.Threading.Tasks.Dataflow / CoreFxSources / Blocks / ActionBlock.cs
1 // Copyright (c) Microsoft. All rights reserved.
2 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
3
4 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
5 //
6 // ActionBlock.cs
7 //
8 //
9 // A target block that executes an action for each message.
10 //
11 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
12
13 using System.Collections.Generic;
14 using System.Diagnostics;
15 using System.Diagnostics.CodeAnalysis;
16 using System.Diagnostics.Contracts;
17 using System.Runtime.CompilerServices;
18 using System.Threading.Tasks.Dataflow.Internal;
19
20 namespace System.Threading.Tasks.Dataflow
21 {
22     /// <summary>Provides a dataflow block that invokes a provided <see cref="System.Action{T}"/> delegate for every data element received.</summary>
23     /// <typeparam name="TInput">Specifies the type of data operated on by this <see cref="ActionBlock{T}"/>.</typeparam>
24     [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
25     [DebuggerTypeProxy(typeof(ActionBlock<>.DebugView))]
26     public sealed class ActionBlock<TInput> : ITargetBlock<TInput>, IDebuggerDisplay
27     {
28         /// <summary>The core implementation of this message block when in default mode.</summary>
29         private readonly TargetCore<TInput> _defaultTarget;
30         /// <summary>The core implementation of this message block when in SPSC mode.</summary>
31         private readonly SpscTargetCore<TInput> _spscTarget;
32
33         /// <summary>Initializes the <see cref="ActionBlock{T}"/> with the specified <see cref="System.Action{T}"/>.</summary>
34         /// <param name="action">The action to invoke with each data element received.</param>
35         /// <exception cref="System.ArgumentNullException">The <paramref name="action"/> is null (Nothing in Visual Basic).</exception>
36         public ActionBlock(Action<TInput> action) :
37             this((Delegate)action, ExecutionDataflowBlockOptions.Default)
38         { }
39
40         /// <summary>Initializes the <see cref="ActionBlock{T}"/> with the specified <see cref="System.Action{T}"/> and <see cref="ExecutionDataflowBlockOptions"/>.</summary>
41         /// <param name="action">The action to invoke with each data element received.</param>
42         /// <param name="dataflowBlockOptions">The options with which to configure this <see cref="ActionBlock{T}"/>.</param>
43         /// <exception cref="System.ArgumentNullException">The <paramref name="action"/> is null (Nothing in Visual Basic).</exception>
44         /// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
45         public ActionBlock(Action<TInput> action, ExecutionDataflowBlockOptions dataflowBlockOptions) :
46             this((Delegate)action, dataflowBlockOptions)
47         { }
48
49         /// <summary>Initializes the <see cref="ActionBlock{T}"/> with the specified <see cref="System.Func{T,Task}"/>.</summary>
50         /// <param name="action">The action to invoke with each data element received.</param>
51         /// <exception cref="System.ArgumentNullException">The <paramref name="action"/> is null (Nothing in Visual Basic).</exception>
52         public ActionBlock(Func<TInput, Task> action) :
53             this((Delegate)action, ExecutionDataflowBlockOptions.Default)
54         { }
55
56         /// <summary>Initializes the <see cref="ActionBlock{T}"/> with the specified <see cref="System.Func{T,Task}"/> and <see cref="ExecutionDataflowBlockOptions"/>.</summary>
57         /// <param name="action">The action to invoke with each data element received.</param>
58         /// <param name="dataflowBlockOptions">The options with which to configure this <see cref="ActionBlock{T}"/>.</param>
59         /// <exception cref="System.ArgumentNullException">The <paramref name="action"/> is null (Nothing in Visual Basic).</exception>
60         /// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
61         public ActionBlock(Func<TInput, Task> action, ExecutionDataflowBlockOptions dataflowBlockOptions) :
62             this((Delegate)action, dataflowBlockOptions)
63         { }
64
65         /// <summary>Initializes the <see cref="ActionBlock{T}"/> with the specified delegate and options.</summary>
66         /// <param name="action">The action to invoke with each data element received.</param>
67         /// <param name="dataflowBlockOptions">The options with which to configure this <see cref="ActionBlock{T}"/>.</param>
68         /// <exception cref="System.ArgumentNullException">The <paramref name="action"/> is null (Nothing in Visual Basic).</exception>
69         /// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
70         private ActionBlock(Delegate action, ExecutionDataflowBlockOptions dataflowBlockOptions)
71         {
72             // Validate arguments
73             if (action == null) throw new ArgumentNullException("action");
74             if (dataflowBlockOptions == null) throw new ArgumentNullException("dataflowBlockOptions");
75             Contract.Ensures((_spscTarget != null) ^ (_defaultTarget != null), "One and only one of the two targets must be non-null after construction");
76             Contract.EndContractBlock();
77
78             // Ensure we have options that can't be changed by the caller
79             dataflowBlockOptions = dataflowBlockOptions.DefaultOrClone();
80
81             // Based on the mode, initialize the target.  If the user specifies SingleProducerConstrained,
82             // we'll try to employ an optimized mode under a limited set of circumstances.
83             var syncAction = action as Action<TInput>;
84             if (syncAction != null &&
85                 dataflowBlockOptions.SingleProducerConstrained &&
86                 dataflowBlockOptions.MaxDegreeOfParallelism == 1 &&
87                 !dataflowBlockOptions.CancellationToken.CanBeCanceled &&
88                 dataflowBlockOptions.BoundedCapacity == DataflowBlockOptions.Unbounded)
89             {
90                 // Initialize the SPSC fast target to handle the bulk of the processing.
91                 // The SpscTargetCore is only supported when BoundedCapacity, CancellationToken,
92                 // and MaxDOP are all their default values.  It's also only supported for sync
93                 // delegates and not for async delegates.
94                 _spscTarget = new SpscTargetCore<TInput>(this, syncAction, dataflowBlockOptions);
95             }
96             else
97             {
98                 // Initialize the TargetCore which handles the bulk of the processing.
99                 // The default target core can handle all options and delegate flavors.
100
101                 if (syncAction != null) // sync
102                 {
103                     _defaultTarget = new TargetCore<TInput>(this,
104                         messageWithId => ProcessMessage(syncAction, messageWithId),
105                         null, dataflowBlockOptions, TargetCoreOptions.RepresentsBlockCompletion);
106                 }
107                 else // async
108                 {
109                     var asyncAction = action as Func<TInput, Task>;
110                     Debug.Assert(asyncAction != null, "action is of incorrect delegate type");
111                     _defaultTarget = new TargetCore<TInput>(this,
112                         messageWithId => ProcessMessageWithTask(asyncAction, messageWithId),
113                         null, dataflowBlockOptions, TargetCoreOptions.RepresentsBlockCompletion | TargetCoreOptions.UsesAsyncCompletion);
114                 }
115
116                 // Handle async cancellation requests by declining on the target
117                 Common.WireCancellationToComplete(
118                     dataflowBlockOptions.CancellationToken, Completion, state => ((TargetCore<TInput>)state).Complete(exception: null, dropPendingMessages: true), _defaultTarget);
119             }
120 #if FEATURE_TRACING
121             DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
122             if (etwLog.IsEnabled())
123             {
124                 etwLog.DataflowBlockCreated(this, dataflowBlockOptions);
125             }
126 #endif
127         }
128
129         /// <summary>Processes the message with a user-provided action.</summary>
130         /// <param name="action">The action to use to process the message.</param>
131         /// <param name="messageWithId">The message to be processed.</param>
132         private void ProcessMessage(Action<TInput> action, KeyValuePair<TInput, long> messageWithId)
133         {
134             try
135             {
136                 action(messageWithId.Key);
137             }
138             catch (Exception exc)
139             {
140                 // If this exception represents cancellation, swallow it rather than shutting down the block.
141                 if (!Common.IsCooperativeCancellation(exc)) throw;
142             }
143             finally
144             {
145                 // We're done synchronously processing an element, so reduce the bounding count
146                 // that was incrementing when this element was enqueued.
147                 if (_defaultTarget.IsBounded) _defaultTarget.ChangeBoundingCount(-1);
148             }
149         }
150
151         /// <summary>Processes the message with a user-provided action that returns a task.</summary>
152         /// <param name="action">The action to use to process the message.</param>
153         /// <param name="messageWithId">The message to be processed.</param>
154         [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
155         private void ProcessMessageWithTask(Func<TInput, Task> action, KeyValuePair<TInput, long> messageWithId)
156         {
157             Contract.Requires(action != null, "action needed for processing");
158
159             // Run the action to get the task that represents the operation's completion
160             Task task = null;
161             Exception caughtException = null;
162             try
163             {
164                 task = action(messageWithId.Key);
165             }
166             catch (Exception exc) { caughtException = exc; }
167
168             // If no task is available, we're done.
169             if (task == null)
170             {
171                 // If we didn't get a task because an exception occurred,
172                 // store it (if the exception was cancellation, just ignore it).
173                 if (caughtException != null && !Common.IsCooperativeCancellation(caughtException))
174                 {
175                     Common.StoreDataflowMessageValueIntoExceptionData(caughtException, messageWithId.Key);
176                     _defaultTarget.Complete(caughtException, dropPendingMessages: true, storeExceptionEvenIfAlreadyCompleting: true, unwrapInnerExceptions: false);
177                 }
178
179                 // Signal that we're done this async operation.
180                 _defaultTarget.SignalOneAsyncMessageCompleted(boundingCountChange: -1);
181                 return;
182             }
183             else if (task.IsCompleted)
184             {
185                 AsyncCompleteProcessMessageWithTask(task);
186             }
187             else
188             {
189                 // Otherwise, join with the asynchronous operation when it completes.
190                 task.ContinueWith((completed, state) =>
191                 {
192                     ((ActionBlock<TInput>)state).AsyncCompleteProcessMessageWithTask(completed);
193                 }, this, CancellationToken.None, Common.GetContinuationOptions(TaskContinuationOptions.ExecuteSynchronously), TaskScheduler.Default);
194             }
195         }
196
197         /// <summary>Completes the processing of an asynchronous message.</summary>
198         /// <param name="completed">The completed task.</param>
199         private void AsyncCompleteProcessMessageWithTask(Task completed)
200         {
201             Contract.Requires(completed != null, "Need completed task for processing");
202             Contract.Requires(completed.IsCompleted, "The task to be processed must be completed by now.");
203
204             // If the task faulted, store its errors. We must add the exception before declining
205             // and signaling completion, as the exception is part of the operation, and the completion conditions
206             // depend on this.
207             if (completed.IsFaulted)
208             {
209                 _defaultTarget.Complete(completed.Exception, dropPendingMessages: true, storeExceptionEvenIfAlreadyCompleting: true, unwrapInnerExceptions: true);
210             }
211
212             // Regardless of faults, note that we're done processing.  There are
213             // no outputs to keep track of for action block, so we always decrement 
214             // the bounding count here (the callee will handle checking whether
215             // we're actually in a bounded mode).
216             _defaultTarget.SignalOneAsyncMessageCompleted(boundingCountChange: -1);
217         }
218
219         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
220         public void Complete()
221         {
222             if (_defaultTarget != null)
223             {
224                 _defaultTarget.Complete(exception: null, dropPendingMessages: false);
225             }
226             else
227             {
228                 _spscTarget.Complete(exception: null);
229             }
230         }
231
232         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
233         void IDataflowBlock.Fault(Exception exception)
234         {
235             if (exception == null) throw new ArgumentNullException("exception");
236             Contract.EndContractBlock();
237
238             if (_defaultTarget != null)
239             {
240                 _defaultTarget.Complete(exception, dropPendingMessages: true);
241             }
242             else
243             {
244                 _spscTarget.Complete(exception);
245             }
246         }
247
248         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
249         public Task Completion
250         {
251             get { return _defaultTarget != null ? _defaultTarget.Completion : _spscTarget.Completion; }
252         }
253
254         /// <summary>Posts an item to the <see cref="T:System.Threading.Tasks.Dataflow.ITargetBlock`1"/>.</summary>
255         /// <param name="item">The item being offered to the target.</param>
256         /// <returns>true if the item was accepted by the target block; otherwise, false.</returns>
257         /// <remarks>
258         /// This method will return once the target block has decided to accept or decline the item,
259         /// but unless otherwise dictated by special semantics of the target block, it does not wait
260         /// for the item to actually be processed (for example, <see cref="T:System.Threading.Tasks.Dataflow.ActionBlock`1"/>
261         /// will return from Post as soon as it has stored the posted item into its input queue).  From the perspective
262         /// of the block's processing, Post is asynchronous. For target blocks that support postponing offered messages, 
263         /// or for blocks that may do more processing in their Post implementation, consider using
264         ///  <see cref="T:System.Threading.Tasks.Dataflow.DataflowBlock.SendAsync">SendAsync</see>, 
265         /// which will return immediately and will enable the target to postpone the posted message and later consume it 
266         /// after SendAsync returns.
267         /// </remarks>
268         [MethodImpl(MethodImplOptions.AggressiveInlining)]
269         public bool Post(TInput item)
270         {
271             // Even though this method is available with the exact same functionality as an extension method
272             // on ITargetBlock, using that extension method goes through an interface call on ITargetBlock,
273             // which for very high-throughput scenarios shows up as noticeable overhead on certain architectures.  
274             // We can eliminate that call for direct ActionBlock usage by providing the same method as an instance method.
275
276             return _defaultTarget != null ?
277                 _defaultTarget.OfferMessage(Common.SingleMessageHeader, item, null, false) == DataflowMessageStatus.Accepted :
278                 _spscTarget.Post(item);
279         }
280
281         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="OfferMessage"]/*' />
282         DataflowMessageStatus ITargetBlock<TInput>.OfferMessage(DataflowMessageHeader messageHeader, TInput messageValue, ISourceBlock<TInput> source, Boolean consumeToAccept)
283         {
284             return _defaultTarget != null ?
285                 _defaultTarget.OfferMessage(messageHeader, messageValue, source, consumeToAccept) :
286                 _spscTarget.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
287         }
288
289         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="InputCount"]/*' />
290         public int InputCount
291         {
292             get { return _defaultTarget != null ? _defaultTarget.InputCount : _spscTarget.InputCount; }
293         }
294
295         /// <summary>Gets the number of messages waiting to be processed. This must only be used from the debugger.</summary>
296         private int InputCountForDebugger
297         {
298             get { return _defaultTarget != null ? _defaultTarget.GetDebuggingInformation().InputCount : _spscTarget.InputCount; }
299         }
300
301         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="ToString"]/*' />
302         public override string ToString()
303         {
304             return Common.GetNameForDebugger(this, _defaultTarget != null ? _defaultTarget.DataflowBlockOptions : _spscTarget.DataflowBlockOptions);
305         }
306
307         /// <summary>The data to display in the debugger display attribute.</summary>
308         [SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
309         private object DebuggerDisplayContent
310         {
311             get
312             {
313                 return string.Format("{0}, InputCount={1}",
314                     Common.GetNameForDebugger(this, _defaultTarget != null ? _defaultTarget.DataflowBlockOptions : _spscTarget.DataflowBlockOptions),
315                     InputCountForDebugger);
316             }
317         }
318         /// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
319         object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } }
320
321         /// <summary>Provides a debugger type proxy for the Call.</summary>
322         private sealed class DebugView
323         {
324             /// <summary>The action block being viewed.</summary>
325             private readonly ActionBlock<TInput> _actionBlock;
326             /// <summary>The action block's default target being viewed.</summary>
327             private readonly TargetCore<TInput>.DebuggingInformation _defaultDebugInfo;
328             /// <summary>The action block's SPSC target being viewed.</summary>
329             private readonly SpscTargetCore<TInput>.DebuggingInformation _spscDebugInfo;
330
331             /// <summary>Initializes the debug view.</summary>
332             /// <param name="actionBlock">The target being debugged.</param>
333             public DebugView(ActionBlock<TInput> actionBlock)
334             {
335                 Contract.Requires(actionBlock != null, "Need a block with which to construct the debug view.");
336                 _actionBlock = actionBlock;
337                 if (_actionBlock._defaultTarget != null)
338                 {
339                     _defaultDebugInfo = actionBlock._defaultTarget.GetDebuggingInformation();
340                 }
341                 else
342                 {
343                     _spscDebugInfo = actionBlock._spscTarget.GetDebuggingInformation();
344                 }
345             }
346
347             /// <summary>Gets the messages waiting to be processed.</summary>
348             public IEnumerable<TInput> InputQueue
349             {
350                 get { return _defaultDebugInfo != null ? _defaultDebugInfo.InputQueue : _spscDebugInfo.InputQueue; }
351             }
352             /// <summary>Gets any postponed messages.</summary>
353             public QueuedMap<ISourceBlock<TInput>, DataflowMessageHeader> PostponedMessages
354             {
355                 get { return _defaultDebugInfo != null ? _defaultDebugInfo.PostponedMessages : null; }
356             }
357
358             /// <summary>Gets the number of outstanding input operations.</summary>
359             public Int32 CurrentDegreeOfParallelism
360             {
361                 get { return _defaultDebugInfo != null ? _defaultDebugInfo.CurrentDegreeOfParallelism : _spscDebugInfo.CurrentDegreeOfParallelism; }
362             }
363
364             /// <summary>Gets the ExecutionDataflowBlockOptions used to configure this block.</summary>
365             public ExecutionDataflowBlockOptions DataflowBlockOptions
366             {
367                 get { return _defaultDebugInfo != null ? _defaultDebugInfo.DataflowBlockOptions : _spscDebugInfo.DataflowBlockOptions; }
368             }
369             /// <summary>Gets whether the block is declining further messages.</summary>
370             public bool IsDecliningPermanently
371             {
372                 get { return _defaultDebugInfo != null ? _defaultDebugInfo.IsDecliningPermanently : _spscDebugInfo.IsDecliningPermanently; }
373             }
374             /// <summary>Gets whether the block is completed.</summary>
375             public bool IsCompleted
376             {
377                 get { return _defaultDebugInfo != null ? _defaultDebugInfo.IsCompleted : _spscDebugInfo.IsCompleted; }
378             }
379             /// <summary>Gets the block's Id.</summary>
380             public int Id { get { return Common.GetBlockId(_actionBlock); } }
381         }
382     }
383 }