Updates referencesource to .NET 4.7
[mono.git] / mcs / class / referencesource / System.Core / System / Linq / Parallel / QueryOperators / Unary / SelectManyQueryOperator.cs
1 // ==++==
2 //
3 //   Copyright (c) Microsoft Corporation.  All rights reserved.
4 // 
5 // ==--==
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
7 //
8 // SelectManyQueryOperator.cs
9 //
10 // <OWNER>Microsoft</OWNER>
11 //
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
13
14 using System.Collections.Generic;
15 using System.Diagnostics.Contracts;
16 using System.Threading;
17
namespace System.Linq.Parallel
{
    /// <summary>
    /// SelectMany is effectively a nested loops join. It is given two data sources, an
    /// outer and an inner -- the inner is produced by invoking a selector function for each
    /// outer element -- and we walk the outer, walking the entire inner enumerator for each
    /// outer element. There is an optional result selector function which can transform the
    /// output before yielding it as a result element.
    ///
    /// Notes:
    ///     Although SelectMany takes two enumerable objects as input, it appears to the
    ///     query analysis infrastructure as a unary operator. That's because it works a
    ///     little differently than the other binary operators: it has to re-open the right
    ///     child every time an outer element is walked. The right child is NOT partitioned.
    /// </summary>
    /// <typeparam name="TLeftInput">Element type of the outer (left) data source.</typeparam>
    /// <typeparam name="TRightInput">Element type of the inner sequences produced by the selector.</typeparam>
    /// <typeparam name="TOutput">Element type produced by this operator (equal to TRightInput when no result selector is given).</typeparam>
    internal sealed class SelectManyQueryOperator<TLeftInput, TRightInput, TOutput> : UnaryQueryOperator<TLeftInput, TOutput>
    {

        private readonly Func<TLeftInput, IEnumerable<TRightInput>> m_rightChildSelector; // Non-indexed selector creating the right child for each left element (null for the indexed variant).
        private readonly Func<TLeftInput, int, IEnumerable<TRightInput>> m_indexedRightChildSelector; // Indexed selector creating the right child for each left element (null for the non-indexed variant).
        private readonly Func<TLeftInput, TRightInput, TOutput> m_resultSelector; // An optional result selection function.
        private bool m_prematureMerge = false; // Whether to prematurely merge the input of this operator.
        private bool m_limitsParallelism = false; // Whether the premature merge is one sequential LINQ would not need (reported via LimitsParallelism).

        //---------------------------------------------------------------------------------------
        // Initializes a new select-many operator.
        //
        // Arguments:
        //    leftChild                 - the left data source from which to pull data.
        //    rightChildSelector        - non-indexed selector that generates a new right child for
        //                                every left element; null if the indexed selector is used.
        //    indexedRightChildSelector - indexed selector that generates a new right child from a
        //                                left element and its index; null if the non-indexed
        //                                selector is used. Exactly one of the two must be non-null.
        //    resultSelector            - an optional selection function for creating output elements;
        //                                may only be null when TRightInput and TOutput are the same type.
        //

        internal SelectManyQueryOperator(IEnumerable<TLeftInput> leftChild,
                                         Func<TLeftInput, IEnumerable<TRightInput>> rightChildSelector,
                                         Func<TLeftInput, int, IEnumerable<TRightInput>> indexedRightChildSelector,
                                         Func<TLeftInput, TRightInput, TOutput> resultSelector)
            :base(leftChild)
        {
            Contract.Assert(leftChild != null, "left child data source cannot be null");
            Contract.Assert(rightChildSelector != null || indexedRightChildSelector != null,
                            "either right child data or selector must be supplied");
            Contract.Assert(rightChildSelector == null || indexedRightChildSelector == null,
                            "either indexed- or non-indexed child selector must be supplied (not both)");
            Contract.Assert(typeof(TRightInput) == typeof(TOutput) || resultSelector != null,
                            "right input and output must be the same types, otherwise the result selector may not be null");

            m_rightChildSelector = rightChildSelector;
            m_indexedRightChildSelector = indexedRightChildSelector;
            m_resultSelector = resultSelector;

            // If the SelectMany is indexed, elements must be returned in the order in which
            // indices were assigned.
            m_outputOrdered = Child.OutputOrdered || indexedRightChildSelector != null;

            InitOrderIndex();
        }

