3 // Copyright (c) Microsoft Corporation. All rights reserved.
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
8 // SelectManyQueryOperator.cs
10 // <OWNER>Microsoft</OWNER>
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
14 using System.Collections.Generic;
15 using System.Diagnostics.Contracts;
16 using System.Threading;
18 namespace System.Linq.Parallel
/// <summary>
/// SelectMany is effectively a nested loops join. It is given two data sources, an
/// outer and an inner -- in this operator the inner is always calculated by invoking a
/// selector function for each outer element (optionally also passing that element's index) --
/// and we walk the outer, walking the entire inner enumerator for each outer element. There
/// is an optional result selector function which can transform the output before yielding
/// it as a result element.
/// </summary>
/// <remarks>
/// Although select many conceptually combines two enumerable sources, it appears to the
/// query analysis infrastructure as a unary operator. That's because it works a
/// little differently than the other binary operators: it has to re-open the right
/// child every time an outer element is walked. The right child is NOT partitioned.
/// </remarks>
/// <typeparam name="TLeftInput">The type of the elements of the left (outer) data source.</typeparam>
/// <typeparam name="TRightInput">The type of the elements of the right (inner) sequences produced by the child selector.</typeparam>
/// <typeparam name="TOutput">The type of the output elements; identical to TRightInput when no result selector is supplied.</typeparam>
36 internal sealed class SelectManyQueryOperator<TLeftInput, TRightInput, TOutput> : UnaryQueryOperator<TLeftInput, TOutput>
private readonly Func<TLeftInput, IEnumerable<TRightInput>> m_rightChildSelector; // Non-indexed selector producing the right child for each left element (null when the indexed selector is used).
private readonly Func<TLeftInput, int, IEnumerable<TRightInput>> m_indexedRightChildSelector; // Indexed selector producing the right child from a left element and its index (null when the non-indexed selector is used).
private readonly Func<TLeftInput, TRightInput, TOutput> m_resultSelector; // An optional result selection function.
private bool m_prematureMerge; // Whether to prematurely merge the input of this operator (decided by InitOrderIndex).
private bool m_limitsParallelism; // Whether that premature merge limits parallelism compared to sequential LINQ (exposed via LimitsParallelism).
//---------------------------------------------------------------------------------------
// Initializes a new select-many operator.
//
// Arguments:
//    leftChild                 - the left data source from which to pull data.
//    rightChildSelector        - non-indexed selector that generates a new right child for
//                                every left element; null when the indexed selector is used.
//    indexedRightChildSelector - indexed selector that generates a new right child from a left
//                                element and its index; null when the non-indexed one is used.
//    resultSelector            - an optional selection function for creating output elements;
//                                when null, TRightInput and TOutput must be the same type.
//
// Exactly one of rightChildSelector / indexedRightChildSelector must be non-null.
//

internal SelectManyQueryOperator(IEnumerable<TLeftInput> leftChild,
                                 Func<TLeftInput, IEnumerable<TRightInput>> rightChildSelector,
                                 Func<TLeftInput, int, IEnumerable<TRightInput>> indexedRightChildSelector,
                                 Func<TLeftInput, TRightInput, TOutput> resultSelector)
    : base(leftChild)
{
    Contract.Assert(leftChild != null, "left child data source cannot be null");
    Contract.Assert(rightChildSelector != null || indexedRightChildSelector != null,
                    "either right child data or selector must be supplied");
    Contract.Assert(rightChildSelector == null || indexedRightChildSelector == null,
                    "either indexed- or non-indexed child selector must be supplied (not both)");
    Contract.Assert(typeof(TRightInput) == typeof(TOutput) || resultSelector != null,
                    "right input and output must be the same types, otherwise the result selector may not be null");

    m_rightChildSelector = rightChildSelector;
    m_indexedRightChildSelector = indexedRightChildSelector;
    m_resultSelector = resultSelector;

    // If the SelectMany is indexed, elements must be returned in the order in which
    // indices were assigned.
    m_outputOrdered = Child.OutputOrdered || indexedRightChildSelector != null;

    // Decide whether the input must be merged prematurely and publish this operator's
    // ordinal index state. WrapPartitionedStream and LimitsParallelism read the flags
    // computed here, so this must run before the operator is used.
    InitOrderIndex();
}
//---------------------------------------------------------------------------------------
// Based on the child's ordinal index state, decides whether the child's output must be
// merged prematurely (m_prematureMerge) and whether that merge limits parallelism
// (m_limitsParallelism), then records this operator's ordinal index state as Increasing.
//

