Merge pull request #2274 from esdrubal/udpclientreceive
[mono.git] / mcs / class / System.Threading.Tasks.Dataflow / CoreFxSources / Blocks / BatchedJoinBlock.cs
1 // Copyright (c) Microsoft. All rights reserved.
2 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
3
4 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
5 //
6 // BatchedJoinBlock.cs
7 //
8 //
9 // A propagator block that groups individual messages of multiple types
10 // into tuples of arrays of those messages.
11 //
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
13
14 using System.Collections.Generic;
15 using System.Diagnostics;
16 using System.Diagnostics.CodeAnalysis;
17 using System.Diagnostics.Contracts;
18 using System.Threading.Tasks.Dataflow.Internal;
19
20 namespace System.Threading.Tasks.Dataflow
21 {
22     /// <summary>
23     /// Provides a dataflow block that batches a specified number of inputs of potentially differing types
24     /// provided to one or more of its targets.
25     /// </summary>
26     /// <typeparam name="T1">Specifies the type of data accepted by the block's first target.</typeparam>
27     /// <typeparam name="T2">Specifies the type of data accepted by the block's second target.</typeparam>
28     [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
29     [DebuggerTypeProxy(typeof(BatchedJoinBlock<,>.DebugView))]
30     public sealed class BatchedJoinBlock<T1, T2> : IReceivableSourceBlock<Tuple<IList<T1>, IList<T2>>>, IDebuggerDisplay
31     {
32         /// <summary>The size of the batches generated by this BatchedJoin.</summary>
33         private readonly int _batchSize;
34         /// <summary>State shared among the targets.</summary>
35         private readonly BatchedJoinBlockTargetSharedResources _sharedResources;
36         /// <summary>The target providing inputs of type T1.</summary>
37         private readonly BatchedJoinBlockTarget<T1> _target1;
38         /// <summary>The target providing inputs of type T2.</summary>
39         private readonly BatchedJoinBlockTarget<T2> _target2;
40         /// <summary>The source side.</summary>
41         private readonly SourceCore<Tuple<IList<T1>, IList<T2>>> _source;
42
43         /// <summary>Initializes this <see cref="BatchedJoinBlock{T1,T2}"/> with the specified configuration.</summary>
44         /// <param name="batchSize">The number of items to group into a batch.</param>
45         /// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="batchSize"/> must be positive.</exception>
46         public BatchedJoinBlock(Int32 batchSize) :
47             this(batchSize, GroupingDataflowBlockOptions.Default)
48         { }
49
50         /// <summary>Initializes this <see cref="BatchedJoinBlock{T1,T2}"/> with the specified configuration.</summary>
51         /// <param name="batchSize">The number of items to group into a batch.</param>
52         /// <param name="dataflowBlockOptions">The options with which to configure this <see cref="BatchedJoinBlock{T1,T2}"/>.</param>
53         /// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="batchSize"/> must be positive.</exception>
54         /// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
55         public BatchedJoinBlock(Int32 batchSize, GroupingDataflowBlockOptions dataflowBlockOptions)
56         {
57             // Validate arguments
58             if (batchSize < 1) throw new ArgumentOutOfRangeException("batchSize", SR.ArgumentOutOfRange_GenericPositive);
59             if (dataflowBlockOptions == null) throw new ArgumentNullException("dataflowBlockOptions");
60             if (!dataflowBlockOptions.Greedy) throw new ArgumentException(SR.Argument_NonGreedyNotSupported, "dataflowBlockOptions");
61             if (dataflowBlockOptions.BoundedCapacity != DataflowBlockOptions.Unbounded) throw new ArgumentException(SR.Argument_BoundedCapacityNotSupported, "dataflowBlockOptions");
62             Contract.EndContractBlock();
63
64             // Store arguments
65             _batchSize = batchSize;
66             dataflowBlockOptions = dataflowBlockOptions.DefaultOrClone();
67
68             // Configure the source
69             _source = new SourceCore<Tuple<IList<T1>, IList<T2>>>(
70                 this, dataflowBlockOptions, owningSource => ((BatchedJoinBlock<T1, T2>)owningSource).CompleteEachTarget());
71
72             // The action to run when a batch should be created.  This is typically called
73             // when we have a full batch, but it will also be called when we're done receiving
74             // messages, and thus when there may be a few stragglers we need to make a batch out of.
75             Action createBatchAction = () =>
76             {
77                 if (_target1.Count > 0 || _target2.Count > 0)
78                 {
79                     _source.AddMessage(Tuple.Create(_target1.GetAndEmptyMessages(), _target2.GetAndEmptyMessages()));
80                 }
81             };
82
83             // Configure the targets
84             _sharedResources = new BatchedJoinBlockTargetSharedResources(
85                 batchSize, dataflowBlockOptions,
86                 createBatchAction,
87                 () =>
88                 {
89                     createBatchAction();
90                     _source.Complete();
91                 },
92                 _source.AddException,
93                 Complete);
94             _target1 = new BatchedJoinBlockTarget<T1>(_sharedResources);
95             _target2 = new BatchedJoinBlockTarget<T2>(_sharedResources);
96
97             // It is possible that the source half may fault on its own, e.g. due to a task scheduler exception.
98             // In those cases we need to fault the target half to drop its buffered messages and to release its 
99             // reservations. This should not create an infinite loop, because all our implementations are designed
100             // to handle multiple completion requests and to carry over only one.
101             _source.Completion.ContinueWith((completed, state) =>
102             {
103                 var thisBlock = ((BatchedJoinBlock<T1, T2>)state) as IDataflowBlock;
104                 Debug.Assert(completed.IsFaulted, "The source must be faulted in order to trigger a target completion.");
105                 thisBlock.Fault(completed.Exception);
106             }, this, CancellationToken.None, Common.GetContinuationOptions() | TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default);
107
108             // Handle async cancellation requests by declining on the target
109             Common.WireCancellationToComplete(
110                 dataflowBlockOptions.CancellationToken, _source.Completion, state => ((BatchedJoinBlock<T1, T2>)state).CompleteEachTarget(), this);
111 #if FEATURE_TRACING
112             DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
113             if (etwLog.IsEnabled())
114             {
115                 etwLog.DataflowBlockCreated(this, dataflowBlockOptions);
116             }
117 #endif
118         }
119
120         /// <summary>Gets the size of the batches generated by this <see cref="BatchedJoinBlock{T1,T2}"/>.</summary>
121         public Int32 BatchSize { get { return _batchSize; } }
122
123         /// <summary>Gets a target that may be used to offer messages of the first type.</summary>
124         public ITargetBlock<T1> Target1 { get { return _target1; } }
125
126         /// <summary>Gets a target that may be used to offer messages of the second type.</summary>
127         public ITargetBlock<T2> Target2 { get { return _target2; } }
128
129         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="LinkTo"]/*' />
130         [SuppressMessage("Microsoft.Design", "CA1006:DoNotNestGenericTypesInMemberSignatures")]
131         public IDisposable LinkTo(ITargetBlock<Tuple<IList<T1>, IList<T2>>> target, DataflowLinkOptions linkOptions)
132         {
133             return _source.LinkTo(target, linkOptions);
134         }
135
136         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceive"]/*' />
137         [SuppressMessage("Microsoft.Design", "CA1006:DoNotNestGenericTypesInMemberSignatures")]
138         public Boolean TryReceive(Predicate<Tuple<IList<T1>, IList<T2>>> filter, out Tuple<IList<T1>, IList<T2>> item)
139         {
140             return _source.TryReceive(filter, out item);
141         }
142
143         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceiveAll"]/*' />
144         [SuppressMessage("Microsoft.Design", "CA1006:DoNotNestGenericTypesInMemberSignatures")]
145         public bool TryReceiveAll(out IList<Tuple<IList<T1>, IList<T2>>> items) { return _source.TryReceiveAll(out items); }
146
147         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="OutputCount"]/*' />
148         public int OutputCount { get { return _source.OutputCount; } }
149
150         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
151         public Task Completion { get { return _source.Completion; } }
152
153         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
154         public void Complete()
155         {
156             Debug.Assert(_target1 != null, "_target1 not initialized");
157             Debug.Assert(_target2 != null, "_target2 not initialized");
158
159             _target1.Complete();
160             _target2.Complete();
161         }
162
163         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
164         void IDataflowBlock.Fault(Exception exception)
165         {
166             if (exception == null) throw new ArgumentNullException("exception");
167             Contract.EndContractBlock();
168
169             Debug.Assert(_sharedResources != null, "_sharedResources not initialized");
170             Debug.Assert(_sharedResources._incomingLock != null, "_sharedResources._incomingLock not initialized");
171             Debug.Assert(_source != null, "_source not initialized");
172
173             lock (_sharedResources._incomingLock)
174             {
175                 if (!_sharedResources._decliningPermanently) _source.AddException(exception);
176             }
177             Complete();
178         }
179
180         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ConsumeMessage"]/*' />
181         Tuple<IList<T1>, IList<T2>> ISourceBlock<Tuple<IList<T1>, IList<T2>>>.ConsumeMessage(
182             DataflowMessageHeader messageHeader, ITargetBlock<Tuple<IList<T1>, IList<T2>>> target, out Boolean messageConsumed)
183         {
184             return _source.ConsumeMessage(messageHeader, target, out messageConsumed);
185         }
186
187         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReserveMessage"]/*' />
188         bool ISourceBlock<Tuple<IList<T1>, IList<T2>>>.ReserveMessage(
189             DataflowMessageHeader messageHeader, ITargetBlock<Tuple<IList<T1>, IList<T2>>> target)
190         {
191             return _source.ReserveMessage(messageHeader, target);
192         }
193
194         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReleaseReservation"]/*' />
195         void ISourceBlock<Tuple<IList<T1>, IList<T2>>>.ReleaseReservation(
196             DataflowMessageHeader messageHeader, ITargetBlock<Tuple<IList<T1>, IList<T2>>> target)
197         {
198             _source.ReleaseReservation(messageHeader, target);
199         }
200
201         /// <summary>
202         /// Invokes Complete on each target
203         /// </summary>
204         private void CompleteEachTarget()
205         {
206             _target1.Complete();
207             _target2.Complete();
208         }
209
210         /// <summary>Gets the number of messages waiting to be processed.  This must only be used from the debugger as it avoids taking necessary locks.</summary>
211         private int OutputCountForDebugger { get { return _source.GetDebuggingInformation().OutputCount; } }
212
213         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="ToString"]/*' />
214         public override string ToString() { return Common.GetNameForDebugger(this, _source.DataflowBlockOptions); }
215
216         /// <summary>The data to display in the debugger display attribute.</summary>
217         [SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
218         private object DebuggerDisplayContent
219         {
220             get
221             {
222                 return string.Format("{0}, BatchSize={1}, OutputCount={2}",
223                     Common.GetNameForDebugger(this, _source.DataflowBlockOptions),
224                     BatchSize,
225                     OutputCountForDebugger);
226             }
227         }
228         /// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
229         object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } }
230
231         /// <summary>Provides a debugger type proxy for the Transform.</summary>
232         private sealed class DebugView
233         {
234             /// <summary>The block being viewed.</summary>
235             private readonly BatchedJoinBlock<T1, T2> _batchedJoinBlock;
236             /// <summary>The source half of the block being viewed.</summary>
237             private readonly SourceCore<Tuple<IList<T1>, IList<T2>>>.DebuggingInformation _sourceDebuggingInformation;
238
239             /// <summary>Initializes the debug view.</summary>
240             /// <param name="batchedJoinBlock">The batched join being viewed.</param>
241             public DebugView(BatchedJoinBlock<T1, T2> batchedJoinBlock)
242             {
243                 Contract.Requires(batchedJoinBlock != null, "Need a block with which to construct the debug view.");
244                 _batchedJoinBlock = batchedJoinBlock;
245                 _sourceDebuggingInformation = batchedJoinBlock._source.GetDebuggingInformation();
246             }
247
248             /// <summary>Gets the messages waiting to be received.</summary>
249             public IEnumerable<Tuple<IList<T1>, IList<T2>>> OutputQueue { get { return _sourceDebuggingInformation.OutputQueue; } }
250             /// <summary>Gets the number of batches created.</summary>
251             public long BatchesCreated { get { return _batchedJoinBlock._sharedResources._batchesCreated; } }
252             /// <summary>Gets the number of items remaining to form a batch.</summary>
253             public int RemainingItemsForBatch { get { return _batchedJoinBlock._sharedResources._remainingItemsInBatch; } }
254
255             /// <summary>Gets the size of the batches generated by this BatchedJoin.</summary>
256             public Int32 BatchSize { get { return _batchedJoinBlock._batchSize; } }
257             /// <summary>Gets the first target.</summary>
258             public ITargetBlock<T1> Target1 { get { return _batchedJoinBlock._target1; } }
259             /// <summary>Gets the second target.</summary>
260             public ITargetBlock<T2> Target2 { get { return _batchedJoinBlock._target2; } }
261
262             /// <summary>Gets the task being used for output processing.</summary>
263             public Task TaskForOutputProcessing { get { return _sourceDebuggingInformation.TaskForOutputProcessing; } }
264
265             /// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
266             public GroupingDataflowBlockOptions DataflowBlockOptions { get { return (GroupingDataflowBlockOptions)_sourceDebuggingInformation.DataflowBlockOptions; } }
267             /// <summary>Gets whether the block is completed.</summary>
268             public bool IsCompleted { get { return _sourceDebuggingInformation.IsCompleted; } }
269             /// <summary>Gets the block's Id.</summary>
270             public int Id { get { return Common.GetBlockId(_batchedJoinBlock); } }
271
272             /// <summary>Gets the set of all targets linked from this block.</summary>
273             public TargetRegistry<Tuple<IList<T1>, IList<T2>>> LinkedTargets { get { return _sourceDebuggingInformation.LinkedTargets; } }
274             /// <summary>Gets the target that holds a reservation on the next message, if any.</summary>
275             public ITargetBlock<Tuple<IList<T1>, IList<T2>>> NextMessageReservedFor { get { return _sourceDebuggingInformation.NextMessageReservedFor; } }
276         }
277     }
278
279     /// <summary>
280     /// Provides a dataflow block that batches a specified number of inputs of potentially differing types
281     /// provided to one or more of its targets.
282     /// </summary>
283     /// <typeparam name="T1">Specifies the type of data accepted by the block's first target.</typeparam>
284     /// <typeparam name="T2">Specifies the type of data accepted by the block's second target.</typeparam>
285     /// <typeparam name="T3">Specifies the type of data accepted by the block's third target.</typeparam>
286     [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
287     [DebuggerTypeProxy(typeof(BatchedJoinBlock<,,>.DebugView))]
288     [SuppressMessage("Microsoft.Design", "CA1005:AvoidExcessiveParametersOnGenericTypes")]
289     public sealed class BatchedJoinBlock<T1, T2, T3> : IReceivableSourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>, IDebuggerDisplay
290     {
291         /// <summary>The size of the batches generated by this BatchedJoin.</summary>
292         private readonly int _batchSize;
293         /// <summary>State shared among the targets.</summary>
294         private readonly BatchedJoinBlockTargetSharedResources _sharedResources;
295         /// <summary>The target providing inputs of type T1.</summary>
296         private readonly BatchedJoinBlockTarget<T1> _target1;
297         /// <summary>The target providing inputs of type T2.</summary>
298         private readonly BatchedJoinBlockTarget<T2> _target2;
299         /// <summary>The target providing inputs of type T3.</summary>
300         private readonly BatchedJoinBlockTarget<T3> _target3;
301         /// <summary>The source side.</summary>
302         private readonly SourceCore<Tuple<IList<T1>, IList<T2>, IList<T3>>> _source;
303
304         /// <summary>Initializes this <see cref="BatchedJoinBlock{T1,T2,T3}"/> with the specified configuration.</summary>
305         /// <param name="batchSize">The number of items to group into a batch.</param>
306         /// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="batchSize"/> must be positive.</exception>
307         public BatchedJoinBlock(Int32 batchSize) :
308             this(batchSize, GroupingDataflowBlockOptions.Default)
309         { }
310
311         /// <summary>Initializes this <see cref="BatchedJoinBlock{T1,T2,T3}"/> with the specified configuration.</summary>
312         /// <param name="batchSize">The number of items to group into a batch.</param>
313         /// <param name="dataflowBlockOptions">The options with which to configure this <see cref="BatchedJoinBlock{T1,T2}"/>.</param>
314         /// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="batchSize"/> must be positive.</exception>
315         /// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
316         public BatchedJoinBlock(Int32 batchSize, GroupingDataflowBlockOptions dataflowBlockOptions)
317         {
318             // Validate arguments
319             if (batchSize < 1) throw new ArgumentOutOfRangeException("batchSize", SR.ArgumentOutOfRange_GenericPositive);
320             if (dataflowBlockOptions == null) throw new ArgumentNullException("dataflowBlockOptions");
321             if (!dataflowBlockOptions.Greedy ||
322                 dataflowBlockOptions.BoundedCapacity != DataflowBlockOptions.Unbounded)
323             {
324                 throw new ArgumentException(SR.Argument_NonGreedyNotSupported, "dataflowBlockOptions");
325             }
326             Contract.EndContractBlock();
327
328             // Store arguments
329             _batchSize = batchSize;
330             dataflowBlockOptions = dataflowBlockOptions.DefaultOrClone();
331
332             // Configure the source
333             _source = new SourceCore<Tuple<IList<T1>, IList<T2>, IList<T3>>>(
334                 this, dataflowBlockOptions, owningSource => ((BatchedJoinBlock<T1, T2, T3>)owningSource).CompleteEachTarget());
335
336             // The action to run when a batch should be created.  This is typically called
337             // when we have a full batch, but it will also be called when we're done receiving
338             // messages, and thus when there may be a few stragglers we need to make a batch out of.
339             Action createBatchAction = () =>
340             {
341                 if (_target1.Count > 0 || _target2.Count > 0 || _target3.Count > 0)
342                 {
343                     _source.AddMessage(Tuple.Create(_target1.GetAndEmptyMessages(), _target2.GetAndEmptyMessages(), _target3.GetAndEmptyMessages()));
344                 }
345             };
346
347             // Configure the targets
348             _sharedResources = new BatchedJoinBlockTargetSharedResources(
349                 batchSize, dataflowBlockOptions,
350                 createBatchAction,
351                 () =>
352                 {
353                     createBatchAction();
354                     _source.Complete();
355                 },
356                 _source.AddException,
357                 Complete);
358             _target1 = new BatchedJoinBlockTarget<T1>(_sharedResources);
359             _target2 = new BatchedJoinBlockTarget<T2>(_sharedResources);
360             _target3 = new BatchedJoinBlockTarget<T3>(_sharedResources);
361
362             // It is possible that the source half may fault on its own, e.g. due to a task scheduler exception.
363             // In those cases we need to fault the target half to drop its buffered messages and to release its 
364             // reservations. This should not create an infinite loop, because all our implementations are designed
365             // to handle multiple completion requests and to carry over only one.
366             _source.Completion.ContinueWith((completed, state) =>
367             {
368                 var thisBlock = ((BatchedJoinBlock<T1, T2, T3>)state) as IDataflowBlock;
369                 Debug.Assert(completed.IsFaulted, "The source must be faulted in order to trigger a target completion.");
370                 thisBlock.Fault(completed.Exception);
371             }, this, CancellationToken.None, Common.GetContinuationOptions() | TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default);
372
373             // Handle async cancellation requests by declining on the target
374             Common.WireCancellationToComplete(
375                 dataflowBlockOptions.CancellationToken, _source.Completion, state => ((BatchedJoinBlock<T1, T2, T3>)state).CompleteEachTarget(), this);
376 #if FEATURE_TRACING
377             DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
378             if (etwLog.IsEnabled())
379             {
380                 etwLog.DataflowBlockCreated(this, dataflowBlockOptions);
381             }
382 #endif
383         }
384
385         /// <summary>Gets the size of the batches generated by this <see cref="BatchedJoinBlock{T1,T2,T3}"/>.</summary>
386         public Int32 BatchSize { get { return _batchSize; } }
387
388         /// <summary>Gets a target that may be used to offer messages of the first type.</summary>
389         public ITargetBlock<T1> Target1 { get { return _target1; } }
390
391         /// <summary>Gets a target that may be used to offer messages of the second type.</summary>
392         public ITargetBlock<T2> Target2 { get { return _target2; } }
393
394         /// <summary>Gets a target that may be used to offer messages of the third type.</summary>
395         public ITargetBlock<T3> Target3 { get { return _target3; } }
396
397         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="LinkTo"]/*' />
398         public IDisposable LinkTo(ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target, DataflowLinkOptions linkOptions)
399         {
400             return _source.LinkTo(target, linkOptions);
401         }
402
403         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceive"]/*' />
404         [SuppressMessage("Microsoft.Design", "CA1006:DoNotNestGenericTypesInMemberSignatures")]
405         public Boolean TryReceive(Predicate<Tuple<IList<T1>, IList<T2>, IList<T3>>> filter, out Tuple<IList<T1>, IList<T2>, IList<T3>> item)
406         {
407             return _source.TryReceive(filter, out item);
408         }
409
410         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceiveAll"]/*' />
411         [SuppressMessage("Microsoft.Design", "CA1006:DoNotNestGenericTypesInMemberSignatures")]
412         public bool TryReceiveAll(out IList<Tuple<IList<T1>, IList<T2>, IList<T3>>> items) { return _source.TryReceiveAll(out items); }
413
414         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="OutputCount"]/*' />
415         public int OutputCount { get { return _source.OutputCount; } }
416
417         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
418         public Task Completion { get { return _source.Completion; } }
419
420         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
421         public void Complete()
422         {
423             Debug.Assert(_target1 != null, "_target1 not initialized");
424             Debug.Assert(_target2 != null, "_target2 not initialized");
425             Debug.Assert(_target3 != null, "_target3 not initialized");
426
427             _target1.Complete();
428             _target2.Complete();
429             _target3.Complete();
430         }
431
432         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
433         void IDataflowBlock.Fault(Exception exception)
434         {
435             if (exception == null) throw new ArgumentNullException("exception");
436             Contract.EndContractBlock();
437
438             Debug.Assert(_sharedResources != null, "_sharedResources not initialized");
439             Debug.Assert(_sharedResources._incomingLock != null, "_sharedResources._incomingLock not initialized");
440             Debug.Assert(_source != null, "_source not initialized");
441
442             lock (_sharedResources._incomingLock)
443             {
444                 if (!_sharedResources._decliningPermanently) _source.AddException(exception);
445             }
446             Complete();
447         }
448
449         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ConsumeMessage"]/*' />
450         Tuple<IList<T1>, IList<T2>, IList<T3>> ISourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>.ConsumeMessage(
451             DataflowMessageHeader messageHeader, ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target, out Boolean messageConsumed)
452         {
453             return _source.ConsumeMessage(messageHeader, target, out messageConsumed);
454         }
455
456         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReserveMessage"]/*' />
457         bool ISourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>.ReserveMessage(
458             DataflowMessageHeader messageHeader, ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target)
459         {
460             return _source.ReserveMessage(messageHeader, target);
461         }
462
463         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReleaseReservation"]/*' />
464         void ISourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>.ReleaseReservation(
465             DataflowMessageHeader messageHeader, ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target)
466         {
467             _source.ReleaseReservation(messageHeader, target);
468         }
469
470         /// <summary>
471         /// Invokes Complete on each target
472         /// </summary>
473         private void CompleteEachTarget()
474         {
475             _target1.Complete();
476             _target2.Complete();
477             _target3.Complete();
478         }
479
480         /// <summary>Gets the number of messages waiting to be processed.  This must only be used from the debugger as it avoids taking necessary locks.</summary>
481         private int OutputCountForDebugger { get { return _source.GetDebuggingInformation().OutputCount; } }
482
483         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="ToString"]/*' />
484         public override string ToString() { return Common.GetNameForDebugger(this, _source.DataflowBlockOptions); }
485
486         /// <summary>The data to display in the debugger display attribute.</summary>
487         [SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
488         private object DebuggerDisplayContent
489         {
490             get
491             {
492                 return string.Format("{0}, BatchSize={1}, OutputCount={2}",
493                     Common.GetNameForDebugger(this, _source.DataflowBlockOptions),
494                     BatchSize,
495                     OutputCountForDebugger);
496             }
497         }
498         /// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
499         object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } }
500
501         /// <summary>Provides a debugger type proxy for the Transform.</summary>
502         private sealed class DebugView
503         {
504             /// <summary>The block being viewed.</summary>
505             private readonly BatchedJoinBlock<T1, T2, T3> _batchedJoinBlock;
506             /// <summary>The source half of the block being viewed.</summary>
507             private readonly SourceCore<Tuple<IList<T1>, IList<T2>, IList<T3>>>.DebuggingInformation _sourceDebuggingInformation;
508
509             /// <summary>Initializes the debug view.</summary>
510             /// <param name="batchedJoinBlock">The batched join being viewed.</param>
511             public DebugView(BatchedJoinBlock<T1, T2, T3> batchedJoinBlock)
512             {
513                 Contract.Requires(batchedJoinBlock != null, "Need a block with which to construct the debug view.");
514                 _sourceDebuggingInformation = batchedJoinBlock._source.GetDebuggingInformation();
515                 _batchedJoinBlock = batchedJoinBlock;
516             }
517
518             /// <summary>Gets the messages waiting to be received.</summary>
519             public IEnumerable<Tuple<IList<T1>, IList<T2>, IList<T3>>> OutputQueue { get { return _sourceDebuggingInformation.OutputQueue; } }
520             /// <summary>Gets the number of batches created.</summary>
521             public long BatchesCreated { get { return _batchedJoinBlock._sharedResources._batchesCreated; } }
522             /// <summary>Gets the number of items remaining to form a batch.</summary>
523             public int RemainingItemsForBatch { get { return _batchedJoinBlock._sharedResources._remainingItemsInBatch; } }
524
525             /// <summary>Gets the size of the batches generated by this BatchedJoin.</summary>
526             public Int32 BatchSize { get { return _batchedJoinBlock._batchSize; } }
527             /// <summary>Gets the first target.</summary>
528             public ITargetBlock<T1> Target1 { get { return _batchedJoinBlock._target1; } }
529             /// <summary>Gets the second target.</summary>
530             public ITargetBlock<T2> Target2 { get { return _batchedJoinBlock._target2; } }
531             /// <summary>Gets the second target.</summary>
532             public ITargetBlock<T3> Target3 { get { return _batchedJoinBlock._target3; } }
533
534             /// <summary>Gets the task being used for output processing.</summary>
535             public Task TaskForOutputProcessing { get { return _sourceDebuggingInformation.TaskForOutputProcessing; } }
536
537             /// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
538             public GroupingDataflowBlockOptions DataflowBlockOptions { get { return (GroupingDataflowBlockOptions)_sourceDebuggingInformation.DataflowBlockOptions; } }
539             /// <summary>Gets whether the block is completed.</summary>
540             public bool IsCompleted { get { return _sourceDebuggingInformation.IsCompleted; } }
541             /// <summary>Gets the block's Id.</summary>
542             public int Id { get { return Common.GetBlockId(_batchedJoinBlock); } }
543
544             /// <summary>Gets the set of all targets linked from this block.</summary>
545             public TargetRegistry<Tuple<IList<T1>, IList<T2>, IList<T3>>> LinkedTargets { get { return _sourceDebuggingInformation.LinkedTargets; } }
546             /// <summary>Gets the target that holds a reservation on the next message, if any.</summary>
547             public ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> NextMessageReservedFor { get { return _sourceDebuggingInformation.NextMessageReservedFor; } }
548         }
549     }
550 }
551
552 namespace System.Threading.Tasks.Dataflow.Internal
553 {
554     /// <summary>Provides the target used in a BatchedJoin.</summary>
555     /// <typeparam name="T">Specifies the type of data accepted by this target.</typeparam>
556     [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
557     [DebuggerTypeProxy(typeof(BatchedJoinBlockTarget<>.DebugView))]
558     internal sealed class BatchedJoinBlockTarget<T> : ITargetBlock<T>, IDebuggerDisplay
559     {
560         /// <summary>The shared resources used by all targets associated with the same batched join instance.</summary>
561         private readonly BatchedJoinBlockTargetSharedResources _sharedResources;
562         /// <summary>Whether this target is declining future messages.</summary>
563         private bool _decliningPermanently;
564         /// <summary>Input messages for the next batch.</summary>
565         private IList<T> _messages = new List<T>();
566
567         /// <summary>Initializes the target.</summary>
568         /// <param name="sharedResources">The shared resources used by all targets associated with this batched join.</param>
569         internal BatchedJoinBlockTarget(BatchedJoinBlockTargetSharedResources sharedResources)
570         {
571             Contract.Requires(sharedResources != null, "Targets require a shared resources through which to communicate.");
572
573             // Store the shared resources, and register with it to let it know there's 
574             // another target. This is done in a non-thread-safe manner and must be done 
575             // during construction of the batched join instance.
576             _sharedResources = sharedResources;
577             sharedResources._remainingAliveTargets++;
578         }
579
580         /// <summary>Gets the number of messages buffered in this target.</summary>
581         internal int Count { get { return _messages.Count; } }
582
583         /// <summary>Gets the messages buffered by this target and then empties the collection.</summary>
584         /// <returns>The messages from the target.</returns>
585         internal IList<T> GetAndEmptyMessages()
586         {
587             Common.ContractAssertMonitorStatus(_sharedResources._incomingLock, held: true);
588
589             IList<T> toReturn = _messages;
590             _messages = new List<T>();
591             return toReturn;
592         }
593
594         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="OfferMessage"]/*' />
595         public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, Boolean consumeToAccept)
596         {
597             // Validate arguments
598             if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, "messageHeader");
599             if (source == null && consumeToAccept) throw new ArgumentException(SR.Argument_CantConsumeFromANullSource, "consumeToAccept");
600             Contract.EndContractBlock();
601
602             lock (_sharedResources._incomingLock)
603             {
604                 // If we've already stopped accepting messages, decline permanently
605                 if (_decliningPermanently ||
606                     _sharedResources._decliningPermanently)
607                     return DataflowMessageStatus.DecliningPermanently;
608
609                 // Consume the message from the source if necessary, and store the message
610                 if (consumeToAccept)
611                 {
612                     Debug.Assert(source != null, "We must have thrown if source == null && consumeToAccept == true.");
613
614                     bool consumed;
615                     messageValue = source.ConsumeMessage(messageHeader, this, out consumed);
616                     if (!consumed) return DataflowMessageStatus.NotAvailable;
617                 }
618                 _messages.Add(messageValue);
619
620                 // If this message makes a batch, notify the shared resources that a batch has been completed
621                 if (--_sharedResources._remainingItemsInBatch == 0) _sharedResources._batchSizeReachedAction();
622
623                 return DataflowMessageStatus.Accepted;
624             }
625         }
626
627         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
628         public void Complete()
629         {
630             lock (_sharedResources._incomingLock)
631             {
632                 // If this is the first time Complete is being called,
633                 // note that there's now one fewer targets receiving messages for the batched join.
634                 if (!_decliningPermanently)
635                 {
636                     _decliningPermanently = true;
637                     if (--_sharedResources._remainingAliveTargets == 0) _sharedResources._allTargetsDecliningPermanentlyAction();
638                 }
639             }
640         }
641
642         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
643         void IDataflowBlock.Fault(Exception exception)
644         {
645             if (exception == null) throw new ArgumentNullException("exception");
646             Contract.EndContractBlock();
647
648             lock (_sharedResources._incomingLock)
649             {
650                 if (!_decliningPermanently && !_sharedResources._decliningPermanently) _sharedResources._exceptionAction(exception);
651             }
652
653             _sharedResources._completionAction();
654         }
655
656         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
657         Task IDataflowBlock.Completion { get { throw new NotSupportedException(SR.NotSupported_MemberNotNeeded); } }
658
659         /// <summary>The data to display in the debugger display attribute.</summary>
660         [SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
661         private object DebuggerDisplayContent
662         {
663             get
664             {
665                 return string.Format("{0} InputCount={1}",
666                     Common.GetNameForDebugger(this),
667                     _messages.Count);
668             }
669         }
670         /// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
671         object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } }
672
673         /// <summary>Provides a debugger type proxy for the Transform.</summary>
674         private sealed class DebugView
675         {
676             /// <summary>The batched join block target being viewed.</summary>
677             private readonly BatchedJoinBlockTarget<T> _batchedJoinBlockTarget;
678
679             /// <summary>Initializes the debug view.</summary>
680             /// <param name="batchedJoinBlockTarget">The batched join target being viewed.</param>
681             public DebugView(BatchedJoinBlockTarget<T> batchedJoinBlockTarget)
682             {
683                 Contract.Requires(batchedJoinBlockTarget != null, "Need a block with which to construct the debug view.");
684                 _batchedJoinBlockTarget = batchedJoinBlockTarget;
685             }
686
687             /// <summary>Gets the messages waiting to be processed.</summary>
688             public IEnumerable<T> InputQueue { get { return _batchedJoinBlockTarget._messages; } }
689             /// <summary>Gets whether the block is declining further messages.</summary>
690             public bool IsDecliningPermanently
691             {
692                 get
693                 {
694                     return _batchedJoinBlockTarget._decliningPermanently ||
695                         _batchedJoinBlockTarget._sharedResources._decliningPermanently;
696                 }
697             }
698         }
699     }
700
701     /// <summary>Provides a container for resources shared across all targets used by the same BatchedJoinBlock instance.</summary>
702     internal sealed class BatchedJoinBlockTargetSharedResources
703     {
704         /// <summary>Initializes the shared resources.</summary>
705         /// <param name="batchSize">The size of a batch to create.</param>
706         /// <param name="dataflowBlockOptions">The options used to configure the shared resources.  Assumed to be immutable.</param>
707         /// <param name="batchSizeReachedAction">The action to invoke when a batch is completed.</param>
708         /// <param name="allTargetsDecliningAction">The action to invoke when no more targets are accepting input.</param>
709         /// <param name="exceptionAction">The action to invoke when an exception needs to be logged.</param>
710         /// <param name="completionAction">The action to invoke when completing, typically invoked due to a call to Fault.</param>
711         internal BatchedJoinBlockTargetSharedResources(
712             int batchSize, GroupingDataflowBlockOptions dataflowBlockOptions,
713             Action batchSizeReachedAction, Action allTargetsDecliningAction,
714             Action<Exception> exceptionAction, Action completionAction)
715         {
716             Debug.Assert(batchSize >= 1, "A positive batch size is required.");
717             Debug.Assert(batchSizeReachedAction != null, "Need an action to invoke for each batch.");
718             Debug.Assert(allTargetsDecliningAction != null, "Need an action to invoke when all targets have declined.");
719
720             _incomingLock = new object();
721             _batchSize = batchSize;
722
723             // _remainingAliveTargets will be incremented when targets are added.
724             // They must be added during construction of the BatchedJoin<...>.
725             _remainingAliveTargets = 0;
726             _remainingItemsInBatch = batchSize;
727
728             // Configure what to do when batches are completed and/or all targets start declining
729             _allTargetsDecliningPermanentlyAction = () =>
730             {
731                 // Invoke the caller's action
732                 allTargetsDecliningAction();
733
734                 // Don't accept any more messages.  We should already
735                 // be doing this anyway through each individual target's declining flag, 
736                 // so setting it to true is just a precaution and is also helpful
737                 // when onceOnly is true.
738                 _decliningPermanently = true;
739             };
740             _batchSizeReachedAction = () =>
741             {
742                 // Invoke the caller's action
743                 batchSizeReachedAction();
744                 _batchesCreated++;
745
746                 // If this batched join is meant to be used for only a single
747                 // batch, invoke the completion logic.
748                 if (_batchesCreated >= dataflowBlockOptions.ActualMaxNumberOfGroups) _allTargetsDecliningPermanentlyAction();
749
750                 // Otherwise, get ready for the next batch.
751                 else _remainingItemsInBatch = _batchSize;
752             };
753             _exceptionAction = exceptionAction;
754             _completionAction = completionAction;
755         }
756
757         /// <summary>
758         /// A lock used to synchronize all incoming messages on all targets. It protects all of the rest 
759         /// of the shared Resources's state and will be held while invoking the delegates.
760         /// </summary>
761         internal readonly object _incomingLock;
762         /// <summary>The size of the batches to generate.</summary>
763         internal readonly int _batchSize;
764
765         /// <summary>The action to invoke when enough elements have been accumulated to make a batch.</summary>
766         internal readonly Action _batchSizeReachedAction;
767         /// <summary>The action to invoke when all targets are declining further messages.</summary>
768         internal readonly Action _allTargetsDecliningPermanentlyAction;
769         /// <summary>The action to invoke when an exception has to be logged.</summary>
770         internal readonly Action<Exception> _exceptionAction;
771         /// <summary>The action to invoke when the owning block has to be completed.</summary>
772         internal readonly Action _completionAction;
773
774         /// <summary>The number of items remaining to form a batch.</summary>
775         internal int _remainingItemsInBatch;
776         /// <summary>The number of targets still alive (i.e. not declining all further messages).</summary>
777         internal int _remainingAliveTargets;
778         /// <summary>Whether all targets should decline all further messages.</summary>
779         internal bool _decliningPermanently;
780         /// <summary>The number of batches created.</summary>
781         internal long _batchesCreated;
782     }
783 }