        //---------------------------------------------------------------------------------------
        // Decides whether the child's output must be merged before this operator runs, and
        // records the ordinal index state of this operator's output.
        //

        private void InitOrderIndex()
        {
            OrdinalIndexState childIndexState = Child.OrdinalIndexState;

            if (m_indexedRightChildSelector != null)
            {
                // If this is an indexed SelectMany, we need the order keys to be Correct, so that we can pass them
                // into the user delegate.
                m_prematureMerge = ExchangeUtilities.IsWorseThan(childIndexState, OrdinalIndexState.Correct);

                // A merge of Shuffled input is not counted as limiting parallelism; any other merge is.
                m_limitsParallelism = m_prematureMerge && childIndexState != OrdinalIndexState.Shuffled;
            }
            else if (OutputOrdered)
            {
                // If the output of this SelectMany is ordered, the input keys must be at least increasing. The
                // SelectMany algorithm assumes that there will be no duplicate order keys, so if the order keys
                // are Shuffled, we need to merge prematurely.
                m_prematureMerge = ExchangeUtilities.IsWorseThan(childIndexState, OrdinalIndexState.Increasing);
            }

            // Output keys are (left key, right index) pairs, which increase but are not dense indices.
            SetOrdinalIndexState(OrdinalIndexState.Increasing);
        }

        internal override void WrapPartitionedStream<TLeftKey>(
            PartitionedStream<TLeftInput, TLeftKey> inputStream, IPartitionedStreamRecipient<TOutput> recipient, bool preferStriping, QuerySettings settings)
        {
            int partitionCount = inputStream.PartitionCount;

            if (m_indexedRightChildSelector != null)
            {
                PartitionedStream<TLeftInput, int> inputStreamInt;

                // If the index is not correct, we need to reindex.
                if (m_prematureMerge)
                {
                    ListQueryResults<TLeftInput> listResults =
                        QueryOperator<TLeftInput>.ExecuteAndCollectResults(inputStream, partitionCount, OutputOrdered, preferStriping, settings);
                    inputStreamInt = listResults.GetPartitionedStream();
                }
                else
                {
                    // No merge was required, so the child's keys are already Correct indices.
                    // NOTE(review): this cast assumes TLeftKey is int whenever the index state is Correct.
                    inputStreamInt = (PartitionedStream<TLeftInput, int>)(object)inputStream;
                }
                WrapPartitionedStreamIndexed(inputStreamInt, recipient, settings);
                return;
            }

            // Non-indexed variant: merge first only if InitOrderIndex decided the child's keys are
            // not good enough; the collected results are re-keyed with int indices.
            if (m_prematureMerge)
            {
                PartitionedStream<TLeftInput, int> inputStreamInt =
                    QueryOperator<TLeftInput>.ExecuteAndCollectResults(inputStream, partitionCount, OutputOrdered, preferStriping, settings)
                    .GetPartitionedStream();
                WrapPartitionedStreamNotIndexed(inputStreamInt, recipient, settings);
            }
            else
            {
                WrapPartitionedStreamNotIndexed(inputStream, recipient, settings);
            }
        }