private void InitOrderIndex()
{
    OrdinalIndexState childIndexState = Child.OrdinalIndexState;

    if (m_indexedRightChildSelector != null)
    {
        // If this is an indexed SelectMany, we need the order keys to be Correct, so that we can pass them
        // into the user delegate.
        m_prematureMerge = ExchangeUtilities.IsWorseThan(childIndexState, OrdinalIndexState.Correct);
        m_limitsParallelism = m_prematureMerge && childIndexState != OrdinalIndexState.Shuffled;
    }
    else if (OutputOrdered)
    {
        // If the output of this SelectMany is ordered, the input keys must be at least increasing. The
        // SelectMany algorithm assumes that there will be no duplicate order keys, so if the order keys
        // are Shuffled, we need to merge prematurely.
        m_prematureMerge = ExchangeUtilities.IsWorseThan(childIndexState, OrdinalIndexState.Increasing);
    }

    // Unordered, non-indexed SelectMany never merges prematurely; either way the output keys
    // produced by the enumerators are reported as Increasing.
    SetOrdinalIndexState(OrdinalIndexState.Increasing);
}
//---------------------------------------------------------------------------------------
// Wraps the child's partitioned output in SelectMany enumerators and hands the result to
// the recipient. When InitOrderIndex decided on a premature merge, the child is first run
// to completion and its results are re-exposed as an int-keyed partitioned stream.
// Exactly one partitioned stream is passed to recipient.Receive.
//

