3 // Copyright (c) Microsoft Corporation. All rights reserved.
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
8 // UnionQueryOperator.cs
10 // <OWNER>[....]</OWNER>
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
14 using System.Collections.Generic;
15 using System.Diagnostics.Contracts;
16 using System.Threading;
18 namespace System.Linq.Parallel
21 /// Operator that yields the union of two data sources.
23 /// <typeparam name="TInputOutput"></typeparam>
24 internal sealed class UnionQueryOperator<TInputOutput> :
25 BinaryQueryOperator<TInputOutput, TInputOutput, TInputOutput>
// Equality comparer used to decide whether two elements are duplicates. It is stored
// exactly as supplied by the caller and may be null.
private readonly IEqualityComparer<TInputOutput> m_comparer;

//---------------------------------------------------------------------------------------
// Constructs a new union operator over two child queries.
//
// Arguments:
//     left      - the left data source; must not be null
//     right     - the right data source; must not be null
//     comparer  - equality comparer for elements; stored as-is
//
// The output is flagged as ordered as soon as either child reports ordered output.
//
// NOTE(review): the base-constructor chaining below was reconstructed, because the body
// reads LeftChild/RightChild straight away; confirm that the base class exposes a
// matching (left, right) constructor.
//

internal UnionQueryOperator(ParallelQuery<TInputOutput> left, ParallelQuery<TInputOutput> right, IEqualityComparer<TInputOutput> comparer)
    : base(left, right)
{
    Contract.Assert(left != null && right != null, "child data sources cannot be null");

    m_comparer = comparer;
    m_outputOrdered = LeftChild.OutputOrdered || RightChild.OutputOrdered;
}
//---------------------------------------------------------------------------------------
// Opens this operator: the left child is opened first, then the right child, and the two
// results are combined into a single binary-operator result object.
//

internal override QueryResults<TInputOutput> Open(
    QuerySettings settings, bool preferStriping)
{
    // The caller's preferStriping value is deliberately not forwarded. Whatever the parent
    // prefers (striping or range partitioning), the output of a union is hash-partitioned,
    // so both children and the combined result are opened with an explicit 'false'.
    const bool stripingPreference = false;

    QueryResults<TInputOutput> leftResults = LeftChild.Open(settings, stripingPreference);
    QueryResults<TInputOutput> rightResults = RightChild.Open(settings, stripingPreference);

    return new BinaryQueryOperatorResults(leftResults, rightResults, this, settings, stripingPreference);
}
//---------------------------------------------------------------------------------------
// Wraps the left child stream with a hash repartition (HashRepartitionOrdered when the
// left child reports ordered output, HashRepartition otherwise) and then delegates to
// WrapPartitionedStreamFixedLeftType, which does the same for the right child stream.
//
// Both input streams are asserted to have the same partition count. The preferStriping
// argument is not consulted by this method.
//

public override void WrapPartitionedStream<TLeftKey, TRightKey>(
    PartitionedStream<TInputOutput, TLeftKey> leftStream, PartitionedStream<TInputOutput, TRightKey> rightStream,
    IPartitionedStreamRecipient<TInputOutput> outputRecipient, bool preferStriping, QuerySettings settings)
{
    Contract.Assert(leftStream.PartitionCount == rightStream.PartitionCount);

    int partitions = leftStream.PartitionCount;
    CancellationToken mergedToken = settings.CancellationState.MergedCancellationToken;

    if (!LeftChild.OutputOrdered)
    {
        // Unordered left input: the repartitioned stream is keyed by int.
        PartitionedStream<Pair<TInputOutput, NoKeyMemoizationRequired>, int> unorderedLeft =
            ExchangeUtilities.HashRepartition<TInputOutput, NoKeyMemoizationRequired, TLeftKey>(
                leftStream, null, null, m_comparer, mergedToken);

        WrapPartitionedStreamFixedLeftType<int, TRightKey>(
            unorderedLeft, rightStream, outputRecipient, partitions, mergedToken);
        return;
    }

    // Ordered left input: the repartitioned stream keeps the original TLeftKey order keys.
    PartitionedStream<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> orderedLeft =
        ExchangeUtilities.HashRepartitionOrdered<TInputOutput, NoKeyMemoizationRequired, TLeftKey>(
            leftStream, null, null, m_comparer, mergedToken);

    WrapPartitionedStreamFixedLeftType<TLeftKey, TRightKey>(
        orderedLeft, rightStream, outputRecipient, partitions, mergedToken);
}
//---------------------------------------------------------------------------------------
// Second stage of WrapPartitionedStream. The left stream has already been repartitioned,
// so TLeftKey is fixed; this helper repartitions the right stream (HashRepartitionOrdered
// when the right child reports ordered output, HashRepartition otherwise) and passes both
// streams on to WrapPartitionedStreamFixedBothTypes.
//