        /// <summary>
        /// A helper method for WrapPartitionedStream. We use the helper to reuse a block of code twice, but with
        /// a different order key type. (If premature merge occurred, the order key type will be "int". Otherwise,
        /// it will be the same type as "TLeftKey" in WrapPartitionedStream.)
        /// </summary>
        private void WrapPartitionedStreamNotIndexed<TLeftKey>(
            PartitionedStream<TLeftInput, TLeftKey> inputStream, IPartitionedStreamRecipient<TOutput> recipient, QuerySettings settings)
        {
            int partitionCount = inputStream.PartitionCount;
            var keyComparer = new PairComparer<TLeftKey, int>(inputStream.KeyComparer, Util.GetDefaultComparer<int>());
            var outputStream = new PartitionedStream<TOutput, Pair<TLeftKey, int>>(partitionCount, keyComparer, OrdinalIndexState);
            for (int i = 0; i < partitionCount; i++)
            {
                outputStream[i] = new SelectManyQueryOperatorEnumerator<TLeftKey>(inputStream[i], this, settings.CancellationState.MergedCancellationToken);
            }

            recipient.Receive(outputStream);
        }

        /// <summary>
        /// Similar helper method to WrapPartitionedStreamNotIndexed, except that this one is for the indexed variant
        /// of SelectMany (i.e., the SelectMany that passes indices into the user sequence-generating delegate)
        /// </summary>
        private void WrapPartitionedStreamIndexed(
            PartitionedStream<TLeftInput, int> inputStream, IPartitionedStreamRecipient<TOutput> recipient, QuerySettings settings)
        {
            int partitionCount = inputStream.PartitionCount;
            var keyComparer = new PairComparer<int, int>(inputStream.KeyComparer, Util.GetDefaultComparer<int>());
            var outputStream = new PartitionedStream<TOutput, Pair<int, int>>(partitionCount, keyComparer, OrdinalIndexState);

            for (int i = 0; i < partitionCount; i++)
            {
                outputStream[i] = new IndexedSelectManyQueryOperatorEnumerator(inputStream[i], this, settings.CancellationState.MergedCancellationToken);
            }

            recipient.Receive(outputStream);
        }

        //---------------------------------------------------------------------------------------
        // Just opens the current operator, including opening the left child and wrapping with a
        // partition if needed. The right child is not opened yet -- this is always done on demand
        // as the outer elements are enumerated.
        //

        internal override QueryResults<TOutput> Open(QuerySettings settings, bool preferStriping)
        {
            QueryResults<TLeftInput> childQueryResults = Child.Open(settings, preferStriping);
            return new UnaryQueryOperatorResults(childQueryResults, this, settings, preferStriping);
        }

        //---------------------------------------------------------------------------------------
        // Returns an enumerable that represents the query executing sequentially, delegating to
        // the LINQ to Objects SelectMany overload that matches the configured selectors.
        //

        internal override IEnumerable<TOutput> AsSequentialQuery(CancellationToken token)
        {
            var source = CancellableEnumerable.Wrap(Child.AsSequentialQuery(token), token);

            if (m_rightChildSelector != null)
            {
                if (m_resultSelector != null)
                {
                    return source.SelectMany(m_rightChildSelector, m_resultSelector);
                }

                // Without a result selector TRightInput == TOutput (asserted in the constructor).
                return (IEnumerable<TOutput>)(object)(source.SelectMany(m_rightChildSelector));
            }

            Contract.Assert(m_indexedRightChildSelector != null);
            if (m_resultSelector != null)
            {
                return source.SelectMany(m_indexedRightChildSelector, m_resultSelector);
            }

            // Without a result selector TRightInput == TOutput (asserted in the constructor).
            return (IEnumerable<TOutput>)(object)(source.SelectMany(m_indexedRightChildSelector));
        }


        //---------------------------------------------------------------------------------------
        // Whether this operator performs a premature merge that would not be performed in
        // a similar sequential operation (i.e., in LINQ to Objects).
        //

        internal override bool LimitsParallelism
        {
            get { return m_limitsParallelism; }
        }

        //---------------------------------------------------------------------------------------
        // The enumerator type responsible for executing the indexed SelectMany logic. Output keys
        // are (left element index, index within that element's right child) pairs.
        //