internal override void WrapPartitionedStream<TLeftKey>(
    PartitionedStream<TLeftInput, TLeftKey> inputStream, IPartitionedStreamRecipient<TOutput> recipient, bool preferStriping, QuerySettings settings)
{
    int partitionCount = inputStream.PartitionCount;

    if (m_indexedRightChildSelector != null)
    {
        PartitionedStream<TLeftInput, int> inputStreamInt;

        // If the index is not correct, we need to reindex.
        if (m_prematureMerge)
        {
            ListQueryResults<TLeftInput> listResults =
                QueryOperator<TLeftInput>.ExecuteAndCollectResults(inputStream, partitionCount, OutputOrdered, preferStriping, settings);
            inputStreamInt = listResults.GetPartitionedStream();
        }
        else
        {
            // No premature merge means the child's index state is already Correct (see InitOrderIndex).
            // NOTE(review): this cast assumes a Correct stream is always keyed by int (TLeftKey == int).
            inputStreamInt = (PartitionedStream<TLeftInput, int>)(object)inputStream;
        }

        WrapPartitionedStreamIndexed(inputStreamInt, recipient, settings);
        return;
    }

    // Non-indexed SelectMany: merge prematurely only when the output is ordered and the child's
    // keys are not at least Increasing (decided by InitOrderIndex).
    if (m_prematureMerge)
    {
        PartitionedStream<TLeftInput, int> inputStreamInt =
            QueryOperator<TLeftInput>.ExecuteAndCollectResults(inputStream, partitionCount, OutputOrdered, preferStriping, settings)
                .GetPartitionedStream();
        WrapPartitionedStreamNotIndexed(inputStreamInt, recipient, settings);
    }
    else
    {
        WrapPartitionedStreamNotIndexed(inputStream, recipient, settings);
    }
}
/// <summary>
/// Helper for WrapPartitionedStream that wraps every partition of the left input in a
/// SelectManyQueryOperatorEnumerator and passes the resulting stream to the recipient.
/// It is generic in the order key type so the same code serves two callers: after a premature
/// merge occurred the key type is "int"; otherwise it is the "TLeftKey" of WrapPartitionedStream.
/// </summary>
private void WrapPartitionedStreamNotIndexed<TLeftKey>(
    PartitionedStream<TLeftInput, TLeftKey> inputStream, IPartitionedStreamRecipient<TOutput> recipient, QuerySettings settings)
{
    int partitions = inputStream.PartitionCount;

    // Output keys are (left key, position within the right child) pairs, compared with the
    // input's key comparer for the first component and the default int comparer for the second.
    PairComparer<TLeftKey, int> outputKeyComparer =
        new PairComparer<TLeftKey, int>(inputStream.KeyComparer, Util.GetDefaultComparer<int>());
    PartitionedStream<TOutput, Pair<TLeftKey, int>> wrapped =
        new PartitionedStream<TOutput, Pair<TLeftKey, int>>(partitions, outputKeyComparer, OrdinalIndexState);

    for (int p = 0; p < partitions; p++)
    {
        wrapped[p] = new SelectManyQueryOperatorEnumerator<TLeftKey>(
            inputStream[p], this, settings.CancellationState.MergedCancellationToken);
    }

    recipient.Receive(wrapped);
}
/// <summary>
/// Counterpart of WrapPartitionedStreamNotIndexed for the indexed SelectMany (the variant that
/// passes each left element's index into the user's sequence-generating delegate). The input
/// must already be keyed by int; every partition is wrapped in an
/// IndexedSelectManyQueryOperatorEnumerator and the resulting stream is passed to the recipient.
/// </summary>
private void WrapPartitionedStreamIndexed(
    PartitionedStream<TLeftInput, int> inputStream, IPartitionedStreamRecipient<TOutput> recipient, QuerySettings settings)
{
    int partitions = inputStream.PartitionCount;

    // Output keys pair the left element's int key with the element's position in its right child.
    PairComparer<int, int> outputKeyComparer =
        new PairComparer<int, int>(inputStream.KeyComparer, Util.GetDefaultComparer<int>());
    PartitionedStream<TOutput, Pair<int, int>> wrapped =
        new PartitionedStream<TOutput, Pair<int, int>>(partitions, outputKeyComparer, OrdinalIndexState);

    for (int p = 0; p < partitions; p++)
    {
        wrapped[p] = new IndexedSelectManyQueryOperatorEnumerator(
            inputStream[p], this, settings.CancellationState.MergedCancellationToken);
    }

    recipient.Receive(wrapped);
}
//---------------------------------------------------------------------------------------
// Opens this operator: only the left child is opened here, and its results are wrapped
// (including partitioning when needed) by UnaryQueryOperatorResults. Right children are
// never opened up front -- the enumerators create and open one on demand for every left
// element they walk.
//

internal override QueryResults<TOutput> Open(QuerySettings settings, bool preferStriping)
{
    return new UnaryQueryOperatorResults(
        Child.Open(settings, preferStriping), this, settings, preferStriping);
}
//---------------------------------------------------------------------------------------
// Returns an enumerable that executes this SelectMany sequentially with LINQ to Objects
// over the child's sequential query (wrapped by CancellableEnumerable.Wrap, presumably so
// the token is observed during enumeration).
//

internal override IEnumerable<TOutput> AsSequentialQuery(CancellationToken token)
{
    var leftSequence = CancellableEnumerable.Wrap(Child.AsSequentialQuery(token), token);

    if (m_rightChildSelector == null)
    {
        // Indexed variant: the selector also receives each left element's position.
        Contract.Assert(m_indexedRightChildSelector != null);

        if (m_resultSelector == null)
        {
            // Without a result selector TRightInput and TOutput are the same type, so the
            // flattened sequence is merely re-typed through object.
            return (IEnumerable<TOutput>)(object)leftSequence.SelectMany(m_indexedRightChildSelector);
        }

        return leftSequence.SelectMany(m_indexedRightChildSelector, m_resultSelector);
    }

    return m_resultSelector == null
        ? (IEnumerable<TOutput>)(object)leftSequence.SelectMany(m_rightChildSelector)
        : leftSequence.SelectMany(m_rightChildSelector, m_resultSelector);
}
//---------------------------------------------------------------------------------------
// Reports whether this operator performs a premature merge that a comparable sequential
// operation (LINQ to Objects) would not perform. The value is the flag computed by
// InitOrderIndex.
//