private void WrapPartitionedStreamFixedLeftType<TLeftKey, TRightKey>(
    PartitionedStream<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> leftHashStream, PartitionedStream<TInputOutput, TRightKey> rightStream,
    IPartitionedStreamRecipient<TInputOutput> outputRecipient, int partitionCount, CancellationToken cancellationToken)
{
    if (!RightChild.OutputOrdered)
    {
        // Unordered right input: the repartitioned stream is keyed by int.
        PartitionedStream<Pair<TInputOutput, NoKeyMemoizationRequired>, int> unorderedRight =
            ExchangeUtilities.HashRepartition<TInputOutput, NoKeyMemoizationRequired, TRightKey>(
                rightStream, null, null, m_comparer, cancellationToken);

        WrapPartitionedStreamFixedBothTypes<TLeftKey, int>(
            leftHashStream, unorderedRight, outputRecipient, partitionCount, cancellationToken);
        return;
    }

    // Ordered right input: the repartitioned stream keeps the original TRightKey order keys.
    PartitionedStream<Pair<TInputOutput, NoKeyMemoizationRequired>, TRightKey> orderedRight =
        ExchangeUtilities.HashRepartitionOrdered<TInputOutput, NoKeyMemoizationRequired, TRightKey>(
            rightStream, null, null, m_comparer, cancellationToken);

    WrapPartitionedStreamFixedBothTypes<TLeftKey, TRightKey>(
        leftHashStream, orderedRight, outputRecipient, partitionCount, cancellationToken);
}
//---------------------------------------------------------------------------------------
// Final stage of WrapPartitionedStream, called by WrapPartitionedStreamFixedLeftType once
// both key types are fixed. Builds one union enumerator per partition over the two
// repartitioned streams and hands the resulting stream to the recipient.
//
// If neither child is ordered, the output is keyed by int and uses the plain union
// enumerator. Otherwise the output is keyed by ConcatKey<TLeftKey, TRightKey> and uses
// the ordered union enumerator. In both cases the output stream is created with
// OrdinalIndexState.Shuffled.
//

private void WrapPartitionedStreamFixedBothTypes<TLeftKey, TRightKey>(
    PartitionedStream<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> leftHashStream,
    PartitionedStream<Pair<TInputOutput, NoKeyMemoizationRequired>, TRightKey> rightHashStream,
    IPartitionedStreamRecipient<TInputOutput> outputRecipient, int partitionCount,
    CancellationToken cancellationToken)
{
    bool anyChildOrdered = LeftChild.OutputOrdered || RightChild.OutputOrdered;

    if (!anyChildOrdered)
    {
        PartitionedStream<TInputOutput, int> unorderedOutput =
            new PartitionedStream<TInputOutput, int>(partitionCount, Util.GetDefaultComparer<int>(), OrdinalIndexState.Shuffled);

        for (int partition = 0; partition < partitionCount; partition++)
        {
            unorderedOutput[partition] = new UnionQueryOperatorEnumerator<TLeftKey, TRightKey>(
                leftHashStream[partition], rightHashStream[partition], partition, m_comparer, cancellationToken);
        }

        outputRecipient.Receive(unorderedOutput);
        return;
    }

    // At least one side is ordered: order keys from both sides are combined into a
    // ConcatKey, compared with a comparer built from the two streams' key comparers.
    IComparer<ConcatKey<TLeftKey, TRightKey>> concatKeyComparer =
        ConcatKey<TLeftKey, TRightKey>.MakeComparer(leftHashStream.KeyComparer, rightHashStream.KeyComparer);

    PartitionedStream<TInputOutput, ConcatKey<TLeftKey, TRightKey>> orderedOutput =
        new PartitionedStream<TInputOutput, ConcatKey<TLeftKey, TRightKey>>(partitionCount, concatKeyComparer, OrdinalIndexState.Shuffled);

    for (int partition = 0; partition < partitionCount; partition++)
    {
        orderedOutput[partition] = new OrderedUnionQueryOperatorEnumerator<TLeftKey, TRightKey>(
            leftHashStream[partition], rightHashStream[partition], LeftChild.OutputOrdered, RightChild.OutputOrdered,
            m_comparer, concatKeyComparer, cancellationToken);
    }

    outputRecipient.Receive(orderedOutput);
}
//---------------------------------------------------------------------------------------
// Returns an enumerable that represents this query executing sequentially: each child's
// sequential query is wrapped with CancellableEnumerable.Wrap using the supplied token,
// and the two wrapped sequences are combined with Union under m_comparer.
//

