Merge pull request #2223 from lobrien/master
[mono.git] / mcs / class / System.Threading.Tasks.Dataflow / CoreFxSources / Blocks / TransformManyBlock.cs
1 // Copyright (c) Microsoft. All rights reserved.
2 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
3
4 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
5 //
6 // TransformManyBlock.cs
7 //
8 //
9 // A propagator block that runs a function on each input to produce zero or more outputs.
10 //
11 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
12
13 using System.Collections.Generic;
14 using System.Diagnostics;
15 using System.Diagnostics.CodeAnalysis;
16 using System.Diagnostics.Contracts;
17 using System.Linq;
18 using System.Threading.Tasks.Dataflow.Internal;
19 using System.Collections.ObjectModel;
20
21 namespace System.Threading.Tasks.Dataflow
22 {
23     /// <summary>Provides a dataflow block that invokes a provided <see cref="System.Func{T,TResult}"/> delegate for every data element received.</summary>
24     /// <typeparam name="TInput">Specifies the type of data received and operated on by this <see cref="TransformManyBlock{TInput,TOutput}"/>.</typeparam>
25     /// <typeparam name="TOutput">Specifies the type of data output by this <see cref="TransformManyBlock{TInput,TOutput}"/>.</typeparam>
26     [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
27     [DebuggerTypeProxy(typeof(TransformManyBlock<,>.DebugView))]
28     public sealed class TransformManyBlock<TInput, TOutput> : IPropagatorBlock<TInput, TOutput>, IReceivableSourceBlock<TOutput>, IDebuggerDisplay
29     {
30         /// <summary>The target side.</summary>
31         private readonly TargetCore<TInput> _target;
32         /// <summary>
33         /// Buffer used to reorder output sets that may have completed out-of-order between the target half and the source half.
34         /// This specialized reordering buffer supports streaming out enumerables if the message is the next in line.
35         /// </summary>
36         private readonly ReorderingBuffer<IEnumerable<TOutput>> _reorderingBuffer;
37         /// <summary>The source side.</summary>
38         private readonly SourceCore<TOutput> _source;
39
40         /// <summary>Initializes the <see cref="TransformManyBlock{TInput,TOutput}"/> with the specified function.</summary>
41         /// <param name="transform">
42         /// The function to invoke with each data element received.  All of the data from the returned <see cref="System.Collections.Generic.IEnumerable{TOutput}"/>
43         /// will be made available as output from this <see cref="TransformManyBlock{TInput,TOutput}"/>.
44         /// </param>
45         /// <exception cref="System.ArgumentNullException">The <paramref name="transform"/> is null (Nothing in Visual Basic).</exception>
46         [SuppressMessage("Microsoft.Design", "CA1006:DoNotNestGenericTypesInMemberSignatures")]
47         public TransformManyBlock(Func<TInput, IEnumerable<TOutput>> transform) :
48             this(transform, null, ExecutionDataflowBlockOptions.Default)
49         { }
50
51         /// <summary>Initializes the <see cref="TransformManyBlock{TInput,TOutput}"/> with the specified function and <see cref="ExecutionDataflowBlockOptions"/>.</summary>
52         /// <param name="transform">
53         /// The function to invoke with each data element received.  All of the data from the returned in the <see cref="System.Collections.Generic.IEnumerable{TOutput}"/>
54         /// will be made available as output from this <see cref="TransformManyBlock{TInput,TOutput}"/>.
55         /// </param>
56         /// <param name="dataflowBlockOptions">The options with which to configure this <see cref="TransformManyBlock{TInput,TOutput}"/>.</param>
57         /// <exception cref="System.ArgumentNullException">The <paramref name="transform"/> is null (Nothing in Visual Basic).</exception>
58         /// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
59         [SuppressMessage("Microsoft.Design", "CA1006:DoNotNestGenericTypesInMemberSignatures")]
60         public TransformManyBlock(Func<TInput, IEnumerable<TOutput>> transform, ExecutionDataflowBlockOptions dataflowBlockOptions) :
61             this(transform, null, dataflowBlockOptions)
62         { }
63
64         /// <summary>Initializes the <see cref="TransformManyBlock{TInput,TOutput}"/> with the specified function.</summary>
65         /// <param name="transform">
66         /// The function to invoke with each data element received. All of the data asynchronously returned in the <see cref="System.Collections.Generic.IEnumerable{TOutput}"/>
67         /// will be made available as output from this <see cref="TransformManyBlock{TInput,TOutput}"/>.
68         /// </param>
69         /// <exception cref="System.ArgumentNullException">The <paramref name="transform"/> is null (Nothing in Visual Basic).</exception>
70         [SuppressMessage("Microsoft.Design", "CA1006:DoNotNestGenericTypesInMemberSignatures")]
71         public TransformManyBlock(Func<TInput, Task<IEnumerable<TOutput>>> transform) :
72             this(null, transform, ExecutionDataflowBlockOptions.Default)
73         { }
74
75         /// <summary>Initializes the <see cref="TransformManyBlock{TInput,TOutput}"/> with the specified function and <see cref="ExecutionDataflowBlockOptions"/>.</summary>
76         /// <param name="transform">
77         /// The function to invoke with each data element received. All of the data asynchronously returned in the <see cref="System.Collections.Generic.IEnumerable{TOutput}"/>
78         /// will be made available as output from this <see cref="TransformManyBlock{TInput,TOutput}"/>.
79         /// </param>
80         /// <param name="dataflowBlockOptions">The options with which to configure this <see cref="TransformManyBlock{TInput,TOutput}"/>.</param>
81         /// <exception cref="System.ArgumentNullException">The <paramref name="transform"/> is null (Nothing in Visual Basic).</exception>
82         /// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
83         [SuppressMessage("Microsoft.Design", "CA1006:DoNotNestGenericTypesInMemberSignatures")]
84         public TransformManyBlock(Func<TInput, Task<IEnumerable<TOutput>>> transform, ExecutionDataflowBlockOptions dataflowBlockOptions) :
85             this(null, transform, dataflowBlockOptions)
86         { }
87
88         /// <summary>Initializes the <see cref="TransformManyBlock{TInput,TOutput}"/> with the specified function and <see cref="ExecutionDataflowBlockOptions"/>.</summary>
89         /// <param name="transformSync">The synchronous function to invoke with each data element received.</param>
90         /// <param name="transformAsync">The asynchronous function to invoke with each data element received.</param>
91         /// <param name="dataflowBlockOptions">The options with which to configure this <see cref="TransformManyBlock{TInput,TOutput}"/>.</param>
92         /// <exception cref="System.ArgumentNullException">The <paramref name="transformSync"/> and <paramref name="transformAsync"/> are both null (Nothing in Visual Basic).</exception>
93         /// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
94         private TransformManyBlock(Func<TInput, IEnumerable<TOutput>> transformSync, Func<TInput, Task<IEnumerable<TOutput>>> transformAsync, ExecutionDataflowBlockOptions dataflowBlockOptions)
95         {
96             // Validate arguments.  It's ok for the filterFunction to be null, but not the other parameters.
97             if (transformSync == null && transformAsync == null) throw new ArgumentNullException("transform");
98             if (dataflowBlockOptions == null) throw new ArgumentNullException("dataflowBlockOptions");
99
100             Contract.Requires(transformSync == null ^ transformAsync == null, "Exactly one of transformSync and transformAsync must be null.");
101             Contract.EndContractBlock();
102
103             // Ensure we have options that can't be changed by the caller
104             dataflowBlockOptions = dataflowBlockOptions.DefaultOrClone();
105
106             // Initialize onItemsRemoved delegate if necessary
107             Action<ISourceBlock<TOutput>, int> onItemsRemoved = null;
108             if (dataflowBlockOptions.BoundedCapacity > 0)
109                 onItemsRemoved = (owningSource, count) => ((TransformManyBlock<TInput, TOutput>)owningSource)._target.ChangeBoundingCount(-count);
110
111             // Initialize source component
112             _source = new SourceCore<TOutput>(this, dataflowBlockOptions,
113                 owningSource => ((TransformManyBlock<TInput, TOutput>)owningSource)._target.Complete(exception: null, dropPendingMessages: true),
114                 onItemsRemoved);
115
116             // If parallelism is employed, we will need to support reordering messages that complete out-of-order.
117             if (dataflowBlockOptions.SupportsParallelExecution)
118             {
119                 _reorderingBuffer = new ReorderingBuffer<IEnumerable<TOutput>>(
120                     this, (source, messages) => ((TransformManyBlock<TInput, TOutput>)source)._source.AddMessages(messages));
121             }
122
123             // Create the underlying target and source
124             if (transformSync != null) // sync
125             {
126                 // If an enumerable function was provided, we can use synchronous completion, meaning
127                 // that the target will consider a message fully processed as soon as the
128                 // delegate returns.
129                 _target = new TargetCore<TInput>(this,
130                     messageWithId => ProcessMessage(transformSync, messageWithId),
131                     _reorderingBuffer, dataflowBlockOptions, TargetCoreOptions.None);
132             }
133             else // async
134             {
135                 Debug.Assert(transformAsync != null, "Incorrect delegate type.");
136
137                 // If a task-based function was provided, we need to use asynchronous completion, meaning
138                 // that the target won't consider a message completed until the task
139                 // returned from that delegate has completed.
140                 _target = new TargetCore<TInput>(this,
141                     messageWithId => ProcessMessageWithTask(transformAsync, messageWithId),
142                     _reorderingBuffer, dataflowBlockOptions, TargetCoreOptions.UsesAsyncCompletion);
143             }
144
145             // Link up the target half with the source half.  In doing so, 
146             // ensure exceptions are propagated, and let the source know no more messages will arrive.
147             // As the target has completed, and as the target synchronously pushes work
148             // through the reordering buffer when async processing completes, 
149             // we know for certain that no more messages will need to be sent to the source.
150             _target.Completion.ContinueWith((completed, state) =>
151             {
152                 var sourceCore = (SourceCore<TOutput>)state;
153                 if (completed.IsFaulted) sourceCore.AddAndUnwrapAggregateException(completed.Exception);
154                 sourceCore.Complete();
155             }, _source, CancellationToken.None, Common.GetContinuationOptions(), TaskScheduler.Default);
156
157             // It is possible that the source half may fault on its own, e.g. due to a task scheduler exception.
158             // In those cases we need to fault the target half to drop its buffered messages and to release its 
159             // reservations. This should not create an infinite loop, because all our implementations are designed
160             // to handle multiple completion requests and to carry over only one.
161             _source.Completion.ContinueWith((completed, state) =>
162             {
163                 var thisBlock = ((TransformManyBlock<TInput, TOutput>)state) as IDataflowBlock;
164                 Debug.Assert(completed.IsFaulted, "The source must be faulted in order to trigger a target completion.");
165                 thisBlock.Fault(completed.Exception);
166             }, this, CancellationToken.None, Common.GetContinuationOptions() | TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default);
167
168             // Handle async cancellation requests by declining on the target
169             Common.WireCancellationToComplete(
170                 dataflowBlockOptions.CancellationToken, Completion, state => ((TargetCore<TInput>)state).Complete(exception: null, dropPendingMessages: true), _target);
171 #if FEATURE_TRACING
172             DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
173             if (etwLog.IsEnabled())
174             {
175                 etwLog.DataflowBlockCreated(this, dataflowBlockOptions);
176             }
177 #endif
178         }
179
180         /// <summary>Processes the message with a user-provided transform function that returns an enumerable.</summary>
181         /// <param name="transformFunction">The transform function to use to process the message.</param>
182         /// <param name="messageWithId">The message to be processed.</param>
183         private void ProcessMessage(Func<TInput, IEnumerable<TOutput>> transformFunction, KeyValuePair<TInput, long> messageWithId)
184         {
185             Contract.Requires(transformFunction != null, "Function to invoke is required.");
186
187             bool userDelegateSucceeded = false;
188             try
189             {
190                 // Run the user transform and store the results.
191                 IEnumerable<TOutput> outputItems = transformFunction(messageWithId.Key);
192                 userDelegateSucceeded = true;
193                 StoreOutputItems(messageWithId, outputItems);
194             }
195             catch (Exception exc)
196             {
197                 // If this exception represents cancellation, swallow it rather than shutting down the block.
198                 if (!Common.IsCooperativeCancellation(exc)) throw;
199             }
200             finally
201             {
202                 // If the user delegate failed, store an empty set in order 
203                 // to update the bounding count and reordering buffer.
204                 if (!userDelegateSucceeded) StoreOutputItems(messageWithId, null);
205             }
206         }
207
208         /// <summary>Processes the message with a user-provided transform function that returns an observable.</summary>
209         /// <param name="function">The transform function to use to process the message.</param>
210         /// <param name="messageWithId">The message to be processed.</param>
211         [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
212         private void ProcessMessageWithTask(Func<TInput, Task<IEnumerable<TOutput>>> function, KeyValuePair<TInput, long> messageWithId)
213         {
214             Contract.Requires(function != null, "Function to invoke is required.");
215
216             // Run the transform function to get the resulting task
217             Task<IEnumerable<TOutput>> task = null;
218             Exception caughtException = null;
219             try
220             {
221                 task = function(messageWithId.Key);
222             }
223             catch (Exception exc) { caughtException = exc; }
224
225             // If no task is available, either because null was returned or an exception was thrown, we're done.
226             if (task == null)
227             {
228                 // If we didn't get a task because an exception occurred, store it 
229                 // (or if the exception was cancellation, just ignore it).
230                 if (caughtException != null && !Common.IsCooperativeCancellation(caughtException))
231                 {
232                     Common.StoreDataflowMessageValueIntoExceptionData(caughtException, messageWithId.Key);
233                     _target.Complete(caughtException, dropPendingMessages: true, storeExceptionEvenIfAlreadyCompleting: true, unwrapInnerExceptions: false);
234                 }
235
236                 // Notify that we're done with this input and that we got no output for the input.
237                 if (_reorderingBuffer != null)
238                 {
239                     // If there's a reordering buffer, "store" an empty output.  This will
240                     // internally both update the output buffer and decrement the bounding count
241                     // accordingly.
242                     StoreOutputItems(messageWithId, null);
243                     _target.SignalOneAsyncMessageCompleted();
244                 }
245                 else
246                 {
247                     // As a fast path if we're not reordering, decrement the bounding
248                     // count as part of our signaling that we're done, since this will 
249                     // internally take the lock only once, whereas the above path will
250                     // take the lock twice.
251                     _target.SignalOneAsyncMessageCompleted(boundingCountChange: -1);
252                 }
253                 return;
254             }
255
256             // We got back a task.  Now wait for it to complete and store its results.
257             // Unlike with TransformBlock and ActionBlock, We run the continuation on the user-provided 
258             // scheduler as we'll be running user code through enumerating the returned enumerable.
259             task.ContinueWith((completed, state) =>
260             {
261                 var tuple = (Tuple<TransformManyBlock<TInput, TOutput>, KeyValuePair<TInput, long>>)state;
262                 tuple.Item1.AsyncCompleteProcessMessageWithTask(completed, tuple.Item2);
263             }, Tuple.Create(this, messageWithId),
264             CancellationToken.None,
265             Common.GetContinuationOptions(TaskContinuationOptions.ExecuteSynchronously),
266             _source.DataflowBlockOptions.TaskScheduler);
267         }
268
269         /// <summary>Completes the processing of an asynchronous message.</summary>
270         /// <param name="completed">The completed task storing the output data generated for an input message.</param>
271         /// <param name="messageWithId">The originating message</param>
272         [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
273         private void AsyncCompleteProcessMessageWithTask(
274             Task<IEnumerable<TOutput>> completed, KeyValuePair<TInput, long> messageWithId)
275         {
276             Contract.Requires(completed != null, "A task should have been provided.");
277             Contract.Requires(completed.IsCompleted, "The task should have been in a final state.");
278
279             switch (completed.Status)
280             {
281                 case TaskStatus.RanToCompletion:
282                     IEnumerable<TOutput> outputItems = completed.Result;
283                     try
284                     {
285                         // Get the resulting enumerable and persist it.
286                         StoreOutputItems(messageWithId, outputItems);
287                     }
288                     catch (Exception exc)
289                     {
290                         // Enumerating the user's collection failed. If this exception represents cancellation, 
291                         // swallow it rather than shutting down the block.
292                         if (!Common.IsCooperativeCancellation(exc))
293                         {
294                             // The exception was not for cancellation. We must add the exception before declining 
295                             // and signaling completion, as the exception is part of the operation, and the completion 
296                             // conditions depend on this.
297                             Common.StoreDataflowMessageValueIntoExceptionData(exc, messageWithId.Key);
298                             _target.Complete(exc, dropPendingMessages: true, storeExceptionEvenIfAlreadyCompleting: true, unwrapInnerExceptions: false);
299                         }
300                     }
301                     break;
302
303                 case TaskStatus.Faulted:
304                     // We must add the exception before declining and signaling completion, as the exception 
305                     // is part of the operation, and the completion conditions depend on this.
306                     AggregateException aggregate = completed.Exception;
307                     Common.StoreDataflowMessageValueIntoExceptionData(aggregate, messageWithId.Key, targetInnerExceptions: true);
308                     _target.Complete(aggregate, dropPendingMessages: true, storeExceptionEvenIfAlreadyCompleting: true, unwrapInnerExceptions: true);
309                     goto case TaskStatus.Canceled;
310                 case TaskStatus.Canceled:
311                     StoreOutputItems(messageWithId, null); // notify the reordering buffer and decrement the bounding count
312                     break;
313
314                 default:
315                     Debug.Assert(false, "The task should have been in a final state.");
316                     break;
317             }
318
319             // Let the target know that one of the asynchronous operations it launched has completed.
320             _target.SignalOneAsyncMessageCompleted();
321         }
322
323         /// <summary>
324         /// Stores the output items, either into the reordering buffer or into the source half.
325         /// Ensures that the bounding count is correctly updated.
326         /// </summary>
327         /// <param name="messageWithId">The message with id.</param>
328         /// <param name="outputItems">The output items to be persisted.</param>
329         private void StoreOutputItems(
330             KeyValuePair<TInput, long> messageWithId, IEnumerable<TOutput> outputItems)
331         {
332             // If there's a reordering buffer, pass the data along to it.
333             // The reordering buffer will handle all details, including bounding.
334             if (_reorderingBuffer != null)
335             {
336                 StoreOutputItemsReordered(messageWithId.Value, outputItems);
337             }
338             // Otherwise, output the data directly.
339             else if (outputItems != null)
340             {
341                 // If this is a trusted type, output the data en mass.
342                 if (outputItems is TOutput[] || outputItems is List<TOutput>)
343                 {
344                     StoreOutputItemsNonReorderedAtomic(outputItems);
345                 }
346                 else
347                 {
348                     // Otherwise, we need to take the slow path of enumerating
349                     // each individual item.
350                     StoreOutputItemsNonReorderedWithIteration(outputItems);
351                 }
352             }
353             else if (_target.IsBounded)
354             {
355                 // outputItems is null and there's no reordering buffer
356                 // and we're bounding, so decrement the bounding count to
357                 // signify that the input element we already accounted for
358                 // produced no output
359                 _target.ChangeBoundingCount(count: -1);
360             }
361             // else there's no reordering buffer, there are no output items, and we're not bounded,
362             // so there's nothing more to be done.
363         }
364
365         /// <summary>Stores the next item using the reordering buffer.</summary>
366         /// <param name="id">The ID of the item.</param>
367         /// <param name="item">The completed item.</param>
368         private void StoreOutputItemsReordered(long id, IEnumerable<TOutput> item)
369         {
370             Contract.Requires(_reorderingBuffer != null, "Expected a reordering buffer");
371             Contract.Requires(id != Common.INVALID_REORDERING_ID, "This ID should never have been handed out.");
372
373             // Grab info about the transform
374             TargetCore<TInput> target = _target;
375             bool isBounded = target.IsBounded;
376
377             // Handle invalid items (null enumerables) by delegating to the base
378             if (item == null)
379             {
380                 _reorderingBuffer.AddItem(id, null, false);
381                 if (isBounded) target.ChangeBoundingCount(count: -1);
382                 return;
383             }
384
385             // If we can eagerly get the number of items in the collection, update the bounding count.
386             // This avoids the cost of updating it once per output item (since each update requires synchronization).
387             // Even if we're not bounding, we still want to determine whether the item is trusted so that we 
388             // can immediately dump it out once we take the lock if we're the next item.
389             IList<TOutput> itemAsTrustedList = item as TOutput[];
390             if (itemAsTrustedList == null) itemAsTrustedList = item as List<TOutput>;
391             if (itemAsTrustedList != null && isBounded)
392             {
393                 UpdateBoundingCountWithOutputCount(count: itemAsTrustedList.Count);
394             }
395
396             // Determine whether this id is the next item, and if it is and if we have a trusted list,
397             // try to output it immediately on the fast path.  If it can be output, we're done.
398             // Otherwise, make forward progress based on whether we're next in line.
399             bool? isNextNullable = _reorderingBuffer.AddItemIfNextAndTrusted(id, itemAsTrustedList, itemAsTrustedList != null);
400             if (!isNextNullable.HasValue) return; // data was successfully output
401             bool isNextItem = isNextNullable.Value;
402
403             // By this point, either we're not the next item, in which case we need to make a copy of the
404             // data and store it, or we are the next item and can store it immediately but we need to enumerate
405             // the items and store them individually because we don't want to enumerate while holding a lock.
406             List<TOutput> itemCopy = null;
407             try
408             {
409                 // If this is the next item, we can output it now.
410                 if (isNextItem)
411                 {
412                     StoreOutputItemsNonReorderedWithIteration(item);
413                     // here itemCopy remains null, so that base.AddItem will finish our interactions with the reordering buffer
414                 }
415                 else if (itemAsTrustedList != null)
416                 {
417                     itemCopy = itemAsTrustedList.ToList();
418                     // we already got the count and updated the bounding count previously
419                 }
420                 else
421                 {
422                     // We're not the next item, and we're not trusted, so copy the data into a list.
423                     // We need to enumerate outside of the lock in the base class.
424                     int itemCount = 0;
425                     try
426                     {
427                         itemCopy = item.ToList(); // itemCopy will remain null in the case of exception
428                         itemCount = itemCopy.Count;
429                     }
430                     finally
431                     {
432                         // If we're here successfully, then itemCount is the number of output items
433                         // we actually received, and we should update the bounding count with it.
434                         // If we're here because ToList threw an exception, then itemCount will be 0,
435                         // and we still need to update the bounding count with this in order to counteract
436                         // the increased bounding count for the corresponding input.
437                         if (isBounded) UpdateBoundingCountWithOutputCount(count: itemCount);
438                     }
439                 }
440                 // else if the item isn't valid, the finally block will see itemCopy as null and output invalid
441             }
442             finally
443             {
444                 // Tell the base reordering buffer that we're done.  If we already output
445                 // all of the data, itemCopy will be null, and we just pass down the invalid item.  
446                 // If we haven't, pass down the real thing.  We do this even in the case of an exception,
447                 // in which case this will be a dummy element.
448                 _reorderingBuffer.AddItem(id, itemCopy, itemIsValid: itemCopy != null);
449             }
450         }
451
452         /// <summary>
453         /// Stores the trusted enumerable en mass into the source core.
454         /// This method does not go through the reordering buffer.
455         /// </summary>
456         /// <param name="outputItems"></param>
457         private void StoreOutputItemsNonReorderedAtomic(IEnumerable<TOutput> outputItems)
458         {
459             Contract.Requires(_reorderingBuffer == null, "Expected not to have a reordering buffer");
460             Contract.Requires(outputItems is TOutput[] || outputItems is List<TOutput>, "outputItems must be a list we've already vetted as trusted");
461             if (_target.IsBounded) UpdateBoundingCountWithOutputCount(count: ((ICollection<TOutput>)outputItems).Count);
462             _source.AddMessages(outputItems);
463         }
464
465         /// <summary>
466         /// Stores the untrusted enumerable into the source core.
467         /// This method does not go through the reordering buffer.
468         /// </summary>
469         /// <param name="outputItems">The untrusted enumerable.</param>
470         private void StoreOutputItemsNonReorderedWithIteration(IEnumerable<TOutput> outputItems)
471         {
472             // If we're bounding, we need to increment the bounded count
473             // for each individual item as we enumerate it.
474             if (_target.IsBounded)
475             {
476                 // When the input item that generated this
477                 // output was loaded, we incremented the bounding count.  If it only
478                 // output a single a item, then we don't need to touch the bounding count.
479                 // Otherwise, we need to adjust the bounding count accordingly.
480                 bool outputFirstItem = false;
481                 try
482                 {
483                     foreach (TOutput item in outputItems)
484                     {
485                         if (outputFirstItem) _target.ChangeBoundingCount(count: 1);
486                         else outputFirstItem = true;
487                         _source.AddMessage(item);
488                     }
489                 }
490                 finally
491                 {
492                     if (!outputFirstItem) _target.ChangeBoundingCount(count: -1);
493                 }
494             }
495             // If we're not bounding, just output each individual item.
496             else
497             {
498                 foreach (TOutput item in outputItems) _source.AddMessage(item);
499             }
500         }
501
502         /// <summary>
503         /// Updates the bounding count based on the number of output items
504         /// generated for a single input.
505         /// </summary>
506         /// <param name="count">The number of output items.</param>
507         private void UpdateBoundingCountWithOutputCount(int count)
508         {
509             // We already incremented the count for a single input item, and
510             // that input spawned 0 or more outputs.  Take the input tracking
511             // into account when figuring out how much to increment or decrement
512             // the bounding count.
513
514             Contract.Requires(_target.IsBounded, "Expected to be in bounding mode.");
515             if (count > 1) _target.ChangeBoundingCount(count - 1);
516             else if (count == 0) _target.ChangeBoundingCount(-1);
517             else Debug.Assert(count == 1, "Count shouldn't be negative.");
518         }
519
520         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
521         public void Complete() { _target.Complete(exception: null, dropPendingMessages: false); }
522
523         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
524         void IDataflowBlock.Fault(Exception exception)
525         {
526             if (exception == null) throw new ArgumentNullException("exception");
527             Contract.EndContractBlock();
528
529             _target.Complete(exception, dropPendingMessages: true);
530         }
531
532         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="LinkTo"]/*' />
533         public IDisposable LinkTo(ITargetBlock<TOutput> target, DataflowLinkOptions linkOptions) { return _source.LinkTo(target, linkOptions); }
534
535         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceive"]/*' />
536         public Boolean TryReceive(Predicate<TOutput> filter, out TOutput item) { return _source.TryReceive(filter, out item); }
537
538         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceiveAll"]/*' />
539         public bool TryReceiveAll(out IList<TOutput> items) { return _source.TryReceiveAll(out items); }
540
541         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
542         public Task Completion { get { return _source.Completion; } }
543
544         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="InputCount"]/*' />
545         public int InputCount { get { return _target.InputCount; } }
546
547         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="OutputCount"]/*' />
548         public int OutputCount { get { return _source.OutputCount; } }
549
550         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="OfferMessage"]/*' />
551         DataflowMessageStatus ITargetBlock<TInput>.OfferMessage(DataflowMessageHeader messageHeader, TInput messageValue, ISourceBlock<TInput> source, Boolean consumeToAccept)
552         {
553             return _target.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
554         }
555
556         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ConsumeMessage"]/*' />
557         TOutput ISourceBlock<TOutput>.ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target, out Boolean messageConsumed)
558         {
559             return _source.ConsumeMessage(messageHeader, target, out messageConsumed);
560         }
561
562         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReserveMessage"]/*' />
563         bool ISourceBlock<TOutput>.ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
564         {
565             return _source.ReserveMessage(messageHeader, target);
566         }
567
568         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReleaseReservation"]/*' />
569         void ISourceBlock<TOutput>.ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
570         {
571             _source.ReleaseReservation(messageHeader, target);
572         }
573
574         /// <summary>Gets the number of messages waiting to be processed.  This must only be used from the debugger as it avoids taking necessary locks.</summary>
575         private int InputCountForDebugger { get { return _target.GetDebuggingInformation().InputCount; } }
576         /// <summary>Gets the number of messages waiting to be processed.  This must only be used from the debugger as it avoids taking necessary locks.</summary>
577         private int OutputCountForDebugger { get { return _source.GetDebuggingInformation().OutputCount; } }
578
579         /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="ToString"]/*' />
580         public override string ToString() { return Common.GetNameForDebugger(this, _source.DataflowBlockOptions); }
581
582         /// <summary>The data to display in the debugger display attribute.</summary>
583         [SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
584         private object DebuggerDisplayContent
585         {
586             get
587             {
588                 return string.Format("{0}, InputCount={1}, OutputCount={2}",
589                     Common.GetNameForDebugger(this, _source.DataflowBlockOptions),
590                     InputCountForDebugger,
591                     OutputCountForDebugger);
592             }
593         }
594         /// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
595         object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } }
596
597         /// <summary>Provides a debugger type proxy for the TransformManyBlock.</summary>
598         private sealed class DebugView
599         {
600             /// <summary>The transform many block being viewed.</summary>
601             private readonly TransformManyBlock<TInput, TOutput> _transformManyBlock;
602             /// <summary>The target half of the block being viewed.</summary>
603             private readonly TargetCore<TInput>.DebuggingInformation _targetDebuggingInformation;
604             /// <summary>The source half of the block being viewed.</summary>
605             private readonly SourceCore<TOutput>.DebuggingInformation _sourceDebuggingInformation;
606
607             /// <summary>Initializes the debug view.</summary>
608             /// <param name="transformManyBlock">The transform being viewed.</param>
609             public DebugView(TransformManyBlock<TInput, TOutput> transformManyBlock)
610             {
611                 Contract.Requires(transformManyBlock != null, "Need a block with which to construct the debug view.");
612                 _transformManyBlock = transformManyBlock;
613                 _targetDebuggingInformation = transformManyBlock._target.GetDebuggingInformation();
614                 _sourceDebuggingInformation = transformManyBlock._source.GetDebuggingInformation();
615             }
616
617             /// <summary>Gets the messages waiting to be processed.</summary>
618             public IEnumerable<TInput> InputQueue { get { return _targetDebuggingInformation.InputQueue; } }
619             /// <summary>Gets any postponed messages.</summary>
620             public QueuedMap<ISourceBlock<TInput>, DataflowMessageHeader> PostponedMessages { get { return _targetDebuggingInformation.PostponedMessages; } }
621             /// <summary>Gets the messages waiting to be received.</summary>
622             public IEnumerable<TOutput> OutputQueue { get { return _sourceDebuggingInformation.OutputQueue; } }
623
624             /// <summary>Gets the number of input operations currently in flight.</summary>
625             public Int32 CurrentDegreeOfParallelism { get { return _targetDebuggingInformation.CurrentDegreeOfParallelism; } }
626             /// <summary>Gets the task being used for output processing.</summary>
627             public Task TaskForOutputProcessing { get { return _sourceDebuggingInformation.TaskForOutputProcessing; } }
628
629             /// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
630             public ExecutionDataflowBlockOptions DataflowBlockOptions { get { return _targetDebuggingInformation.DataflowBlockOptions; } }
631             /// <summary>Gets whether the block is declining further messages.</summary>
632             public bool IsDecliningPermanently { get { return _targetDebuggingInformation.IsDecliningPermanently; } }
633             /// <summary>Gets whether the block is completed.</summary>
634             public bool IsCompleted { get { return _sourceDebuggingInformation.IsCompleted; } }
635             /// <summary>Gets the block's Id.</summary>
636             public int Id { get { return Common.GetBlockId(_transformManyBlock); } }
637
638             /// <summary>Gets the set of all targets linked from this block.</summary>
639             public TargetRegistry<TOutput> LinkedTargets { get { return _sourceDebuggingInformation.LinkedTargets; } }
640             /// <summary>Gets the set of all targets linked from this block.</summary>
641             public ITargetBlock<TOutput> NextMessageReservedFor { get { return _sourceDebuggingInformation.NextMessageReservedFor; } }
642         }
643     }
644 }