        private sealed class IndexedSelectManyQueryOperatorEnumerator : QueryOperatorEnumerator<TOutput, Pair<int, int>>
        {
            private readonly QueryOperatorEnumerator<TLeftInput, int> m_leftSource; // The left data source to enumerate.
            private readonly SelectManyQueryOperator<TLeftInput, TRightInput, TOutput> m_selectManyOperator; // The select many operator to use.
            private IEnumerator<TRightInput> m_currentRightSource; // The current enumerator we're using.
            private IEnumerator<TOutput> m_currentRightSourceAsOutput; // If we need to access the enumerator for output directly (no result selector).
            private Mutables m_mutables; // bag of frequently mutated value types [allocated once, lazily in MoveNext, to avoid false-sharing]
            private readonly CancellationToken m_cancellationToken;

            private class Mutables
            {
                internal int m_currentRightSourceIndex = -1; // The index within the current right data source; reset for each left element.
                internal TLeftInput m_currentLeftElement; // The current element in the left data source.
                internal int m_currentLeftSourceIndex; // The current key in the left data source.
                internal int m_lhsCount; // Counts the number of lhs elements enumerated; used for cancellation polling.
            }


            //---------------------------------------------------------------------------------------
            // Instantiates a new select-many enumerator. Notice that the right data source is an
            // enumera*BLE* not an enumera*TOR*. It is re-opened for every single element in the left
            // data source.
            //

            internal IndexedSelectManyQueryOperatorEnumerator(QueryOperatorEnumerator<TLeftInput, int> leftSource,
                                                              SelectManyQueryOperator<TLeftInput, TRightInput, TOutput> selectManyOperator,
                                                              CancellationToken cancellationToken)
            {
                Contract.Assert(leftSource != null);
                Contract.Assert(selectManyOperator != null);

                m_leftSource = leftSource;
                m_selectManyOperator = selectManyOperator;
                m_cancellationToken = cancellationToken;
            }

            //---------------------------------------------------------------------------------------
            // Advances to the next output element, pulling a new left element (and opening its
            // right child) whenever the current right child is exhausted.
            //

            internal override bool MoveNext(ref TOutput currentElement, ref Pair<int, int> currentKey)
            {
                // Allocate the mutable state once, on the enumerating thread. Allocating it per left
                // element would reset m_lhsCount every time and defeat the cancellation poll interval.
                if (m_mutables == null)
                {
                    m_mutables = new Mutables();
                }

                while (true)
                {
                    if (m_currentRightSource == null)
                    {
                        // Check cancellation every few lhs-enumerations in case none of them are producing
                        // any outputs.  Otherwise, we rely on the consumer of this operator to be performing the checks.
                        if ((m_mutables.m_lhsCount++ & CancellationState.POLL_INTERVAL) == 0)
                        {
                            CancellationState.ThrowIfCanceled(m_cancellationToken);
                        }

                        // Each right child is numbered from zero.
                        m_mutables.m_currentRightSourceIndex = -1;

                        // We don't have a "current" right enumerator to use. We have to fetch the next
                        // one. If the left has run out of elements, however, we're done and just return
                        // false right away.
                        if (!m_leftSource.MoveNext(ref m_mutables.m_currentLeftElement, ref m_mutables.m_currentLeftSourceIndex))
                        {
                            return false;
                        }

                        // Use the source selection routine to create a right child.
                        IEnumerable<TRightInput> rightChild =
                            m_selectManyOperator.m_indexedRightChildSelector(m_mutables.m_currentLeftElement, m_mutables.m_currentLeftSourceIndex);

                        Contract.Assert(rightChild != null);
                        m_currentRightSource = rightChild.GetEnumerator();

                        Contract.Assert(m_currentRightSource != null);

                        // If we have no result selector, we will need to access the Current element of the right
                        // data source as though it is a TOutput. We know that TRightInput must equal TOutput
                        // (checked during operator construction), but the type system doesn't. Rather than cast
                        // every Current value (which could be a value type), we save the enumerator object as an
                        // IEnumerator<TOutput> and read from that later on.
                        if (m_selectManyOperator.m_resultSelector == null)
                        {
                            m_currentRightSourceAsOutput = (IEnumerator<TOutput>)(object)m_currentRightSource;
                            Contract.Assert(m_currentRightSourceAsOutput == m_currentRightSource,
                                            "these must be equal, otherwise the surrounding logic will be broken");
                        }
                    }

                    if (m_currentRightSource.MoveNext())
                    {
                        m_mutables.m_currentRightSourceIndex++;

                        // If the inner data source has an element, we can yield it.
                        if (m_selectManyOperator.m_resultSelector != null)
                        {
                            // In the case of a selection function, use that to yield the next element.
                            currentElement = m_selectManyOperator.m_resultSelector(m_mutables.m_currentLeftElement, m_currentRightSource.Current);
                        }
                        else
                        {
                            // Otherwise, the right input and output types must be the same. We use the
                            // casted copy of the current right source and just return its current element.
                            Contract.Assert(m_currentRightSourceAsOutput != null);
                            currentElement = m_currentRightSourceAsOutput.Current;
                        }
                        currentKey = new Pair<int, int>(m_mutables.m_currentLeftSourceIndex, m_mutables.m_currentRightSourceIndex);

                        return true;
                    }

                    // Otherwise, we have exhausted the right data source. Loop back around and try
                    // to get the next left element, then its right, and so on.
                    m_currentRightSource.Dispose();
                    m_currentRightSource = null;
                    m_currentRightSourceAsOutput = null;
                }
            }