internal override IEnumerable<TInputOutput> AsSequentialQuery(CancellationToken token)
{
    IEnumerable<TInputOutput> leftSequential = LeftChild.AsSequentialQuery(token);
    IEnumerable<TInputOutput> cancellableLeft = CancellableEnumerable.Wrap(leftSequential, token);

    IEnumerable<TInputOutput> rightSequential = RightChild.AsSequentialQuery(token);
    IEnumerable<TInputOutput> cancellableRight = CancellableEnumerable.Wrap(rightSequential, token);

    return cancellableLeft.Union(cancellableRight, m_comparer);
}
//---------------------------------------------------------------------------------------
// Reports whether this operator performs a premature merge that the equivalent sequential
// operation (i.e., LINQ to Objects) would not perform. For union the answer is always
// false.
//

internal override bool LimitsParallelism
{
    get
    {
        return false;
    }
}
//---------------------------------------------------------------------------------------
// This enumerator performs the union operation incrementally. It does this by maintaining
// a history -- in the form of a set -- of all data already seen, and it only yields an
// element the first time that element is added to the set. The left source is consumed
// to completion before the right source is touched.
//

class UnionQueryOperatorEnumerator<TLeftKey, TRightKey> : QueryOperatorEnumerator<TInputOutput, int>
{

    private QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> m_leftSource; // Left data source; null once drained.
    private QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TRightKey> m_rightSource; // Right data source; null once drained.

    private readonly int m_partitionIndex; // The current partition.

    private Set<TInputOutput> m_hashLookup; // The hash lookup, used to produce the union. Created lazily.
    private CancellationToken m_cancellationToken; // Polled while scanning the sources.
    private Shared<int> m_outputLoopCount; // Iteration counter that paces cancellation polling. Created lazily.
    private readonly IEqualityComparer<TInputOutput> m_comparer; // Element comparer handed to the set.

    //---------------------------------------------------------------------------------------
    // Instantiates a new union enumerator over one partition of each input.
    //
    // Arguments:
    //     leftSource, rightSource - the partition enumerators; must not be null
    //     partitionIndex          - index of the partition this enumerator serves
    //     comparer                - equality comparer passed to the duplicate-tracking set
    //     cancellationToken       - token checked periodically during MoveNext
    //

    internal UnionQueryOperatorEnumerator(
        QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> leftSource,
        QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TRightKey> rightSource,
        int partitionIndex, IEqualityComparer<TInputOutput> comparer,
        CancellationToken cancellationToken)
    {
        Contract.Assert(leftSource != null);
        Contract.Assert(rightSource != null);

        m_leftSource = leftSource;
        m_rightSource = rightSource;
        m_partitionIndex = partitionIndex;
        m_comparer = comparer;
        m_cancellationToken = cancellationToken;
    }

    //---------------------------------------------------------------------------------------
    // Walks the two data sources, left and then right, to produce the union.
    //
    // Returns true and stores the next not-yet-seen element in currentElement; returns
    // false once both sources are exhausted. The key written to currentKey is a fixed
    // placeholder value.
    //

