3 // Copyright (c) Microsoft Corporation. All rights reserved.
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
8 // ConcatQueryOperator.cs
10 // <OWNER>[....]</OWNER>
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
14 using System.Collections.Generic;
15 using System.Diagnostics.Contracts;
16 using System.Threading;
18 namespace System.Linq.Parallel
21 /// Concatenates one data source with another. Order preservation is used to ensure
22 /// the output is actually a concatenation -- i.e. one after the other. No index
23 /// arithmetic is needed for this: every output element is keyed by a ConcatKey that
24 /// records whether the element came from the first or the second data source, and the
25 /// ConcatKey comparer orders every first-source key ahead of every second-source key.
26 /// This makes it appear to the order preservation infrastructure as though all elements
27 /// in the second came after all elements in the first, which is precisely what we want.
29 /// <typeparam name="TSource">The element type of both data sources and of the output.</typeparam>
30 internal sealed class ConcatQueryOperator<TSource> : BinaryQueryOperator<TSource, TSource, TSource>
33 private readonly bool m_prematureMergeLeft = false; // Whether to prematurely merge the left data source
34 private readonly bool m_prematureMergeRight = false; // Whether to prematurely merge the right data source
36 //---------------------------------------------------------------------------------------
37 // Initializes a new concatenation operator.
// The operator is marked as output-ordered if either child is output-ordered, each
// child is flagged for a premature merge when its ordinal index state is worse than
// Increasing, and the operator's own ordinal index state is derived from the children.
//
// Arguments:
40 //     firstChild  - the data source handed to the base class as the first child; must not be null
//     secondChild - the data source handed to the base class as the second child; must not be null
43 internal ConcatQueryOperator(ParallelQuery<TSource> firstChild, ParallelQuery<TSource> secondChild)
44 : base(firstChild, secondChild)
// Both children are required (checked with assertions only).
46 Contract.Assert(firstChild != null, "first child data source cannot be null");
47 Contract.Assert(secondChild != null, "second child data source cannot be null");
// The concatenation is ordered as soon as one of the two inputs is ordered.
48 m_outputOrdered = LeftChild.OutputOrdered || RightChild.OutputOrdered;
// Remember, per child, whether its ordinal index state is worse than Increasing. These
// flags are read later by WrapPartitionedStream (left) and WrapHelper (right).
50 m_prematureMergeLeft = LeftChild.OrdinalIndexState.IsWorseThan(OrdinalIndexState.Increasing);
51 m_prematureMergeRight = RightChild.OrdinalIndexState.IsWorseThan(OrdinalIndexState.Increasing);
// Two indexible children give an indexible concatenation.
53 if ((LeftChild.OrdinalIndexState == OrdinalIndexState.Indexible)
54 && (RightChild.OrdinalIndexState == OrdinalIndexState.Indexible))
56 SetOrdinalIndex(OrdinalIndexState.Indexible);
// NOTE(review): the statement that receives the expression below is not visible in this
// listing; presumably it is a SetOrdinalIndex call in the else branch -- confirm. The
// expression itself picks the worst of Increasing and the two children's states.
61 ExchangeUtilities.Worse(OrdinalIndexState.Increasing,
62 ExchangeUtilities.Worse(LeftChild.OrdinalIndexState, RightChild.OrdinalIndexState)));
66 //---------------------------------------------------------------------------------------
67 // Opens the current operator: both children are opened with the same settings and
68 // striping preference, and their results are handed to ConcatQueryOperatorResults.NewResults.
//
// Arguments:
//     settings       - query settings, forwarded unchanged to both children and to NewResults
//     preferStriping - striping preference, forwarded unchanged in the same way
//
// Return Value:
//     The results object built by NewResults from the two child results.
71 internal override QueryResults<TSource> Open(QuerySettings settings, bool preferStriping)
73 // We just open the children operators (left first, then right).
74 QueryResults<TSource> leftChildResults = LeftChild.Open(settings, preferStriping);
75 QueryResults<TSource> rightChildResults = RightChild.Open(settings, preferStriping);
// NewResults chooses the concrete results type based on whether both child results are indexible.
77 return ConcatQueryOperatorResults.NewResults(leftChildResults, rightChildResults, this, settings, preferStriping);
//---------------------------------------------------------------------------------------
// Receives the partitioned streams of both children. Deals with the left stream here
// (merging it prematurely when m_prematureMergeLeft is set) and then delegates to
// WrapHelper, which deals with the right stream.
//
// Arguments:
//     leftStream      - partitioned stream of the left child
//     rightStream     - partitioned stream of the right child
//     outputRecipient - passed through to WrapHelper
//     preferStriping  - passed to ExecuteAndCollectResults and to WrapHelper
//     settings        - passed to ExecuteAndCollectResults and to WrapHelper
80 public override void WrapPartitionedStream<TLeftKey, TRightKey>(
81 PartitionedStream<TSource, TLeftKey> leftStream, PartitionedStream<TSource, TRightKey> rightStream,
82 IPartitionedStreamRecipient<TSource> outputRecipient, bool preferStriping, QuerySettings settings)
84 // Prematurely merge the left results, if necessary
85 if (m_prematureMergeLeft)
// Run the left stream to completion, keeping its partition count, and obtain a new
// partitioned stream over the collected results; the new stream is keyed by int.
87 ListQueryResults<TSource> leftStreamResults =
88 ExecuteAndCollectResults(leftStream, leftStream.PartitionCount, LeftChild.OutputOrdered, preferStriping, settings);
89 PartitionedStream<TSource, int> leftStreamInc = leftStreamResults.GetPartitionedStream();
90 WrapHelper<int, TRightKey>(leftStreamInc, rightStream, outputRecipient, settings, preferStriping);
// NOTE(review): the else that separates the two paths is not visible in this listing.
// Without a premature merge the left stream is expected to be Increasing or better
// already, and is passed to WrapHelper with its original key type.
94 Contract.Assert(!ExchangeUtilities.IsWorseThan(leftStream.OrdinalIndexState, OrdinalIndexState.Increasing));
95 WrapHelper<TLeftKey, TRightKey>(leftStream, rightStream, outputRecipient, settings, preferStriping);
//---------------------------------------------------------------------------------------
// Second stage of WrapPartitionedStream: the left stream has already been dealt with,
// so this method deals with the right stream (merging it prematurely when
// m_prematureMergeRight is set) and then delegates to WrapHelper2.
//
// Arguments:
//     leftStreamInc   - the left stream as prepared by WrapPartitionedStream
//     rightStream     - partitioned stream of the right child
//     outputRecipient - passed through to WrapHelper2
//     settings        - passed to ExecuteAndCollectResults
//     preferStriping  - passed to ExecuteAndCollectResults
99 private void WrapHelper<TLeftKey,TRightKey>(
100 PartitionedStream<TSource, TLeftKey> leftStreamInc, PartitionedStream<TSource, TRightKey> rightStream,
101 IPartitionedStreamRecipient<TSource> outputRecipient, QuerySettings settings, bool preferStriping)
103 // Prematurely merge the right results, if necessary
104 if (m_prematureMergeRight)
// The right stream is collected using the LEFT stream's partition count; WrapHelper2 pairs
// partition i of the left stream with partition i of the right stream.
// NOTE(review): the ordering flag passed here is LeftChild.OutputOrdered even though the
// stream being merged is the right one -- confirm whether RightChild.OutputOrdered was intended.
106 ListQueryResults<TSource> rightStreamResults =
107 ExecuteAndCollectResults(rightStream, leftStreamInc.PartitionCount, LeftChild.OutputOrdered, preferStriping, settings);
108 PartitionedStream<TSource, int> rightStreamInc = rightStreamResults.GetPartitionedStream();
109 WrapHelper2<TLeftKey, int>(leftStreamInc, rightStreamInc, outputRecipient);
// NOTE(review): the else that separates the two paths is not visible in this listing.
// Without a premature merge the right stream is expected to be Increasing or better
// already, and is passed to WrapHelper2 with its original key type.
113 Contract.Assert(!ExchangeUtilities.IsWorseThan(rightStream.OrdinalIndexState, OrdinalIndexState.Increasing));
114 WrapHelper2<TLeftKey, TRightKey>(leftStreamInc, rightStream, outputRecipient);
//---------------------------------------------------------------------------------------
// Final stage of WrapPartitionedStream: builds the output stream, keyed by ConcatKey,
// with one concatenating enumerator per partition, and hands it to the recipient.
//
// Arguments:
//     leftStreamInc   - the prepared left stream
//     rightStreamInc  - the prepared right stream
//     outputRecipient - receives the resulting partitioned stream
118 private void WrapHelper2<TLeftKey, TRightKey>(
119 PartitionedStream<TSource, TLeftKey> leftStreamInc, PartitionedStream<TSource, TRightKey> rightStreamInc,
120 IPartitionedStreamRecipient<TSource> outputRecipient)
// The output has as many partitions as the left stream. The right stream is indexed with
// the same range below -- assumes it has at least that many partitions; TODO confirm.
122 int partitionCount = leftStreamInc.PartitionCount;
124 // Generate the shared data: a ConcatKey comparer built from both streams' key comparers,
// and an output stream that carries this operator's ordinal index state.
125 IComparer<ConcatKey<TLeftKey, TRightKey>> comparer = ConcatKey<TLeftKey, TRightKey>.MakeComparer(
126 leftStreamInc.KeyComparer, rightStreamInc.KeyComparer);
127 var outputStream = new PartitionedStream<TSource, ConcatKey<TLeftKey, TRightKey>>(partitionCount, comparer, OrdinalIndexState);
// Pair partition i of the left stream with partition i of the right stream.
129 for (int i = 0; i < partitionCount; i++)
131 outputStream[i] = new ConcatQueryOperatorEnumerator<TLeftKey, TRightKey>(leftStreamInc[i], rightStreamInc[i]);
134 outputRecipient.Receive(outputStream);
138 //---------------------------------------------------------------------------------------
139 // Returns an enumerable that represents the query executing sequentially: the left
// child's sequential query followed by the right child's, joined with Concat.
//
// Arguments:
//     token - cancellation token, forwarded unchanged to both children
142 internal override IEnumerable<TSource> AsSequentialQuery(CancellationToken token)
144 return LeftChild.AsSequentialQuery(token).Concat(RightChild.AsSequentialQuery(token));
148 //---------------------------------------------------------------------------------------
149 // Whether this operator performs a premature merge that would not be performed in
150 // a similar sequential operation (i.e., in LINQ to Objects).
// This operator always reports false; the value does not depend on the
// m_prematureMergeLeft / m_prematureMergeRight flags.
153 internal override bool LimitsParallelism
155 get { return false; }
158 //---------------------------------------------------------------------------------------
159 // The enumerator type responsible for concatenating two data sources. It wraps one
// partition of the first source and one partition of the second source, and exposes
// their elements under ConcatKey keys.
162 class ConcatQueryOperatorEnumerator<TLeftKey, TRightKey> : QueryOperatorEnumerator<TSource, ConcatKey<TLeftKey, TRightKey>>
165 private QueryOperatorEnumerator<TSource, TLeftKey> m_firstSource; // The first data source to enumerate.
166 private QueryOperatorEnumerator<TSource, TRightKey> m_secondSource; // The second data source to enumerate.
167 private bool m_begunSecond; // Whether this partition has begun enumerating the second source yet.
169 //---------------------------------------------------------------------------------------
170 // Instantiates a new concat enumerator over the two given source enumerators.
// Both sources must be non-null (checked with assertions only).
173 internal ConcatQueryOperatorEnumerator(
174 QueryOperatorEnumerator<TSource, TLeftKey> firstSource,
175 QueryOperatorEnumerator<TSource, TRightKey> secondSource)
177 Contract.Assert(firstSource != null);
178 Contract.Assert(secondSource != null);
180 m_firstSource = firstSource;
181 m_secondSource = secondSource;
184 //---------------------------------------------------------------------------------------
185 // MoveNext advances to the next element in the output. While the first data source has
186 // elements, this consists of just advancing through it and wrapping each key with MakeLeft.
187 // After that, m_begunSecond is set and the second data source is enumerated, wrapping each
188 // key with MakeRight. No barrier or index offsetting is involved; the relative order of the
// two sources is established by the ConcatKey comparer.
// NOTE(review): the guard on m_begunSecond and the return statements are not visible in
// this listing; the flow described above is inferred from the visible statements -- confirm.
192 internal override bool MoveNext(ref TSource currentElement, ref ConcatKey<TLeftKey, TRightKey> currentKey)
194 Contract.Assert(m_firstSource != null);
195 Contract.Assert(m_secondSource != null);
197 // If we are still enumerating the first source, fetch the next item.
200 // If elements remain, just return true and continue enumerating the left.
201 TLeftKey leftKey = default(TLeftKey);
202 if (m_firstSource.MoveNext(ref currentElement, ref leftKey))
// Tag the key as belonging to the left (first) source.
204 currentKey = ConcatKey<TLeftKey, TRightKey>.MakeLeft(leftKey);
// The first source yielded nothing more: remember that the second source is now in use.
207 m_begunSecond = true;
210 // Now either move on to, or continue, enumerating the right data source.
211 TRightKey rightKey = default(TRightKey);
212 if (m_secondSource.MoveNext(ref currentElement, ref rightKey))
// Tag the key as belonging to the right (second) source.
214 currentKey = ConcatKey<TLeftKey, TRightKey>.MakeRight(rightKey);
//---------------------------------------------------------------------------------------
// Disposes both underlying source enumerators. The disposing argument is not consulted.
221 protected override void Dispose(bool disposing)
223 m_firstSource.Dispose();
224 m_secondSource.Dispose();
229 //-----------------------------------------------------------------------------------
230 // Query results for a Concat operator. The results are indexible if the child
231 // results were indexible: this type is only instantiated (through NewResults) when
// both child result sets are indexible, and it serves element lookups by index.
234 class ConcatQueryOperatorResults : BinaryQueryOperatorResults
236 ConcatQueryOperator<TSource> m_concatOp; // Operator that generated the results
237 int m_leftChildCount; // The number of elements in the left child result set
238 int m_rightChildCount; // The number of elements in the right child result set
//-----------------------------------------------------------------------------------
// Factory for the results of a Concat operator. Returns a ConcatQueryOperatorResults
// when both child results are indexible; the BinaryQueryOperatorResults construction
// that follows covers the remaining case.
// NOTE(review): the trailing preferStriping parameter and the else keyword are not
// visible in this listing.
240 public static QueryResults<TSource> NewResults(
241 QueryResults<TSource> leftChildQueryResults, QueryResults<TSource> rightChildQueryResults,
242 ConcatQueryOperator<TSource> op, QuerySettings settings,
245 if (leftChildQueryResults.IsIndexible && rightChildQueryResults.IsIndexible)
247 return new ConcatQueryOperatorResults(
248 leftChildQueryResults, rightChildQueryResults, op, settings, preferStriping);
252 return new BinaryQueryOperatorResults(
253 leftChildQueryResults, rightChildQueryResults, op, settings, preferStriping);
//-----------------------------------------------------------------------------------
// Private constructor, reached only through NewResults. Stores the operator and caches
// the element counts of both (indexible) child result sets.
257 private ConcatQueryOperatorResults(
258 QueryResults<TSource> leftChildQueryResults, QueryResults<TSource> rightChildQueryResults,
259 ConcatQueryOperator<TSource> concatOp, QuerySettings settings,
261 : base(leftChildQueryResults, rightChildQueryResults, concatOp, settings, preferStriping)
263 m_concatOp = concatOp;
264 Contract.Assert(leftChildQueryResults.IsIndexible && rightChildQueryResults.IsIndexible);
// Cache the counts once; ElementsCount and GetElement rely on them.
266 m_leftChildCount = leftChildQueryResults.ElementsCount;
267 m_rightChildCount = rightChildQueryResults.ElementsCount;
// NOTE(review): the getter body is not visible in this listing; since instances only
// exist for two indexible children, it presumably reports true -- confirm.
270 internal override bool IsIndexible
// Total number of elements: the left count plus the right count (plain int addition).
275 internal override int ElementsCount
279 Contract.Assert(m_leftChildCount >= 0 && m_rightChildCount >= 0);
280 return m_leftChildCount + m_rightChildCount;
// Returns the element at the given index of the concatenation: indices below the left
// count are served by the left child results, and the remaining ones by the right child
// results after subtracting the left count. No range check on index is visible here.
284 internal override TSource GetElement(int index)
286 if (index < m_leftChildCount)
288 return m_leftChildQueryResults.GetElement(index);
292 return m_rightChildQueryResults.GetElement(index - m_leftChildCount);
299 //---------------------------------------------------------------------------------------
300 // ConcatKey represents an ordering key for the Concat operator. It knows whether the
301 // element it is associated with is from the left source or the right source, and also
302 // the element's ordering key within that source.
305 internal struct ConcatKey<TLeftKey, TRightKey>
307 private readonly TLeftKey m_leftKey;
308 private readonly TRightKey m_rightKey;
309 private readonly bool m_isLeft;
// Private constructor; instances are created through MakeLeft and MakeRight.
// NOTE(review): only the m_rightKey assignment is visible in this listing; the
// assignments of m_leftKey and m_isLeft are presumably on lines not shown -- confirm.
311 private ConcatKey(TLeftKey leftKey, TRightKey rightKey, bool isLeft)
314 m_rightKey = rightKey;
// Creates a key for an element of the left source; the right key is left at its default.
318 internal static ConcatKey<TLeftKey, TRightKey> MakeLeft(TLeftKey leftKey)
320 return new ConcatKey<TLeftKey, TRightKey>(leftKey, default(TRightKey), true);
// Creates a key for an element of the right source; the left key is left at its default.
323 internal static ConcatKey<TLeftKey, TRightKey> MakeRight(TRightKey rightKey)
325 return new ConcatKey<TLeftKey, TRightKey>(default(TLeftKey), rightKey, false);
// Creates a ConcatKey comparer from a comparer for left keys and a comparer for right keys.
328 internal static IComparer<ConcatKey<TLeftKey, TRightKey>> MakeComparer(
329 IComparer<TLeftKey> leftComparer, IComparer<TRightKey> rightComparer)
331 return new ConcatKeyComparer(leftComparer, rightComparer);
334 //---------------------------------------------------------------------------------------
335 // ConcatKeyComparer compares ConcatKeys, so that elements from the left source come
336 // before elements from the right source, and elements within each source are ordered
337 // according to the corresponding order key.
340 private class ConcatKeyComparer : IComparer<ConcatKey<TLeftKey, TRightKey>>
342 private IComparer<TLeftKey> m_leftComparer;
343 private IComparer<TRightKey> m_rightComparer;
345 internal ConcatKeyComparer(IComparer<TLeftKey> leftComparer, IComparer<TRightKey> rightComparer)
347 m_leftComparer = leftComparer;
348 m_rightComparer = rightComparer;
// Returns a negative value when x sorts before y, a positive value when x sorts after y,
// and otherwise whatever the per-source comparer returns for the two keys.
351 public int Compare(ConcatKey<TLeftKey, TRightKey> x, ConcatKey<TLeftKey, TRightKey> y)
353 // If one element is from the left source and the other not, the element from the left source
// comes first.
355 if (x.m_isLeft != y.m_isLeft)
357 return x.m_isLeft ? -1 : 1;
360 // Elements are from the same source (left or right). Compare the corresponding keys.
// NOTE(review): the condition selecting between the two returns below is not visible in
// this listing; presumably it tests x.m_isLeft -- confirm.
363 return m_leftComparer.Compare(x.m_leftKey, y.m_leftKey);
365 return m_rightComparer.Compare(x.m_rightKey, y.m_rightKey);