            protected override void Dispose(bool disposing)
            {
                m_leftSource.Dispose();
                if (m_currentRightSource != null)
                {
                    // Clear the reference so a repeated Dispose does not dispose the right child twice.
                    m_currentRightSource.Dispose();
                    m_currentRightSource = null;
                    m_currentRightSourceAsOutput = null;
                }
            }
        }

        //---------------------------------------------------------------------------------------
        // The enumerator type responsible for executing the non-indexed SelectMany logic. Output
        // keys are (left key, index within that element's right child) pairs.
        //

        private sealed class SelectManyQueryOperatorEnumerator<TLeftKey> : QueryOperatorEnumerator<TOutput, Pair<TLeftKey, int>>
        {
            private readonly QueryOperatorEnumerator<TLeftInput, TLeftKey> m_leftSource; // The left data source to enumerate.
            private readonly SelectManyQueryOperator<TLeftInput, TRightInput, TOutput> m_selectManyOperator; // The select many operator to use.
            private IEnumerator<TRightInput> m_currentRightSource; // The current enumerator we're using.
            private IEnumerator<TOutput> m_currentRightSourceAsOutput; // If we need to access the enumerator for output directly (no result selector).
            private Mutables m_mutables; // bag of frequently mutated value types [allocated once, lazily in MoveNext, to avoid false-sharing]
            private readonly CancellationToken m_cancellationToken;

            private class Mutables
            {
                internal int m_currentRightSourceIndex = -1; // The index within the current right data source; reset for each left element.
                internal TLeftInput m_currentLeftElement; // The current element in the left data source.
                internal TLeftKey m_currentLeftKey; // The current key in the left data source.
                internal int m_lhsCount; // Counts the number of lhs elements enumerated; used for cancellation polling.
            }


            //---------------------------------------------------------------------------------------
            // Instantiates a new select-many enumerator. Notice that the right data source is an
            // enumera*BLE* not an enumera*TOR*. It is re-opened for every single element in the left
            // data source.
            //

            internal SelectManyQueryOperatorEnumerator(QueryOperatorEnumerator<TLeftInput, TLeftKey> leftSource,
                                                       SelectManyQueryOperator<TLeftInput, TRightInput, TOutput> selectManyOperator,
                                                       CancellationToken cancellationToken)
            {
                Contract.Assert(leftSource != null);
                Contract.Assert(selectManyOperator != null);

                m_leftSource = leftSource;
                m_selectManyOperator = selectManyOperator;
                m_cancellationToken = cancellationToken;
            }