    internal override bool MoveNext(ref TInputOutput currentElement, ref int currentKey)
    {
        // First call: create the duplicate-tracking set and the polling counter.
        if (m_hashLookup == null)
        {
            m_hashLookup = new Set<TInputOutput>(m_comparer);
            m_outputLoopCount = new Shared<int>(0);
        }

        Contract.Assert(m_hashLookup != null);

        // Enumerate the left and then right data source. When each is done, we set the
        // field to null so we will skip it upon subsequent calls to MoveNext.
        if (m_leftSource != null)
        {
            // Iterate over this source's elements until we find a unique element.
            TLeftKey keyUnused = default(TLeftKey);
            Pair<TInputOutput, NoKeyMemoizationRequired> currentLeftElement = default(Pair<TInputOutput, NoKeyMemoizationRequired>);

            while (m_leftSource.MoveNext(ref currentLeftElement, ref keyUnused))
            {
                // Both loops pace their cancellation checks off the same counter, which
                // persists across MoveNext calls.
                if ((m_outputLoopCount.Value++ & CancellationState.POLL_INTERVAL) == 0)
                    CancellationState.ThrowIfCanceled(m_cancellationToken);

                // We ensure we never return duplicates by tracking them in our set.
                if (m_hashLookup.Add(currentLeftElement.First))
                {
                    currentKey = unchecked((int)0xdeadbeef);
                    currentElement = currentLeftElement.First;
                    return true;
                }
            }

            // Left side is drained: release it and make sure it is never visited (or
            // disposed) again.
            m_leftSource.Dispose();
            m_leftSource = null;
        }

        if (m_rightSource != null)
        {
            // Iterate over this source's elements until we find a unique element.
            TRightKey keyUnused = default(TRightKey);
            Pair<TInputOutput, NoKeyMemoizationRequired> currentRightElement = default(Pair<TInputOutput, NoKeyMemoizationRequired>);

            while (m_rightSource.MoveNext(ref currentRightElement, ref keyUnused))
            {
                if ((m_outputLoopCount.Value++ & CancellationState.POLL_INTERVAL) == 0)
                    CancellationState.ThrowIfCanceled(m_cancellationToken);

                // We ensure we never return duplicates by tracking them in our set.
                if (m_hashLookup.Add(currentRightElement.First))
                {
                    currentKey = unchecked((int)0xdeadbeef);
                    currentElement = currentRightElement.First;
                    return true;
                }
            }

            // Right side is drained as well.
            m_rightSource.Dispose();
            m_rightSource = null;
        }

        return false;
    }

    //---------------------------------------------------------------------------------------
    // Disposes whichever sources have not already been drained and released by MoveNext.
    //

    protected override void Dispose(bool disposing)
    {
        if (m_leftSource != null)
        {
            m_leftSource.Dispose();
        }

        if (m_rightSource != null)
        {
            m_rightSource.Dispose();
        }
    }
}
//---------------------------------------------------------------------------------------
// Union enumerator used when at least one input carries order keys. On the first call to
// MoveNext it drains both inputs into a dictionary keyed by element, remembering for each
// distinct element the occurrence whose compound order key compares smallest, and then
// yields the dictionary's entries together with those keys.
//

class OrderedUnionQueryOperatorEnumerator<TLeftKey, TRightKey> : QueryOperatorEnumerator<TInputOutput, ConcatKey<TLeftKey, TRightKey>>
{
    private readonly QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> m_leftSource; // Left data source.
    private readonly QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TRightKey> m_rightSource; // Right data source.
    private readonly IComparer<ConcatKey<TLeftKey, TRightKey>> m_keyComparer; // Comparer for compound order keys.
    private IEnumerator<KeyValuePair<Wrapper<TInputOutput>, Pair<TInputOutput, ConcatKey<TLeftKey, TRightKey>>>> m_outputEnumerator; // Enumerator over the output of the union.
    private readonly bool m_leftOrdered; // Whether the left data source is ordered.
    private readonly bool m_rightOrdered; // Whether the right data source is ordered.
    private readonly IEqualityComparer<TInputOutput> m_comparer; // Comparer for the elements.
    private readonly CancellationToken m_cancellationToken; // Polled while draining the sources.

    //---------------------------------------------------------------------------------------
    // Instantiates a new ordered union enumerator over one partition of each input.
    //
    // Arguments:
    //     leftSource, rightSource   - the partition enumerators; must not be null
    //     leftOrdered, rightOrdered - whether the corresponding source's keys are meaningful
    //     comparer                  - element equality comparer; null selects the default
    //     keyComparer               - comparer for the compound order keys
    //     cancellationToken         - token checked periodically during MoveNext
    //