internal override bool LimitsParallelism
{
    get
    {
        return m_limitsParallelism;
    }
}
//---------------------------------------------------------------------------------------
// The enumerator type responsible for executing the indexed SelectMany logic: the right
// child for every left element is produced by the operator's indexed selector, which also
// receives the element's int order key. Output keys are (left index, right position) pairs.
//

private class IndexedSelectManyQueryOperatorEnumerator : QueryOperatorEnumerator<TOutput, Pair<int, int>>
{
    private readonly QueryOperatorEnumerator<TLeftInput, int> m_leftSource; // The left data source to enumerate.
    private readonly SelectManyQueryOperator<TLeftInput, TRightInput, TOutput> m_selectManyOperator; // The select many operator to use.
    private IEnumerator<TRightInput> m_currentRightSource; // The current enumerator we're using.
    private IEnumerator<TOutput> m_currentRightSourceAsOutput; // If we need to access the enumerator for output directly (no result selector).
    private Mutables m_mutables; // bag of frequently mutated value types [allocated once, lazily, in MoveNext to avoid false-sharing]
    private readonly CancellationToken m_cancellationToken;

    private class Mutables
    {
        internal int m_currentRightSourceIndex = -1; // The index for the right data source (restarted for every left element).
        internal TLeftInput m_currentLeftElement; // The current element in the left data source.
        internal int m_currentLeftSourceIndex; // The current key (index) in the left data source.
        internal int m_lhsCount; // Counts the number of lhs elements enumerated. Used for cancellation testing.
    }

    //---------------------------------------------------------------------------------------
    // Instantiates a new select-many enumerator. Notice that no right data source is passed
    // in: a fresh right enumerable is obtained from the operator's selector, and opened, for
    // every single element in the left data source.
    //

    internal IndexedSelectManyQueryOperatorEnumerator(QueryOperatorEnumerator<TLeftInput, int> leftSource,
                                                      SelectManyQueryOperator<TLeftInput, TRightInput, TOutput> selectManyOperator,
                                                      CancellationToken cancellationToken)
    {
        Contract.Assert(leftSource != null);
        Contract.Assert(selectManyOperator != null);

        m_leftSource = leftSource;
        m_selectManyOperator = selectManyOperator;
        m_cancellationToken = cancellationToken;
    }

    //---------------------------------------------------------------------------------------
    // Advances to the next output element. Returns false once the left source is exhausted;
    // otherwise stores the element in currentElement, its (left index, right position) key in
    // currentKey, and returns true.
    //

