1 // Copyright (c) Microsoft. All rights reserved.
2 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
4 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
9 // A propagator block that groups individual messages of multiple types
10 // into tuples of arrays of those messages.
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
14 using System.Collections.Generic;
15 using System.Diagnostics;
16 using System.Diagnostics.CodeAnalysis;
17 using System.Diagnostics.Contracts;
18 using System.Threading.Tasks.Dataflow.Internal;
20 namespace System.Threading.Tasks.Dataflow
/// <summary>
/// Provides a dataflow block that batches a specified number of inputs of potentially differing types
/// provided to one or more of its targets.
/// </summary>
26 /// <typeparam name="T1">Specifies the type of data accepted by the block's first target.</typeparam>
27 /// <typeparam name="T2">Specifies the type of data accepted by the block's second target.</typeparam>
28 [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
29 [DebuggerTypeProxy(typeof(BatchedJoinBlock<,>.DebugView))]
30 public sealed class BatchedJoinBlock<T1, T2> : IReceivableSourceBlock<Tuple<IList<T1>, IList<T2>>>, IDebuggerDisplay
/// <summary>The size of the batches generated by this BatchedJoin. The constructor rejects values below 1.</summary>
private readonly int _batchSize;
/// <summary>State shared among the targets.</summary>
/// <remarks>Its <c>_incomingLock</c> and <c>_decliningPermanently</c> members are also used by this block's Fault implementation.</remarks>
private readonly BatchedJoinBlockTargetSharedResources _sharedResources;
/// <summary>The target providing inputs of type T1. Returned by <see cref="Target1"/>.</summary>
private readonly BatchedJoinBlockTarget<T1> _target1;
/// <summary>The target providing inputs of type T2. Returned by <see cref="Target2"/>.</summary>
private readonly BatchedJoinBlockTarget<T2> _target2;
/// <summary>The source side. The block's linking, receiving, and completion members forward to it.</summary>
private readonly SourceCore<Tuple<IList<T1>, IList<T2>>> _source;
/// <summary>Initializes this <see cref="BatchedJoinBlock{T1,T2}"/> with the specified configuration.</summary>
/// <remarks>Forwards to the two-argument constructor, passing <c>GroupingDataflowBlockOptions.Default</c> as the options.</remarks>
/// <param name="batchSize">The number of items to group into a batch.</param>
/// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="batchSize"/> must be positive.</exception>
public BatchedJoinBlock(Int32 batchSize) :
this(batchSize, GroupingDataflowBlockOptions.Default)
/// <summary>Initializes this <see cref="BatchedJoinBlock{T1,T2}"/> with the specified configuration.</summary>
/// <param name="batchSize">The number of items to group into a batch.</param>
/// <param name="dataflowBlockOptions">The options with which to configure this <see cref="BatchedJoinBlock{T1,T2}"/>.</param>
/// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="batchSize"/> must be positive.</exception>
/// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
/// <exception cref="System.ArgumentException">
/// The <paramref name="dataflowBlockOptions"/> has Greedy set to false, or has a BoundedCapacity other than
/// <c>DataflowBlockOptions.Unbounded</c>; neither configuration is supported by this block.
/// </exception>
public BatchedJoinBlock(Int32 batchSize, GroupingDataflowBlockOptions dataflowBlockOptions)
// Validate arguments. Non-greedy mode and bounded capacity are rejected with separate messages.
if (batchSize < 1) throw new ArgumentOutOfRangeException("batchSize", SR.ArgumentOutOfRange_GenericPositive);
if (dataflowBlockOptions == null) throw new ArgumentNullException("dataflowBlockOptions");
if (!dataflowBlockOptions.Greedy) throw new ArgumentException(SR.Argument_NonGreedyNotSupported, "dataflowBlockOptions");
if (dataflowBlockOptions.BoundedCapacity != DataflowBlockOptions.Unbounded) throw new ArgumentException(SR.Argument_BoundedCapacityNotSupported, "dataflowBlockOptions");
Contract.EndContractBlock();
// Store the batch size; from here on work with the options object returned by DefaultOrClone().
// NOTE(review): presumably DefaultOrClone() returns a private copy so later caller mutations are not observed - confirm.
_batchSize = batchSize;
dataflowBlockOptions = dataflowBlockOptions.DefaultOrClone();
// Configure the source
// The callback handed to the source core completes each target of the owning block.
_source = new SourceCore<Tuple<IList<T1>, IList<T2>>>(
this, dataflowBlockOptions, owningSource => ((BatchedJoinBlock<T1, T2>)owningSource).CompleteEachTarget());
// The action to run when a batch should be created. This is typically called
// when we have a full batch, but it will also be called when we're done receiving
// messages, and thus when there may be a few stragglers we need to make a batch out of.
Action createBatchAction = () =>
// Only publish a tuple when at least one target has buffered messages, so an all-empty batch is never added.
if (_target1.Count > 0 || _target2.Count > 0)
_source.AddMessage(Tuple.Create(_target1.GetAndEmptyMessages(), _target2.GetAndEmptyMessages()));
// Configure the targets
_sharedResources = new BatchedJoinBlockTargetSharedResources(
batchSize, dataflowBlockOptions,
// Both targets are built over the same shared-resources instance.
_target1 = new BatchedJoinBlockTarget<T1>(_sharedResources);
_target2 = new BatchedJoinBlockTarget<T2>(_sharedResources);
// It is possible that the source half may fault on its own, e.g. due to a task scheduler exception.
// In those cases we need to fault the target half to drop its buffered messages and to release its
// reservations. This should not create an infinite loop, because all our implementations are designed
// to handle multiple completion requests and to carry over only one.
// The continuation is registered with OnlyOnFaulted and receives the block through the state argument.
_source.Completion.ContinueWith((completed, state) =>
var thisBlock = ((BatchedJoinBlock<T1, T2>)state) as IDataflowBlock;
Debug.Assert(completed.IsFaulted, "The source must be faulted in order to trigger a target completion.");
thisBlock.Fault(completed.Exception);
}, this, CancellationToken.None, Common.GetContinuationOptions() | TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default);
// Handle async cancellation requests by declining on the target
Common.WireCancellationToComplete(
dataflowBlockOptions.CancellationToken, _source.Completion, state => ((BatchedJoinBlock<T1, T2>)state).CompleteEachTarget(), this);
// Report block creation to ETW, but only when the provider is enabled.
DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
if (etwLog.IsEnabled())
etwLog.DataflowBlockCreated(this, dataflowBlockOptions);
/// <summary>Gets the size of the batches generated by this <see cref="BatchedJoinBlock{T1,T2}"/>.</summary>
/// <remarks>This is the batch size that was supplied to the constructor.</remarks>
public Int32 BatchSize { get { return _batchSize; } }
/// <summary>Gets a target that may be used to offer messages of the first type.</summary>
/// <remarks>The same target instance is returned on every call.</remarks>
public ITargetBlock<T1> Target1 { get { return _target1; } }
/// <summary>Gets a target that may be used to offer messages of the second type.</summary>
/// <remarks>The same target instance is returned on every call.</remarks>
public ITargetBlock<T2> Target2 { get { return _target2; } }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="LinkTo"]/*' />
/// <remarks>Forwards to the block's source core and returns its result unchanged.</remarks>
[SuppressMessage("Microsoft.Design", "CA1006:DoNotNestGenericTypesInMemberSignatures")]
public IDisposable LinkTo(ITargetBlock<Tuple<IList<T1>, IList<T2>>> target, DataflowLinkOptions linkOptions)
return _source.LinkTo(target, linkOptions);
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceive"]/*' />
/// <remarks>Forwards to the block's source core, which supplies both the return value and the out item.</remarks>
[SuppressMessage("Microsoft.Design", "CA1006:DoNotNestGenericTypesInMemberSignatures")]
public Boolean TryReceive(Predicate<Tuple<IList<T1>, IList<T2>>> filter, out Tuple<IList<T1>, IList<T2>> item)
return _source.TryReceive(filter, out item);
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceiveAll"]/*' />
/// <remarks>Forwards to the block's source core.</remarks>
[SuppressMessage("Microsoft.Design", "CA1006:DoNotNestGenericTypesInMemberSignatures")]
public bool TryReceiveAll(out IList<Tuple<IList<T1>, IList<T2>>> items) { return _source.TryReceiveAll(out items); }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="OutputCount"]/*' />
/// <remarks>Reads the count from the block's source core.</remarks>
public int OutputCount { get { return _source.OutputCount; } }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
/// <remarks>This is the completion task of the block's source core.</remarks>
public Task Completion { get { return _source.Completion; } }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
public void Complete()
// Debug-only sanity checks that both target fields have been assigned.
Debug.Assert(_target1 != null, "_target1 not initialized");
Debug.Assert(_target2 != null, "_target2 not initialized");
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
/// <exception cref="System.ArgumentNullException">The <paramref name="exception"/> is null.</exception>
void IDataflowBlock.Fault(Exception exception)
// A null exception is rejected before any state is touched.
if (exception == null) throw new ArgumentNullException("exception");
Contract.EndContractBlock();
// Debug-only sanity checks that the fields used below have been assigned.
Debug.Assert(_sharedResources != null, "_sharedResources not initialized");
Debug.Assert(_sharedResources._incomingLock != null, "_sharedResources._incomingLock not initialized");
Debug.Assert(_source != null, "_source not initialized");
// Under the shared incoming lock, hand the exception to the source unless the
// shared state is already declining permanently, in which case it is not recorded.
lock (_sharedResources._incomingLock)
if (!_sharedResources._decliningPermanently) _source.AddException(exception);
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ConsumeMessage"]/*' />
/// <remarks>Explicit interface implementation that forwards to the block's source core.</remarks>
Tuple<IList<T1>, IList<T2>> ISourceBlock<Tuple<IList<T1>, IList<T2>>>.ConsumeMessage(
DataflowMessageHeader messageHeader, ITargetBlock<Tuple<IList<T1>, IList<T2>>> target, out Boolean messageConsumed)
return _source.ConsumeMessage(messageHeader, target, out messageConsumed);
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReserveMessage"]/*' />
/// <remarks>Explicit interface implementation that forwards to the block's source core.</remarks>
bool ISourceBlock<Tuple<IList<T1>, IList<T2>>>.ReserveMessage(
DataflowMessageHeader messageHeader, ITargetBlock<Tuple<IList<T1>, IList<T2>>> target)
return _source.ReserveMessage(messageHeader, target);
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReleaseReservation"]/*' />
/// <remarks>Explicit interface implementation that forwards to the block's source core.</remarks>
void ISourceBlock<Tuple<IList<T1>, IList<T2>>>.ReleaseReservation(
DataflowMessageHeader messageHeader, ITargetBlock<Tuple<IList<T1>, IList<T2>>> target)
_source.ReleaseReservation(messageHeader, target);
/// <summary>
/// Invokes Complete on each target
/// </summary>
/// <remarks>
/// The constructor passes this method as the callback given to the source core and as the
/// action wired to the options' cancellation token.
/// </remarks>
private void CompleteEachTarget()
/// <summary>Gets the number of messages waiting to be processed. This must only be used from the debugger as it avoids taking necessary locks.</summary>
/// <remarks>The value is read from the source core's debugging information rather than from <see cref="OutputCount"/>.</remarks>
private int OutputCountForDebugger { get { return _source.GetDebuggingInformation().OutputCount; } }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="ToString"]/*' />
/// <remarks>Returns the name produced by <c>Common.GetNameForDebugger</c> for this block and the source's options.</remarks>
public override string ToString() { return Common.GetNameForDebugger(this, _source.DataflowBlockOptions); }
/// <summary>The data to display in the debugger display attribute.</summary>
/// <remarks>
/// The text starts with the same name that <see cref="ToString"/> returns, followed by the batch size and the
/// output count. The output count comes from <see cref="OutputCountForDebugger"/>, the debugger-only accessor.
/// </remarks>
[SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
private object DebuggerDisplayContent
return string.Format("{0}, BatchSize={1}, OutputCount={2}",
Common.GetNameForDebugger(this, _source.DataflowBlockOptions),
OutputCountForDebugger);
/// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
/// <remarks>Returns <see cref="DebuggerDisplayContent"/>.</remarks>
object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } }
/// <summary>Provides a debugger type proxy for the <see cref="BatchedJoinBlock{T1,T2}"/>.</summary>
private sealed class DebugView
/// <summary>The block being viewed.</summary>
private readonly BatchedJoinBlock<T1, T2> _batchedJoinBlock;
/// <summary>The source half of the block being viewed.</summary>
private readonly SourceCore<Tuple<IList<T1>, IList<T2>>>.DebuggingInformation _sourceDebuggingInformation;
/// <summary>Initializes the debug view.</summary>
/// <param name="batchedJoinBlock">The batched join being viewed.</param>
public DebugView(BatchedJoinBlock<T1, T2> batchedJoinBlock)
Contract.Requires(batchedJoinBlock != null, "Need a block with which to construct the debug view.");
_batchedJoinBlock = batchedJoinBlock;
// The source's debugging information is fetched once here and reused by the properties below.
_sourceDebuggingInformation = batchedJoinBlock._source.GetDebuggingInformation();
/// <summary>Gets the messages waiting to be received.</summary>
public IEnumerable<Tuple<IList<T1>, IList<T2>>> OutputQueue { get { return _sourceDebuggingInformation.OutputQueue; } }
/// <summary>Gets the number of batches created.</summary>
public long BatchesCreated { get { return _batchedJoinBlock._sharedResources._batchesCreated; } }
/// <summary>Gets the number of items remaining to form a batch.</summary>
public int RemainingItemsForBatch { get { return _batchedJoinBlock._sharedResources._remainingItemsInBatch; } }
/// <summary>Gets the size of the batches generated by this BatchedJoin.</summary>
public Int32 BatchSize { get { return _batchedJoinBlock._batchSize; } }
/// <summary>Gets the first target.</summary>
public ITargetBlock<T1> Target1 { get { return _batchedJoinBlock._target1; } }
/// <summary>Gets the second target.</summary>
public ITargetBlock<T2> Target2 { get { return _batchedJoinBlock._target2; } }
/// <summary>Gets the task being used for output processing.</summary>
public Task TaskForOutputProcessing { get { return _sourceDebuggingInformation.TaskForOutputProcessing; } }
/// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
/// <remarks>The source's options are cast to <see cref="GroupingDataflowBlockOptions"/>, the type the block's constructor accepts.</remarks>
public GroupingDataflowBlockOptions DataflowBlockOptions { get { return (GroupingDataflowBlockOptions)_sourceDebuggingInformation.DataflowBlockOptions; } }
/// <summary>Gets whether the block is completed.</summary>
public bool IsCompleted { get { return _sourceDebuggingInformation.IsCompleted; } }
/// <summary>Gets the block's Id.</summary>
public int Id { get { return Common.GetBlockId(_batchedJoinBlock); } }
/// <summary>Gets the set of all targets linked from this block.</summary>
public TargetRegistry<Tuple<IList<T1>, IList<T2>>> LinkedTargets { get { return _sourceDebuggingInformation.LinkedTargets; } }
/// <summary>Gets the target that holds a reservation on the next message, if any.</summary>
public ITargetBlock<Tuple<IList<T1>, IList<T2>>> NextMessageReservedFor { get { return _sourceDebuggingInformation.NextMessageReservedFor; } }
/// <summary>
/// Provides a dataflow block that batches a specified number of inputs of potentially differing types
/// provided to one or more of its targets.
/// </summary>
283 /// <typeparam name="T1">Specifies the type of data accepted by the block's first target.</typeparam>
284 /// <typeparam name="T2">Specifies the type of data accepted by the block's second target.</typeparam>
285 /// <typeparam name="T3">Specifies the type of data accepted by the block's third target.</typeparam>
286 [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
287 [DebuggerTypeProxy(typeof(BatchedJoinBlock<,,>.DebugView))]
288 [SuppressMessage("Microsoft.Design", "CA1005:AvoidExcessiveParametersOnGenericTypes")]
289 public sealed class BatchedJoinBlock<T1, T2, T3> : IReceivableSourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>, IDebuggerDisplay
/// <summary>The size of the batches generated by this BatchedJoin. The constructor rejects values below 1.</summary>
private readonly int _batchSize;
/// <summary>State shared among the targets.</summary>
/// <remarks>Its <c>_incomingLock</c> and <c>_decliningPermanently</c> members are also used by this block's Fault implementation.</remarks>
private readonly BatchedJoinBlockTargetSharedResources _sharedResources;
/// <summary>The target providing inputs of type T1. Returned by <see cref="Target1"/>.</summary>
private readonly BatchedJoinBlockTarget<T1> _target1;
/// <summary>The target providing inputs of type T2. Returned by <see cref="Target2"/>.</summary>
private readonly BatchedJoinBlockTarget<T2> _target2;
/// <summary>The target providing inputs of type T3. Returned by <see cref="Target3"/>.</summary>
private readonly BatchedJoinBlockTarget<T3> _target3;
/// <summary>The source side. The block's linking, receiving, and completion members forward to it.</summary>
private readonly SourceCore<Tuple<IList<T1>, IList<T2>, IList<T3>>> _source;
/// <summary>Initializes this <see cref="BatchedJoinBlock{T1,T2,T3}"/> with the specified configuration.</summary>
/// <remarks>Forwards to the two-argument constructor, passing <c>GroupingDataflowBlockOptions.Default</c> as the options.</remarks>
/// <param name="batchSize">The number of items to group into a batch.</param>
/// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="batchSize"/> must be positive.</exception>
public BatchedJoinBlock(Int32 batchSize) :
this(batchSize, GroupingDataflowBlockOptions.Default)
/// <summary>Initializes this <see cref="BatchedJoinBlock{T1,T2,T3}"/> with the specified configuration.</summary>
/// <param name="batchSize">The number of items to group into a batch.</param>
/// <param name="dataflowBlockOptions">The options with which to configure this <see cref="BatchedJoinBlock{T1,T2,T3}"/>.</param>
/// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="batchSize"/> must be positive.</exception>
/// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
/// <exception cref="System.ArgumentException">
/// The <paramref name="dataflowBlockOptions"/> has Greedy set to false, or has a BoundedCapacity other than
/// <c>DataflowBlockOptions.Unbounded</c>; neither configuration is supported by this block.
/// </exception>
public BatchedJoinBlock(Int32 batchSize, GroupingDataflowBlockOptions dataflowBlockOptions)
// Validate arguments
if (batchSize < 1) throw new ArgumentOutOfRangeException("batchSize", SR.ArgumentOutOfRange_GenericPositive);
if (dataflowBlockOptions == null) throw new ArgumentNullException("dataflowBlockOptions");
// The two unsupported configurations are checked separately so that a bounded capacity is reported
// with the bounded-capacity message rather than the non-greedy one, as the two-target block does.
if (!dataflowBlockOptions.Greedy) throw new ArgumentException(SR.Argument_NonGreedyNotSupported, "dataflowBlockOptions");
if (dataflowBlockOptions.BoundedCapacity != DataflowBlockOptions.Unbounded) throw new ArgumentException(SR.Argument_BoundedCapacityNotSupported, "dataflowBlockOptions");
Contract.EndContractBlock();
// Store the batch size; from here on work with the options object returned by DefaultOrClone().
// NOTE(review): presumably DefaultOrClone() returns a private copy so later caller mutations are not observed - confirm.
_batchSize = batchSize;
dataflowBlockOptions = dataflowBlockOptions.DefaultOrClone();
// Configure the source
// The callback handed to the source core completes each target of the owning block.
_source = new SourceCore<Tuple<IList<T1>, IList<T2>, IList<T3>>>(
this, dataflowBlockOptions, owningSource => ((BatchedJoinBlock<T1, T2, T3>)owningSource).CompleteEachTarget());
// The action to run when a batch should be created. This is typically called
// when we have a full batch, but it will also be called when we're done receiving
// messages, and thus when there may be a few stragglers we need to make a batch out of.
Action createBatchAction = () =>
// Only publish a tuple when at least one target has buffered messages, so an all-empty batch is never added.
if (_target1.Count > 0 || _target2.Count > 0 || _target3.Count > 0)
_source.AddMessage(Tuple.Create(_target1.GetAndEmptyMessages(), _target2.GetAndEmptyMessages(), _target3.GetAndEmptyMessages()));
// Configure the targets
_sharedResources = new BatchedJoinBlockTargetSharedResources(
batchSize, dataflowBlockOptions,
_source.AddException,
// All three targets are built over the same shared-resources instance.
_target1 = new BatchedJoinBlockTarget<T1>(_sharedResources);
_target2 = new BatchedJoinBlockTarget<T2>(_sharedResources);
_target3 = new BatchedJoinBlockTarget<T3>(_sharedResources);
// It is possible that the source half may fault on its own, e.g. due to a task scheduler exception.
// In those cases we need to fault the target half to drop its buffered messages and to release its
// reservations. This should not create an infinite loop, because all our implementations are designed
// to handle multiple completion requests and to carry over only one.
// The continuation is registered with OnlyOnFaulted and receives the block through the state argument.
_source.Completion.ContinueWith((completed, state) =>
var thisBlock = ((BatchedJoinBlock<T1, T2, T3>)state) as IDataflowBlock;
Debug.Assert(completed.IsFaulted, "The source must be faulted in order to trigger a target completion.");
thisBlock.Fault(completed.Exception);
}, this, CancellationToken.None, Common.GetContinuationOptions() | TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default);
// Handle async cancellation requests by declining on the target
Common.WireCancellationToComplete(
dataflowBlockOptions.CancellationToken, _source.Completion, state => ((BatchedJoinBlock<T1, T2, T3>)state).CompleteEachTarget(), this);
// Report block creation to ETW, but only when the provider is enabled.
DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
if (etwLog.IsEnabled())
etwLog.DataflowBlockCreated(this, dataflowBlockOptions);
/// <summary>Gets the size of the batches generated by this <see cref="BatchedJoinBlock{T1,T2,T3}"/>.</summary>
/// <remarks>This is the batch size that was supplied to the constructor.</remarks>
public Int32 BatchSize { get { return _batchSize; } }
/// <summary>Gets a target that may be used to offer messages of the first type.</summary>
/// <remarks>The same target instance is returned on every call.</remarks>
public ITargetBlock<T1> Target1 { get { return _target1; } }
/// <summary>Gets a target that may be used to offer messages of the second type.</summary>
/// <remarks>The same target instance is returned on every call.</remarks>
public ITargetBlock<T2> Target2 { get { return _target2; } }
/// <summary>Gets a target that may be used to offer messages of the third type.</summary>
/// <remarks>The same target instance is returned on every call.</remarks>
public ITargetBlock<T3> Target3 { get { return _target3; } }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="LinkTo"]/*' />
/// <remarks>Forwards to the block's source core and returns its result unchanged.</remarks>
public IDisposable LinkTo(ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target, DataflowLinkOptions linkOptions)
return _source.LinkTo(target, linkOptions);
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceive"]/*' />
/// <remarks>Forwards to the block's source core, which supplies both the return value and the out item.</remarks>
[SuppressMessage("Microsoft.Design", "CA1006:DoNotNestGenericTypesInMemberSignatures")]
public Boolean TryReceive(Predicate<Tuple<IList<T1>, IList<T2>, IList<T3>>> filter, out Tuple<IList<T1>, IList<T2>, IList<T3>> item)
return _source.TryReceive(filter, out item);
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceiveAll"]/*' />
/// <remarks>Forwards to the block's source core.</remarks>
[SuppressMessage("Microsoft.Design", "CA1006:DoNotNestGenericTypesInMemberSignatures")]
public bool TryReceiveAll(out IList<Tuple<IList<T1>, IList<T2>, IList<T3>>> items) { return _source.TryReceiveAll(out items); }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="OutputCount"]/*' />
/// <remarks>Reads the count from the block's source core.</remarks>
public int OutputCount { get { return _source.OutputCount; } }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
/// <remarks>This is the completion task of the block's source core.</remarks>
public Task Completion { get { return _source.Completion; } }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
public void Complete()
// Debug-only sanity checks that all three target fields have been assigned.
Debug.Assert(_target1 != null, "_target1 not initialized");
Debug.Assert(_target2 != null, "_target2 not initialized");
Debug.Assert(_target3 != null, "_target3 not initialized");
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
/// <exception cref="System.ArgumentNullException">The <paramref name="exception"/> is null.</exception>
void IDataflowBlock.Fault(Exception exception)
// A null exception is rejected before any state is touched.
if (exception == null) throw new ArgumentNullException("exception");
Contract.EndContractBlock();
// Debug-only sanity checks that the fields used below have been assigned.
Debug.Assert(_sharedResources != null, "_sharedResources not initialized");
Debug.Assert(_sharedResources._incomingLock != null, "_sharedResources._incomingLock not initialized");
Debug.Assert(_source != null, "_source not initialized");
// Under the shared incoming lock, hand the exception to the source unless the
// shared state is already declining permanently, in which case it is not recorded.
lock (_sharedResources._incomingLock)
if (!_sharedResources._decliningPermanently) _source.AddException(exception);
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ConsumeMessage"]/*' />
/// <remarks>Explicit interface implementation that forwards to the block's source core.</remarks>
Tuple<IList<T1>, IList<T2>, IList<T3>> ISourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>.ConsumeMessage(
DataflowMessageHeader messageHeader, ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target, out Boolean messageConsumed)
return _source.ConsumeMessage(messageHeader, target, out messageConsumed);
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReserveMessage"]/*' />
/// <remarks>Explicit interface implementation that forwards to the block's source core.</remarks>
bool ISourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>.ReserveMessage(
DataflowMessageHeader messageHeader, ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target)
return _source.ReserveMessage(messageHeader, target);
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReleaseReservation"]/*' />
/// <remarks>Explicit interface implementation that forwards to the block's source core.</remarks>
void ISourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>.ReleaseReservation(
DataflowMessageHeader messageHeader, ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target)
_source.ReleaseReservation(messageHeader, target);
/// <summary>
/// Invokes Complete on each target
/// </summary>
/// <remarks>
/// The constructor passes this method as the callback given to the source core and as the
/// action wired to the options' cancellation token.
/// </remarks>
private void CompleteEachTarget()
/// <summary>Gets the number of messages waiting to be processed. This must only be used from the debugger as it avoids taking necessary locks.</summary>
/// <remarks>The value is read from the source core's debugging information rather than from <see cref="OutputCount"/>.</remarks>
private int OutputCountForDebugger { get { return _source.GetDebuggingInformation().OutputCount; } }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="ToString"]/*' />
/// <remarks>Returns the name produced by <c>Common.GetNameForDebugger</c> for this block and the source's options.</remarks>
public override string ToString() { return Common.GetNameForDebugger(this, _source.DataflowBlockOptions); }
/// <summary>The data to display in the debugger display attribute.</summary>
/// <remarks>
/// The text starts with the same name that <see cref="ToString"/> returns, followed by the batch size and the
/// output count. The output count comes from <see cref="OutputCountForDebugger"/>, the debugger-only accessor.
/// </remarks>
[SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
private object DebuggerDisplayContent
return string.Format("{0}, BatchSize={1}, OutputCount={2}",
Common.GetNameForDebugger(this, _source.DataflowBlockOptions),
OutputCountForDebugger);
/// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
/// <remarks>Returns <see cref="DebuggerDisplayContent"/>.</remarks>
object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } }
/// <summary>Provides a debugger type proxy for the <see cref="BatchedJoinBlock{T1,T2,T3}"/>.</summary>
private sealed class DebugView
/// <summary>The block being viewed.</summary>
private readonly BatchedJoinBlock<T1, T2, T3> _batchedJoinBlock;
/// <summary>The source half of the block being viewed.</summary>
private readonly SourceCore<Tuple<IList<T1>, IList<T2>, IList<T3>>>.DebuggingInformation _sourceDebuggingInformation;
/// <summary>Initializes the debug view.</summary>
/// <param name="batchedJoinBlock">The batched join being viewed.</param>
public DebugView(BatchedJoinBlock<T1, T2, T3> batchedJoinBlock)
Contract.Requires(batchedJoinBlock != null, "Need a block with which to construct the debug view.");
// The source's debugging information is fetched once here and reused by the properties below.
_sourceDebuggingInformation = batchedJoinBlock._source.GetDebuggingInformation();
_batchedJoinBlock = batchedJoinBlock;
/// <summary>Gets the messages waiting to be received.</summary>
public IEnumerable<Tuple<IList<T1>, IList<T2>, IList<T3>>> OutputQueue { get { return _sourceDebuggingInformation.OutputQueue; } }
/// <summary>Gets the number of batches created.</summary>
public long BatchesCreated { get { return _batchedJoinBlock._sharedResources._batchesCreated; } }
/// <summary>Gets the number of items remaining to form a batch.</summary>
public int RemainingItemsForBatch { get { return _batchedJoinBlock._sharedResources._remainingItemsInBatch; } }
/// <summary>Gets the size of the batches generated by this BatchedJoin.</summary>
public Int32 BatchSize { get { return _batchedJoinBlock._batchSize; } }
/// <summary>Gets the first target.</summary>
public ITargetBlock<T1> Target1 { get { return _batchedJoinBlock._target1; } }
/// <summary>Gets the second target.</summary>
public ITargetBlock<T2> Target2 { get { return _batchedJoinBlock._target2; } }
/// <summary>Gets the third target.</summary>
public ITargetBlock<T3> Target3 { get { return _batchedJoinBlock._target3; } }
/// <summary>Gets the task being used for output processing.</summary>
public Task TaskForOutputProcessing { get { return _sourceDebuggingInformation.TaskForOutputProcessing; } }
/// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
/// <remarks>The source's options are cast to <see cref="GroupingDataflowBlockOptions"/>, the type the block's constructor accepts.</remarks>
public GroupingDataflowBlockOptions DataflowBlockOptions { get { return (GroupingDataflowBlockOptions)_sourceDebuggingInformation.DataflowBlockOptions; } }
/// <summary>Gets whether the block is completed.</summary>
public bool IsCompleted { get { return _sourceDebuggingInformation.IsCompleted; } }
/// <summary>Gets the block's Id.</summary>
public int Id { get { return Common.GetBlockId(_batchedJoinBlock); } }
/// <summary>Gets the set of all targets linked from this block.</summary>
public TargetRegistry<Tuple<IList<T1>, IList<T2>, IList<T3>>> LinkedTargets { get { return _sourceDebuggingInformation.LinkedTargets; } }
/// <summary>Gets the target that holds a reservation on the next message, if any.</summary>
public ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> NextMessageReservedFor { get { return _sourceDebuggingInformation.NextMessageReservedFor; } }
552 namespace System.Threading.Tasks.Dataflow.Internal
554 /// <summary>Provides the target used in a BatchedJoin.</summary>
555 /// <typeparam name="T">Specifies the type of data accepted by this target.</typeparam>
556 [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
557 [DebuggerTypeProxy(typeof(BatchedJoinBlockTarget<>.DebugView))]
558 internal sealed class BatchedJoinBlockTarget<T> : ITargetBlock<T>, IDebuggerDisplay
/// <summary>The shared resources used by all targets associated with the same batched join instance.</summary>
/// <remarks>Its <c>_incomingLock</c> is taken by OfferMessage, Complete, and Fault before they touch target or shared state.</remarks>
private readonly BatchedJoinBlockTargetSharedResources _sharedResources;
/// <summary>Whether this target is declining future messages. Set to true by <see cref="Complete"/>.</summary>
private bool _decliningPermanently;
/// <summary>Input messages for the next batch. Replaced with a new empty list by <see cref="GetAndEmptyMessages"/>.</summary>
private IList<T> _messages = new List<T>();
/// <summary>Initializes the target.</summary>
/// <remarks>Increments the shared <c>_remainingAliveTargets</c> counter, which <see cref="Complete"/> later decrements.</remarks>
/// <param name="sharedResources">The shared resources used by all targets associated with this batched join.</param>
internal BatchedJoinBlockTarget(BatchedJoinBlockTargetSharedResources sharedResources)
Contract.Requires(sharedResources != null, "Targets require a shared resources through which to communicate.");
// Store the shared resources, and register with it to let it know there's
// another target. This is done in a non-thread-safe manner and must be done
// during construction of the batched join instance.
_sharedResources = sharedResources;
sharedResources._remainingAliveTargets++;
/// <summary>Gets the number of messages buffered in this target.</summary>
/// <remarks>Reads the buffer's count directly; this accessor does not itself take the shared incoming lock.</remarks>
internal int Count { get { return _messages.Count; } }
/// <summary>Gets the messages buffered by this target and then empties the collection.</summary>
/// <remarks>The buffer is emptied by installing a new list, so the previously buffered list is not cleared or copied.</remarks>
/// <returns>The messages from the target.</returns>
internal IList<T> GetAndEmptyMessages()
// Contract check against the shared incoming lock with held: true, i.e. callers are expected to hold it.
Common.ContractAssertMonitorStatus(_sharedResources._incomingLock, held: true);
// Detach the current buffer and put a fresh, empty list in its place.
IList<T> toReturn = _messages;
_messages = new List<T>();
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="OfferMessage"]/*' />
/// <exception cref="System.ArgumentException">
/// The <paramref name="messageHeader"/> is not valid, or <paramref name="consumeToAccept"/> is true while
/// <paramref name="source"/> is null.
/// </exception>
public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, Boolean consumeToAccept)
// Validate arguments
if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, "messageHeader");
if (source == null && consumeToAccept) throw new ArgumentException(SR.Argument_CantConsumeFromANullSource, "consumeToAccept");
Contract.EndContractBlock();
// Everything below runs under the incoming lock shared by all targets of the batched join.
lock (_sharedResources._incomingLock)
// If we've already stopped accepting messages, decline permanently
// (either this target alone or the shared state as a whole may be declining).
if (_decliningPermanently ||
_sharedResources._decliningPermanently)
return DataflowMessageStatus.DecliningPermanently;
// Consume the message from the source if necessary, and store the message
Debug.Assert(source != null, "We must have thrown if source == null && consumeToAccept == true.");
// The value returned by the source replaces the offered value.
messageValue = source.ConsumeMessage(messageHeader, this, out consumed);
// The source did not hand the message over: report NotAvailable without buffering anything.
if (!consumed) return DataflowMessageStatus.NotAvailable;
_messages.Add(messageValue);
// If this message makes a batch, notify the shared resources that a batch has been completed
if (--_sharedResources._remainingItemsInBatch == 0) _sharedResources._batchSizeReachedAction();
return DataflowMessageStatus.Accepted;
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
/// <remarks>
/// Only the first call changes state: once this target is declining permanently, later calls do nothing.
/// When the last alive target completes, the shared <c>_allTargetsDecliningPermanentlyAction</c> is invoked.
/// </remarks>
public void Complete()
lock (_sharedResources._incomingLock)
// If this is the first time Complete is being called,
// note that there's now one fewer targets receiving messages for the batched join.
if (!_decliningPermanently)
_decliningPermanently = true;
// The alive-target counter reaching zero means every target has now been completed.
if (--_sharedResources._remainingAliveTargets == 0) _sharedResources._allTargetsDecliningPermanentlyAction();
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
/// <exception cref="System.ArgumentNullException">The <paramref name="exception"/> is null.</exception>
void IDataflowBlock.Fault(Exception exception)
// A null exception is rejected before any state is touched.
if (exception == null) throw new ArgumentNullException("exception");
Contract.EndContractBlock();
// Under the shared incoming lock, pass the exception to the shared exception action only if neither
// this target nor the shared state is already declining permanently.
lock (_sharedResources._incomingLock)
if (!_decliningPermanently && !_sharedResources._decliningPermanently) _sharedResources._exceptionAction(exception);
// Invoke the shared completion action.
_sharedResources._completionAction();
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
Task IDataflowBlock.Completion
{
    get
    {
        // This member is not supported on the target; reading it always throws.
        throw new NotSupportedException(SR.NotSupported_MemberNotNeeded);
    }
}
/// <summary>The data to display in the debugger display attribute.</summary>
/// <remarks>Formats the block's debugger name followed by the number of stored input messages.</remarks>
[SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
private object DebuggerDisplayContent
{
    get
    {
        // {0} is the debugger name of this target, {1} the count of messages in _messages.
        return string.Format("{0} InputCount={1}",
            Common.GetNameForDebugger(this),
            _messages.Count);
    }
}
/// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
object IDebuggerDisplay.Content
{
    get
    {
        // Forward to the private display property so both paths show the same content.
        return DebuggerDisplayContent;
    }
}
/// <summary>Provides a debugger type proxy for the batched join block target.</summary>
private sealed class DebugView
{
    /// <summary>The target whose state this view exposes.</summary>
    private readonly BatchedJoinBlockTarget<T> _target;

    /// <summary>Initializes the debug view.</summary>
    /// <param name="batchedJoinBlockTarget">The batched join target being viewed.</param>
    public DebugView(BatchedJoinBlockTarget<T> batchedJoinBlockTarget)
    {
        Contract.Requires(batchedJoinBlockTarget != null, "Need a block with which to construct the debug view.");
        _target = batchedJoinBlockTarget;
    }

    /// <summary>Gets the messages waiting to be processed.</summary>
    public IEnumerable<T> InputQueue
    {
        get { return _target._messages; }
    }

    /// <summary>Gets whether the block is declining further messages.</summary>
    public bool IsDecliningPermanently
    {
        get
        {
            // Declining if either this target or the shared state has stopped accepting input.
            if (_target._decliningPermanently)
            {
                return true;
            }

            return _target._sharedResources._decliningPermanently;
        }
    }
}
/// <summary>Provides a container for resources shared across all targets used by the same BatchedJoinBlock instance.</summary>
internal sealed class BatchedJoinBlockTargetSharedResources
{
    /// <summary>Initializes the shared resources.</summary>
    /// <param name="batchSize">The size of a batch to create.</param>
    /// <param name="dataflowBlockOptions">The options used to configure the shared resources. Assumed to be immutable.</param>
    /// <param name="batchSizeReachedAction">The action to invoke when a batch is completed.</param>
    /// <param name="allTargetsDecliningAction">The action to invoke when no more targets are accepting input.</param>
    /// <param name="exceptionAction">The action to invoke when an exception needs to be logged.</param>
    /// <param name="completionAction">The action to invoke when completing, typically invoked due to a call to Fault.</param>
    internal BatchedJoinBlockTargetSharedResources(
        int batchSize, GroupingDataflowBlockOptions dataflowBlockOptions,
        Action batchSizeReachedAction, Action allTargetsDecliningAction,
        Action<Exception> exceptionAction, Action completionAction)
    {
        Debug.Assert(batchSize >= 1, "A positive batch size is required.");
        Debug.Assert(dataflowBlockOptions != null, "Need options to determine the maximum number of groups.");
        Debug.Assert(batchSizeReachedAction != null, "Need an action to invoke for each batch.");
        Debug.Assert(allTargetsDecliningAction != null, "Need an action to invoke when all targets have declined.");
        Debug.Assert(exceptionAction != null, "Need an action to invoke when an exception is logged.");
        Debug.Assert(completionAction != null, "Need an action to invoke when completing.");

        _incomingLock = new object();
        _batchSize = batchSize;

        // _remainingAliveTargets will be incremented when targets are added.
        // They must be added during construction of the BatchedJoin<...>.
        _remainingAliveTargets = 0;
        _remainingItemsInBatch = batchSize;

        // Configure what to do when batches are completed and/or all targets start declining
        _allTargetsDecliningPermanentlyAction = () =>
        {
            // Invoke the caller's action
            allTargetsDecliningAction();

            // Don't accept any more messages. When every target has completed, each
            // target's own declining flag already covers this. When this is reached
            // because the maximum number of groups was produced, the individual flags
            // are not set, so this shared flag is what stops further input.
            _decliningPermanently = true;
        };
        _batchSizeReachedAction = () =>
        {
            // Invoke the caller's action
            batchSizeReachedAction();

            // Count the batch just produced; without this the group limit below can never be hit.
            _batchesCreated++;

            // If the maximum number of groups has now been produced, invoke the completion logic.
            if (_batchesCreated >= dataflowBlockOptions.ActualMaxNumberOfGroups) _allTargetsDecliningPermanentlyAction();

            // Otherwise, get ready for the next batch.
            else _remainingItemsInBatch = _batchSize;
        };
        _exceptionAction = exceptionAction;
        _completionAction = completionAction;
    }

    /// <summary>
    /// A lock used to synchronize all incoming messages on all targets. It protects all of the rest
    /// of the shared resources' state and will be held while invoking the delegates.
    /// </summary>
    internal readonly object _incomingLock;
    /// <summary>The size of the batches to generate.</summary>
    internal readonly int _batchSize;

    /// <summary>The action to invoke when enough elements have been accumulated to make a batch.</summary>
    internal readonly Action _batchSizeReachedAction;
    /// <summary>The action to invoke when all targets are declining further messages.</summary>
    internal readonly Action _allTargetsDecliningPermanentlyAction;
    /// <summary>The action to invoke when an exception has to be logged.</summary>
    internal readonly Action<Exception> _exceptionAction;
    /// <summary>The action to invoke when the owning block has to be completed.</summary>
    internal readonly Action _completionAction;

    /// <summary>The number of items remaining to form a batch.</summary>
    internal int _remainingItemsInBatch;
    /// <summary>The number of targets still alive (i.e. not declining all further messages).</summary>
    internal int _remainingAliveTargets;
    /// <summary>Whether all targets should decline all further messages.</summary>
    internal bool _decliningPermanently;
    /// <summary>The number of batches created.</summary>
    internal long _batchesCreated;
}