    internal OrderedUnionQueryOperatorEnumerator(
        QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> leftSource,
        QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TRightKey> rightSource,
        bool leftOrdered, bool rightOrdered, IEqualityComparer<TInputOutput> comparer, IComparer<ConcatKey<TLeftKey, TRightKey>> keyComparer,
        CancellationToken cancellationToken)
    {
        Contract.Assert(leftSource != null);
        Contract.Assert(rightSource != null);

        m_leftSource = leftSource;
        m_rightSource = rightSource;
        m_keyComparer = keyComparer;

        m_leftOrdered = leftOrdered;
        m_rightOrdered = rightOrdered;
        m_comparer = comparer;

        if (m_comparer == null)
        {
            m_comparer = EqualityComparer<TInputOutput>.Default;
        }

        m_cancellationToken = cancellationToken;
    }

    //---------------------------------------------------------------------------------------
    // Walks the two data sources, left and then right, to produce the union.
    //
    // The first call consumes both sources completely and builds the de-duplicated result;
    // every call (including the first) then advances over that result. Returns true and
    // fills currentElement/currentKey while entries remain, false afterwards.
    //

    internal override bool MoveNext(ref TInputOutput currentElement, ref ConcatKey<TLeftKey, TRightKey> currentKey)
    {
        Contract.Assert(m_leftSource != null);
        Contract.Assert(m_rightSource != null);

        if (m_outputEnumerator == null)
        {
            IEqualityComparer<Wrapper<TInputOutput>> wrapperComparer = new WrapperEqualityComparer<TInputOutput>(m_comparer);
            Dictionary<Wrapper<TInputOutput>, Pair<TInputOutput, ConcatKey<TLeftKey, TRightKey>>> union =
                new Dictionary<Wrapper<TInputOutput>, Pair<TInputOutput, ConcatKey<TLeftKey, TRightKey>>>(wrapperComparer);

            Pair<TInputOutput, NoKeyMemoizationRequired> elem = default(Pair<TInputOutput, NoKeyMemoizationRequired>);

            // One counter paces the cancellation checks across both drain loops.
            int i = 0;

            TLeftKey leftKey = default(TLeftKey);
            while (m_leftSource.MoveNext(ref elem, ref leftKey))
            {
                if ((i++ & CancellationState.POLL_INTERVAL) == 0)
                    CancellationState.ThrowIfCanceled(m_cancellationToken);

                // An unordered source contributes the default key rather than its own.
                ConcatKey<TLeftKey, TRightKey> key =
                    ConcatKey<TLeftKey, TRightKey>.MakeLeft(m_leftOrdered ? leftKey : default(TLeftKey));

                RecordIfEarliest(union, elem.First, key);
            }

            TRightKey rightKey = default(TRightKey);
            while (m_rightSource.MoveNext(ref elem, ref rightKey))
            {
                if ((i++ & CancellationState.POLL_INTERVAL) == 0)
                    CancellationState.ThrowIfCanceled(m_cancellationToken);

                ConcatKey<TLeftKey, TRightKey> key =
                    ConcatKey<TLeftKey, TRightKey>.MakeRight(m_rightOrdered ? rightKey : default(TRightKey));

                RecordIfEarliest(union, elem.First, key);
            }

            m_outputEnumerator = union.GetEnumerator();
        }

        if (m_outputEnumerator.MoveNext())
        {
            Pair<TInputOutput, ConcatKey<TLeftKey, TRightKey>> current = m_outputEnumerator.Current.Value;
            currentElement = current.First;
            currentKey = current.Second;
            return true;
        }

        return false;
    }

    //---------------------------------------------------------------------------------------
    // Stores (element, key) in the union unless an equal element is already present with a
    // key that does not compare greater than the new one.
    //

    private void RecordIfEarliest(
        Dictionary<Wrapper<TInputOutput>, Pair<TInputOutput, ConcatKey<TLeftKey, TRightKey>>> union,
        TInputOutput element, ConcatKey<TLeftKey, TRightKey> key)
    {
        Pair<TInputOutput, ConcatKey<TLeftKey, TRightKey>> oldEntry;
        Wrapper<TInputOutput> wrappedElem = new Wrapper<TInputOutput>(element);

        if (!union.TryGetValue(wrappedElem, out oldEntry) || m_keyComparer.Compare(key, oldEntry.Second) < 0)
        {
            union[wrappedElem] = new Pair<TInputOutput, ConcatKey<TLeftKey, TRightKey>>(element, key);
        }
    }

    //---------------------------------------------------------------------------------------
    // Disposes both underlying sources.
    //

    protected override void Dispose(bool disposing)
    {
        Contract.Assert(m_leftSource != null && m_rightSource != null);
        m_leftSource.Dispose();
        m_rightSource.Dispose();
    }
}