    internal override bool MoveNext(ref TOutput currentElement, ref Pair<int, int> currentKey)
    {
        while (true)
        {
            if (m_currentRightSource == null)
            {
                // Allocate the mutable state once, lazily, on the enumerating thread. It must not be
                // re-created for every left element: that reset m_lhsCount to zero each time, so the
                // "every few" poll below degenerated into a check (and an allocation) per left element.
                if (m_mutables == null)
                {
                    m_mutables = new Mutables();
                }

                // Check cancellation every few lhs-enumerations in case none of them are producing
                // any outputs. Otherwise, we rely on the consumer of this operator to be performing the checks.
                if ((m_mutables.m_lhsCount++ & CancellationState.POLL_INTERVAL) == 0)
                {
                    CancellationState.ThrowIfCanceled(m_cancellationToken);
                }

                // We don't have a "current" right enumerator to use. We have to fetch the next
                // one. If the left has run out of elements, however, we're done and just return
                // false.
                if (!m_leftSource.MoveNext(ref m_mutables.m_currentLeftElement, ref m_mutables.m_currentLeftSourceIndex))
                {
                    return false;
                }

                // Positions within each right child are numbered from 0 again (the first
                // successful right MoveNext below bumps this to 0).
                m_mutables.m_currentRightSourceIndex = -1;

                // Use the source selection routine to create a right child.
                IEnumerable<TRightInput> rightChild =
                    m_selectManyOperator.m_indexedRightChildSelector(m_mutables.m_currentLeftElement, m_mutables.m_currentLeftSourceIndex);

                Contract.Assert(rightChild != null);
                m_currentRightSource = rightChild.GetEnumerator();

                Contract.Assert(m_currentRightSource != null);

                // If we have no result selector, we will need to access the Current element of the right
                // data source as though it is a TOutput. Unfortunately, we know that TRightInput must
                // equal TOutput (we check it during operator construction), but the type system doesn't.
                // Thus we would have to cast the result of invoking Current from type TRightInput to
                // TOutput. This is no good, since the results could be value types. Instead, we save the
                // enumerator object as an IEnumerator<TOutput> and access that later on.
                if (m_selectManyOperator.m_resultSelector == null)
                {
                    m_currentRightSourceAsOutput = (IEnumerator<TOutput>)(object)m_currentRightSource;
                    Contract.Assert(m_currentRightSourceAsOutput == m_currentRightSource,
                                    "these must be equal, otherwise the surrounding logic will be broken");
                }
            }

            if (m_currentRightSource.MoveNext())
            {
                m_mutables.m_currentRightSourceIndex++;

                // If the inner data source has an element, we can yield it.
                if (m_selectManyOperator.m_resultSelector != null)
                {
                    // In the case of a selection function, use that to yield the next element.
                    currentElement = m_selectManyOperator.m_resultSelector(m_mutables.m_currentLeftElement, m_currentRightSource.Current);
                }
                else
                {
                    // Otherwise, the right input and output types must be the same. We use the
                    // casted copy of the current right source and just return its current element.
                    Contract.Assert(m_currentRightSourceAsOutput != null);
                    currentElement = m_currentRightSourceAsOutput.Current;
                }

                currentKey = new Pair<int, int>(m_mutables.m_currentLeftSourceIndex, m_mutables.m_currentRightSourceIndex);
                return true;
            }

            // Otherwise, we have exhausted the right data source. Loop back around and try
            // to get the next left element, then its right, and so on.
            m_currentRightSource.Dispose();
            m_currentRightSource = null;
            m_currentRightSourceAsOutput = null;
        }
    }

    // Disposes the left source and, if one is open, the current right enumerator.
    protected override void Dispose(bool disposing)
    {
        m_leftSource.Dispose();
        if (m_currentRightSource != null)
        {
            m_currentRightSource.Dispose();
        }
    }
}
//---------------------------------------------------------------------------------------
// The enumerator type responsible for executing the non-indexed SelectMany logic: the
// right child for every left element is produced by the operator's non-indexed selector.
// Output keys are (left key, right position) pairs.
//

private class SelectManyQueryOperatorEnumerator<TLeftKey> : QueryOperatorEnumerator<TOutput, Pair<TLeftKey, int>>
{
    private readonly QueryOperatorEnumerator<TLeftInput, TLeftKey> m_leftSource; // The left data source to enumerate.
    private readonly SelectManyQueryOperator<TLeftInput, TRightInput, TOutput> m_selectManyOperator; // The select many operator to use.
    private IEnumerator<TRightInput> m_currentRightSource; // The current enumerator we're using.
    private IEnumerator<TOutput> m_currentRightSourceAsOutput; // If we need to access the enumerator for output directly (no result selector).
    private Mutables m_mutables; // bag of frequently mutated value types [allocated once, lazily, in MoveNext to avoid false-sharing]
    private readonly CancellationToken m_cancellationToken;

    private class Mutables
    {
        internal int m_currentRightSourceIndex = -1; // The index for the right data source (restarted for every left element).
        internal TLeftInput m_currentLeftElement; // The current element in the left data source.
        internal TLeftKey m_currentLeftKey; // The current key in the left data source.
        internal int m_lhsCount; // Counts the number of lhs elements enumerated. Used for cancellation testing.
    }

    //---------------------------------------------------------------------------------------
    // Instantiates a new select-many enumerator. Notice that no right data source is passed
    // in: a fresh right enumerable is obtained from the operator's selector, and opened, for
    // every single element in the left data source.
    //

    internal SelectManyQueryOperatorEnumerator(QueryOperatorEnumerator<TLeftInput, TLeftKey> leftSource,
                                               SelectManyQueryOperator<TLeftInput, TRightInput, TOutput> selectManyOperator,
                                               CancellationToken cancellationToken)
    {
        Contract.Assert(leftSource != null);
        Contract.Assert(selectManyOperator != null);

        m_leftSource = leftSource;
        m_selectManyOperator = selectManyOperator;
        m_cancellationToken = cancellationToken;
    }