            //---------------------------------------------------------------------------------------
            // Advances to the next output element, pulling a new left element (and opening its
            // right child) whenever the current right child is exhausted.
            //

            internal override bool MoveNext(ref TOutput currentElement, ref Pair<TLeftKey, int> currentKey)
            {
                // Allocate the mutable state once, on the enumerating thread. Allocating it per left
                // element would reset m_lhsCount every time and defeat the cancellation poll interval.
                if (m_mutables == null)
                {
                    m_mutables = new Mutables();
                }

                while (true)
                {
                    if (m_currentRightSource == null)
                    {
                        // Check cancellation every few lhs-enumerations in case none of them are producing
                        // any outputs.  Otherwise, we rely on the consumer of this operator to be performing the checks.
                        if ((m_mutables.m_lhsCount++ & CancellationState.POLL_INTERVAL) == 0)
                        {
                            CancellationState.ThrowIfCanceled(m_cancellationToken);
                        }

                        // Each right child is numbered from zero.
                        m_mutables.m_currentRightSourceIndex = -1;

                        // We don't have a "current" right enumerator to use. We have to fetch the next
                        // one. If the left has run out of elements, however, we're done and just return
                        // false right away.
                        if (!m_leftSource.MoveNext(ref m_mutables.m_currentLeftElement, ref m_mutables.m_currentLeftKey))
                        {
                            return false;
                        }

                        // Use the source selection routine to create a right child.
                        IEnumerable<TRightInput> rightChild = m_selectManyOperator.m_rightChildSelector(m_mutables.m_currentLeftElement);

                        Contract.Assert(rightChild != null);
                        m_currentRightSource = rightChild.GetEnumerator();

                        Contract.Assert(m_currentRightSource != null);

                        // If we have no result selector, we will need to access the Current element of the right
                        // data source as though it is a TOutput. We know that TRightInput must equal TOutput
                        // (checked during operator construction), but the type system doesn't. Rather than cast
                        // every Current value (which could be a value type), we save the enumerator object as an
                        // IEnumerator<TOutput> and read from that later on.
                        if (m_selectManyOperator.m_resultSelector == null)
                        {
                            m_currentRightSourceAsOutput = (IEnumerator<TOutput>)(object)m_currentRightSource;
                            Contract.Assert(m_currentRightSourceAsOutput == m_currentRightSource,
                                            "these must be equal, otherwise the surrounding logic will be broken");
                        }
                    }

                    if (m_currentRightSource.MoveNext())
                    {
                        m_mutables.m_currentRightSourceIndex++;

                        // If the inner data source has an element, we can yield it.
                        if (m_selectManyOperator.m_resultSelector != null)
                        {
                            // In the case of a selection function, use that to yield the next element.
                            currentElement = m_selectManyOperator.m_resultSelector(m_mutables.m_currentLeftElement, m_currentRightSource.Current);
                        }
                        else
                        {
                            // Otherwise, the right input and output types must be the same. We use the
                            // casted copy of the current right source and just return its current element.
                            Contract.Assert(m_currentRightSourceAsOutput != null);
                            currentElement = m_currentRightSourceAsOutput.Current;
                        }
                        currentKey = new Pair<TLeftKey, int>(m_mutables.m_currentLeftKey, m_mutables.m_currentRightSourceIndex);

                        return true;
                    }

                    // Otherwise, we have exhausted the right data source. Loop back around and try
                    // to get the next left element, then its right, and so on.
                    m_currentRightSource.Dispose();
                    m_currentRightSource = null;
                    m_currentRightSourceAsOutput = null;
                }
            }

            protected override void Dispose(bool disposing)
            {
                m_leftSource.Dispose();
                if (m_currentRightSource != null)
                {
                    // Clear the reference so a repeated Dispose does not dispose the right child twice.
                    m_currentRightSource.Dispose();
                    m_currentRightSource = null;
                    m_currentRightSourceAsOutput = null;
                }
            }
        }
    }
}