1 // Copyright (c) Microsoft. All rights reserved.
2 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
4 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
6 // DataflowBlockOptions.cs
9 // DataflowBlockOptions types for configuring dataflow blocks
11 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
14 using System.Diagnostics;
15 using System.Threading.Tasks;
17 namespace System.Threading.Tasks.Dataflow
20 /// Provides options used to configure the processing performed by dataflow blocks.
23 /// <see cref="DataflowBlockOptions"/> is mutable and can be configured through its properties.
24 /// When specific configuration options are not set, the following defaults are used:
25 /// <list type="table">
27 /// <term>Options</term>
28 /// <description>Default</description>
31 /// <term>TaskScheduler</term>
32 /// <description><see cref="System.Threading.Tasks.TaskScheduler.Default"/></description>
35 /// <term>MaxMessagesPerTask</term>
36 /// <description>DataflowBlockOptions.Unbounded (-1)</description>
39 /// <term>CancellationToken</term>
40 /// <description><see cref="System.Threading.CancellationToken.None"/></description>
43 /// <term>BoundedCapacity</term>
44 /// <description>DataflowBlockOptions.Unbounded (-1)</description>
47 /// <term>NameFormat</term>
48 /// <description>"{0} Id={1}"</description>
51 /// Dataflow blocks capture the state of the options at their construction. Subsequent changes
52 /// to the provided <see cref="DataflowBlockOptions"/> instance should not affect the behavior
53 /// of a dataflow block.
55 [DebuggerDisplay("TaskScheduler = {TaskScheduler}, MaxMessagesPerTask = {MaxMessagesPerTask}, BoundedCapacity = {BoundedCapacity}")]
56 public class DataflowBlockOptions
59 /// A constant used to specify an unlimited quantity for <see cref="DataflowBlockOptions"/> members
60 /// that provide an upper bound. This field is constant.
62 public const Int32 Unbounded = -1;
64 /// <summary>The scheduler to use for scheduling tasks to process messages.</summary>
65 private TaskScheduler _taskScheduler = TaskScheduler.Default;
66 /// <summary>The cancellation token to monitor for cancellation requests.</summary>
67 private CancellationToken _cancellationToken = CancellationToken.None;
68 /// <summary>The maximum number of messages that may be processed per task.</summary>
69 private Int32 _maxMessagesPerTask = Unbounded;
70 /// <summary>The maximum number of messages that may be buffered by the block.</summary>
71 private Int32 _boundedCapacity = Unbounded;
72 /// <summary>The name format to use for creating a name for a block.</summary>
73 private string _nameFormat = "{0} Id={1}"; // see NameFormat property for a description of format items
75 /// <summary>A default instance of <see cref="DataflowBlockOptions"/>.</summary>
77 /// Do not change the values of this instance. It is shared by all of our blocks when no options are provided by the user.
79 internal static readonly DataflowBlockOptions Default = new DataflowBlockOptions();
81 /// <summary>Returns this <see cref="DataflowBlockOptions"/> instance if it's the default instance or else a cloned instance.</summary>
82 /// <returns>An instance of the options that may be cached by the block.</returns>
83 internal DataflowBlockOptions DefaultOrClone()
85 return (this == Default) ?
87 new DataflowBlockOptions
89 TaskScheduler = this.TaskScheduler,
90 CancellationToken = this.CancellationToken,
91 MaxMessagesPerTask = this.MaxMessagesPerTask,
92 BoundedCapacity = this.BoundedCapacity,
93 NameFormat = this.NameFormat
97 /// <summary>Initializes the <see cref="DataflowBlockOptions"/>.</summary>
98 public DataflowBlockOptions() { }
100 /// <summary>Gets or sets the <see cref="System.Threading.Tasks.TaskScheduler"/> to use for scheduling tasks.</summary>
101 public TaskScheduler TaskScheduler
103 get { return _taskScheduler; }
106 Debug.Assert(this != Default, "Default instance is supposed to be immutable.");
107 if (value == null) throw new ArgumentNullException("value");
108 _taskScheduler = value;
112 /// <summary>Gets or sets the <see cref="System.Threading.CancellationToken"/> to monitor for cancellation requests.</summary>
113 public CancellationToken CancellationToken
115 get { return _cancellationToken; }
118 Debug.Assert(this != Default, "Default instance is supposed to be immutable.");
119 _cancellationToken = value;
123 /// <summary>Gets or sets the maximum number of messages that may be processed per task.</summary>
124 public Int32 MaxMessagesPerTask
126 get { return _maxMessagesPerTask; }
129 Debug.Assert(this != Default, "Default instance is supposed to be immutable.");
130 if (value < 1 && value != Unbounded) throw new ArgumentOutOfRangeException("value");
131 _maxMessagesPerTask = value;
135 /// <summary>Gets a MaxMessagesPerTask value that may be used for comparison purposes.</summary>
136 /// <returns>The maximum value, usable for comparison purposes.</returns>
137 /// <remarks>Unlike MaxMessagesPerTask, this property will always return a positive value.</remarks>
138 internal Int32 ActualMaxMessagesPerTask
140 get { return (_maxMessagesPerTask == Unbounded) ? Int32.MaxValue : _maxMessagesPerTask; }
143 /// <summary>Gets or sets the maximum number of messages that may be buffered by the block.</summary>
144 public Int32 BoundedCapacity
146 get { return _boundedCapacity; }
149 Debug.Assert(this != Default, "Default instance is supposed to be immutable.");
150 if (value < 1 && value != Unbounded) throw new ArgumentOutOfRangeException("value");
151 _boundedCapacity = value;
156 /// Gets or sets the format string to use when a block is queried for its name.
159 /// The name format may contain up to two format items. {0} will be substituted
160 /// with the block's name. {1} will be substituted with the block's Id, as is
161 /// returned from the block's Completion.Id property.
163 public string NameFormat
165 get { return _nameFormat; }
168 Debug.Assert(this != Default, "Default instance is supposed to be immutable.");
169 if (value == null) throw new ArgumentNullException("value");
176 /// Provides options used to configure the processing performed by dataflow blocks that
177 /// process each message through the invocation of a user-provided delegate, blocks such
178 /// as <see cref="ActionBlock{T}"/> and <see cref="TransformBlock{TInput,TOutput}"/>.
181 /// <see cref="ExecutionDataflowBlockOptions"/> is mutable and can be configured through its properties.
182 /// When specific configuration options are not set, the following defaults are used:
183 /// <list type="table">
185 /// <term>Options</term>
186 /// <description>Default</description>
189 /// <term>TaskScheduler</term>
190 /// <description><see cref="System.Threading.Tasks.TaskScheduler.Default"/></description>
193 /// <term>CancellationToken</term>
194 /// <description><see cref="System.Threading.CancellationToken.None"/></description>
197 /// <term>MaxMessagesPerTask</term>
198 /// <description>DataflowBlockOptions.Unbounded (-1)</description>
201 /// <term>BoundedCapacity</term>
202 /// <description>DataflowBlockOptions.Unbounded (-1)</description>
205 /// <term>NameFormat</term>
206 /// <description>"{0} Id={1}"</description>
209 /// <term>MaxDegreeOfParallelism</term>
210 /// <description>1</description>
213 /// <term>SingleProducerConstrained</term>
214 /// <description>false</description>
217 /// Dataflow block captures the state of the options at their construction. Subsequent changes
218 /// to the provided <see cref="ExecutionDataflowBlockOptions"/> instance should not affect the behavior
219 /// of a dataflow block.
221 [DebuggerDisplay("TaskScheduler = {TaskScheduler}, MaxMessagesPerTask = {MaxMessagesPerTask}, BoundedCapacity = {BoundedCapacity}, MaxDegreeOfParallelism = {MaxDegreeOfParallelism}")]
222 public class ExecutionDataflowBlockOptions : DataflowBlockOptions
224 /// <summary>A default instance of <see cref="DataflowBlockOptions"/>.</summary>
226 /// Do not change the values of this instance. It is shared by all of our blocks when no options are provided by the user.
228 internal new static readonly ExecutionDataflowBlockOptions Default = new ExecutionDataflowBlockOptions();
230 /// <summary>Returns this <see cref="ExecutionDataflowBlockOptions"/> instance if it's the default instance or else a cloned instance.</summary>
231 /// <returns>An instance of the options that may be cached by the block.</returns>
232 internal new ExecutionDataflowBlockOptions DefaultOrClone()
234 return (this == Default) ?
236 new ExecutionDataflowBlockOptions
238 TaskScheduler = this.TaskScheduler,
239 CancellationToken = this.CancellationToken,
240 MaxMessagesPerTask = this.MaxMessagesPerTask,
241 BoundedCapacity = this.BoundedCapacity,
242 NameFormat = this.NameFormat,
243 MaxDegreeOfParallelism = this.MaxDegreeOfParallelism,
244 SingleProducerConstrained = this.SingleProducerConstrained
248 /// <summary>The maximum number of tasks that may be used concurrently to process messages.</summary>
249 private Int32 _maxDegreeOfParallelism = 1;
250 /// <summary>Whether the code using this block will only ever have a single producer accessing the block at any given time.</summary>
251 private Boolean _singleProducerConstrained = false;
253 /// <summary>Initializes the <see cref="ExecutionDataflowBlockOptions"/>.</summary>
254 public ExecutionDataflowBlockOptions() { }
256 /// <summary>Gets the maximum number of messages that may be processed by the block concurrently.</summary>
257 public Int32 MaxDegreeOfParallelism
259 get { return _maxDegreeOfParallelism; }
262 Debug.Assert(this != Default, "Default instance is supposed to be immutable.");
263 if (value < 1 && value != Unbounded) throw new ArgumentOutOfRangeException("value");
264 _maxDegreeOfParallelism = value;
269 /// Gets whether code using the dataflow block is constrained to one producer at a time.
272 /// This property defaults to false, such that the block may be used by multiple
273 /// producers concurrently. This property should only be set to true if the code
274 /// using the block can guarantee that it will only ever be used by one producer
275 /// (e.g. a source linked to the block) at a time, meaning that methods like Post,
276 /// Complete, Fault, and OfferMessage will never be called concurrently. Some blocks
277 /// may choose to capitalize on the knowledge that there will only be one producer at a time
278 /// in order to provide better performance.
280 public Boolean SingleProducerConstrained
282 get { return _singleProducerConstrained; }
285 Debug.Assert(this != Default, "Default instance is supposed to be immutable.");
286 _singleProducerConstrained = value;
290 /// <summary>Gets a MaxDegreeOfParallelism value that may be used for comparison purposes.</summary>
291 /// <returns>The maximum value, usable for comparison purposes.</returns>
292 /// <remarks>Unlike MaxDegreeOfParallelism, this property will always return a positive value.</remarks>
293 internal Int32 ActualMaxDegreeOfParallelism
295 get { return (_maxDegreeOfParallelism == Unbounded) ? Int32.MaxValue : _maxDegreeOfParallelism; }
298 /// <summary>Gets whether these dataflow block options allow for parallel execution.</summary>
299 internal Boolean SupportsParallelExecution { get { return _maxDegreeOfParallelism == Unbounded || _maxDegreeOfParallelism > 1; } }
303 /// Provides options used to configure the processing performed by dataflow blocks that
304 /// group together multiple messages, blocks such as <see cref="JoinBlock{T1,T2}"/> and
305 /// <see cref="BatchBlock{T}"/>.
308 /// <see cref="GroupingDataflowBlockOptions"/> is mutable and can be configured through its properties.
309 /// When specific configuration options are not set, the following defaults are used:
310 /// <list type="table">
312 /// <term>Options</term>
313 /// <description>Default</description>
316 /// <term>TaskScheduler</term>
317 /// <description><see cref="System.Threading.Tasks.TaskScheduler.Default"/></description>
320 /// <term>CancellationToken</term>
321 /// <description><see cref="System.Threading.CancellationToken.None"/></description>
324 /// <term>MaxMessagesPerTask</term>
325 /// <description>DataflowBlockOptions.Unbounded (-1)</description>
328 /// <term>BoundedCapacity</term>
329 /// <description>DataflowBlockOptions.Unbounded (-1)</description>
332 /// <term>NameFormat</term>
333 /// <description>"{0} Id={1}"</description>
336 /// <term>MaxNumberOfGroups</term>
337 /// <description>GroupingDataflowBlockOptions.Unbounded (-1)</description>
340 /// <term>Greedy</term>
341 /// <description>true</description>
344 /// Dataflow block capture the state of the options at their construction. Subsequent changes
345 /// to the provided <see cref="GroupingDataflowBlockOptions"/> instance should not affect the behavior
346 /// of a dataflow block.
348 [DebuggerDisplay("TaskScheduler = {TaskScheduler}, MaxMessagesPerTask = {MaxMessagesPerTask}, BoundedCapacity = {BoundedCapacity}, Greedy = {Greedy}, MaxNumberOfGroups = {MaxNumberOfGroups}")]
349 public class GroupingDataflowBlockOptions : DataflowBlockOptions
351 /// <summary>A default instance of <see cref="DataflowBlockOptions"/>.</summary>
353 /// Do not change the values of this instance. It is shared by all of our blocks when no options are provided by the user.
355 internal new static readonly GroupingDataflowBlockOptions Default = new GroupingDataflowBlockOptions();
357 /// <summary>Returns this <see cref="GroupingDataflowBlockOptions"/> instance if it's the default instance or else a cloned instance.</summary>
358 /// <returns>An instance of the options that may be cached by the block.</returns>
359 internal new GroupingDataflowBlockOptions DefaultOrClone()
361 return (this == Default) ?
363 new GroupingDataflowBlockOptions
365 TaskScheduler = this.TaskScheduler,
366 CancellationToken = this.CancellationToken,
367 MaxMessagesPerTask = this.MaxMessagesPerTask,
368 BoundedCapacity = this.BoundedCapacity,
369 NameFormat = this.NameFormat,
370 Greedy = this.Greedy,
371 MaxNumberOfGroups = this.MaxNumberOfGroups
375 /// <summary>Whether the block should greedily consume offered messages.</summary>
376 private Boolean _greedy = true;
377 /// <summary>The maximum number of groups that should be generated by the block.</summary>
378 private Int64 _maxNumberOfGroups = Unbounded;
380 /// <summary>Initializes the <see cref="GroupingDataflowBlockOptions"/>.</summary>
381 public GroupingDataflowBlockOptions() { }
383 /// <summary>Gets or sets the Boolean value to use to determine whether to greedily consume offered messages.</summary>
384 public Boolean Greedy
386 get { return _greedy; }
389 Debug.Assert(this != Default, "Default instance is supposed to be immutable.");
394 /// <summary>Gets or sets the maximum number of groups that should be generated by the block.</summary>
395 public Int64 MaxNumberOfGroups
397 get { return _maxNumberOfGroups; }
400 Debug.Assert(this != Default, "Default instance is supposed to be immutable.");
401 if (value <= 0 && value != Unbounded) throw new ArgumentOutOfRangeException("value");
402 _maxNumberOfGroups = value;
406 /// <summary>Gets a MaxNumberOfGroups value that may be used for comparison purposes.</summary>
407 /// <returns>The maximum value, usable for comparison purposes.</returns>
408 /// <remarks>Unlike MaxNumberOfGroups, this property will always return a positive value.</remarks>
409 internal Int64 ActualMaxNumberOfGroups
411 get { return (_maxNumberOfGroups == Unbounded) ? Int64.MaxValue : _maxNumberOfGroups; }