    //---------------------------------------------------------------------------------------
    // Advances to the next output element. Returns false once the left source is exhausted;
    // otherwise stores the element in currentElement, its (left key, right position) key in
    // currentKey, and returns true.
    //

    internal override bool MoveNext(ref TOutput currentElement, ref Pair<TLeftKey, int> currentKey)
    {
        while (true)
        {
            if (m_currentRightSource == null)
            {
                // Allocate the mutable state once, lazily, on the enumerating thread. It must not be
                // re-created for every left element: that reset m_lhsCount to zero each time, so the
                // "every few" poll below degenerated into a check (and an allocation) per left element.
                if (m_mutables == null)
                {
                    m_mutables = new Mutables();
                }

                // Check cancellation every few lhs-enumerations in case none of them are producing
                // any outputs. Otherwise, we rely on the consumer of this operator to be performing the checks.
                if ((m_mutables.m_lhsCount++ & CancellationState.POLL_INTERVAL) == 0)
                {
                    CancellationState.ThrowIfCanceled(m_cancellationToken);
                }

                // We don't have a "current" right enumerator to use. We have to fetch the next
                // one. If the left has run out of elements, however, we're done and just return
                // false.
                if (!m_leftSource.MoveNext(ref m_mutables.m_currentLeftElement, ref m_mutables.m_currentLeftKey))
                {
                    return false;
                }

                // Positions within each right child are numbered from 0 again (the first
                // successful right MoveNext below bumps this to 0).
                m_mutables.m_currentRightSourceIndex = -1;

                // Use the source selection routine to create a right child.
                IEnumerable<TRightInput> rightChild = m_selectManyOperator.m_rightChildSelector(m_mutables.m_currentLeftElement);

                Contract.Assert(rightChild != null);
                m_currentRightSource = rightChild.GetEnumerator();

                Contract.Assert(m_currentRightSource != null);

                // If we have no result selector, we will need to access the Current element of the right
                // data source as though it is a TOutput. Unfortunately, we know that TRightInput must
                // equal TOutput (we check it during operator construction), but the type system doesn't.
                // Thus we would have to cast the result of invoking Current from type TRightInput to
                // TOutput. This is no good, since the results could be value types. Instead, we save the
                // enumerator object as an IEnumerator<TOutput> and access that later on.
                if (m_selectManyOperator.m_resultSelector == null)
                {
                    m_currentRightSourceAsOutput = (IEnumerator<TOutput>)(object)m_currentRightSource;
                    Contract.Assert(m_currentRightSourceAsOutput == m_currentRightSource,
                                    "these must be equal, otherwise the surrounding logic will be broken");
                }
            }

            if (m_currentRightSource.MoveNext())
            {
                m_mutables.m_currentRightSourceIndex++;

                // If the inner data source has an element, we can yield it.
                if (m_selectManyOperator.m_resultSelector != null)
                {
                    // In the case of a selection function, use that to yield the next element.
                    currentElement = m_selectManyOperator.m_resultSelector(m_mutables.m_currentLeftElement, m_currentRightSource.Current);
                }
                else
                {
                    // Otherwise, the right input and output types must be the same. We use the
                    // casted copy of the current right source and just return its current element.
                    Contract.Assert(m_currentRightSourceAsOutput != null);
                    currentElement = m_currentRightSourceAsOutput.Current;
                }

                currentKey = new Pair<TLeftKey, int>(m_mutables.m_currentLeftKey, m_mutables.m_currentRightSourceIndex);
                return true;
            }

            // Otherwise, we have exhausted the right data source. Loop back around and try
            // to get the next left element, then its right, and so on.
            m_currentRightSource.Dispose();
            m_currentRightSource = null;
            m_currentRightSourceAsOutput = null;
        }
    }

    // Disposes the left source and, if one is open, the current right enumerator.
    protected override void Dispose(bool disposing)
    {
        m_leftSource.Dispose();
        if (m_currentRightSource != null)
        {
            m_currentRightSource.Dispose();
        }
    }
}