3 // Copyright (c) Microsoft Corporation. All rights reserved.
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
8 // IntersectQueryOperator.cs
10 // <OWNER>Microsoft</OWNER>
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
14 using System.Collections.Generic;
15 using System.Diagnostics.Contracts;
16 using System.Threading;
18 namespace System.Linq.Parallel
/// <summary>
/// Operator that yields the intersection of two data sources.
/// </summary>
/// <typeparam name="TInputOutput">The type of the elements in both input sources and in the output.</typeparam>
24 internal sealed class IntersectQueryOperator<TInputOutput> :
25 BinaryQueryOperator<TInputOutput, TInputOutput, TInputOutput>
28 private readonly IEqualityComparer<TInputOutput> m_comparer; // An equality comparer.
//---------------------------------------------------------------------------------------
// Builds an intersection operator over two child queries. The supplied equality
// comparer is the one later handed to the hash repartitioning and to the enumerators.
//

internal IntersectQueryOperator(ParallelQuery<TInputOutput> left, ParallelQuery<TInputOutput> right, IEqualityComparer<TInputOutput> comparer)
    : base(left, right)
{
    Contract.Assert(left != null && right != null, "child data sources cannot be null");

    // Whether the output is ordered is taken from the left child alone.
    m_outputOrdered = LeftChild.OutputOrdered;
    m_comparer = comparer;

    SetOrdinalIndex(OrdinalIndexState.Shuffled);
}
//---------------------------------------------------------------------------------------
// Opens both children (left first, then right) and wraps their results in a single
// binary-operator result object.
//

internal override QueryResults<TInputOutput> Open(
    QuerySettings settings, bool preferStriping)
{
    // The incoming preferStriping value is deliberately not propagated; false is passed
    // explicitly instead. Regardless of whether the parent prefers striping or range
    // partitioning, the output will be hash-partitioned.
    const bool childPreferStriping = false;

    QueryResults<TInputOutput> leftResults = LeftChild.Open(settings, childPreferStriping);
    QueryResults<TInputOutput> rightResults = RightChild.Open(settings, childPreferStriping);

    return new BinaryQueryOperatorResults(leftResults, rightResults, this, settings, false);
}
//---------------------------------------------------------------------------------------
// Hash-repartitions the left stream (keeping its order keys when the output is ordered,
// otherwise using int keys) and forwards to WrapPartitionedStreamHelper, which
// repartitions the right stream and builds the output.
//

public override void WrapPartitionedStream<TLeftKey, TRightKey>(
    PartitionedStream<TInputOutput, TLeftKey> leftPartitionedStream, PartitionedStream<TInputOutput, TRightKey> rightPartitionedStream,
    IPartitionedStreamRecipient<TInputOutput> outputRecipient, bool preferStriping, QuerySettings settings)
{
    Contract.Assert(leftPartitionedStream.PartitionCount == rightPartitionedStream.PartitionCount);

    CancellationToken mergedToken = settings.CancellationState.MergedCancellationToken;

    if (OutputOrdered)
    {
        // Ordered output: the repartitioned left stream keeps TLeftKey as its key type.
        WrapPartitionedStreamHelper<TLeftKey, TRightKey>(
            ExchangeUtilities.HashRepartitionOrdered<TInputOutput, NoKeyMemoizationRequired, TLeftKey>(
                leftPartitionedStream, null, null, m_comparer, mergedToken),
            rightPartitionedStream, outputRecipient, mergedToken);
        return;
    }

    // Unordered output: the repartitioned left stream is keyed by int.
    WrapPartitionedStreamHelper<int, TRightKey>(
        ExchangeUtilities.HashRepartition<TInputOutput, NoKeyMemoizationRequired, TLeftKey>(
            leftPartitionedStream, null, null, m_comparer, mergedToken),
        rightPartitionedStream, outputRecipient, mergedToken);
}
//---------------------------------------------------------------------------------------
// This is a helper method. WrapPartitionedStream decides what type TLeftKey is going
// to be, and then calls this method with that key as a generic parameter. The helper
// hash-repartitions the right stream, pairs up left/right partitions by index, and
// hands the resulting output stream to the recipient.
//

private void WrapPartitionedStreamHelper<TLeftKey, TRightKey>(
    PartitionedStream<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> leftHashStream, PartitionedStream<TInputOutput, TRightKey> rightPartitionedStream,
    IPartitionedStreamRecipient<TInputOutput> outputRecipient, CancellationToken cancellationToken)
{
    int partitions = leftHashStream.PartitionCount;

    PartitionedStream<Pair<TInputOutput, NoKeyMemoizationRequired>, int> rightHashStream =
        ExchangeUtilities.HashRepartition<TInputOutput, NoKeyMemoizationRequired, TRightKey>(
            rightPartitionedStream, null, null, m_comparer, cancellationToken);

    PartitionedStream<TInputOutput, TLeftKey> result =
        new PartitionedStream<TInputOutput, TLeftKey>(partitions, leftHashStream.KeyComparer, OrdinalIndexState.Shuffled);

    bool ordered = OutputOrdered;
    for (int index = 0; index < partitions; index++)
    {
        if (ordered)
        {
            result[index] = new OrderedIntersectQueryOperatorEnumerator<TLeftKey>(
                leftHashStream[index], rightHashStream[index], m_comparer, leftHashStream.KeyComparer, cancellationToken);
        }
        else
        {
            // The unordered enumerator is declared with int output keys, so it is cast
            // through object to the TLeftKey-keyed enumerator type of the output stream.
            result[index] = (QueryOperatorEnumerator<TInputOutput, TLeftKey>)(object)
                new IntersectQueryOperatorEnumerator<TLeftKey>(leftHashStream[index], rightHashStream[index], m_comparer, cancellationToken);
        }
    }

    outputRecipient.Receive(result);
}
//---------------------------------------------------------------------------------------
// Whether this operator performs a premature merge that would not be performed in
// a similar sequential operation (i.e., in LINQ to Objects). This operator always
// reports false.
//

internal override bool LimitsParallelism
{
    get
    {
        return false;
    }
}
//---------------------------------------------------------------------------------------
// This enumerator performs the intersection operation incrementally. On the first call
// to MoveNext it drains the right source into a set; it then walks the left source and
// yields each left element that is found in that set. A matched element is removed
// from the set, so each distinct element is returned at most once.
//

class IntersectQueryOperatorEnumerator<TLeftKey> : QueryOperatorEnumerator<TInputOutput, int>
{
    private readonly QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> m_leftSource; // Left data source.
    private readonly QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, int> m_rightSource; // Right data source.
    private readonly IEqualityComparer<TInputOutput> m_comparer; // Comparer to use for equality/hash-coding.
    private Set<TInputOutput> m_hashLookup; // The hash lookup, used to produce the intersection. Built lazily in MoveNext.
    private readonly CancellationToken m_cancellationToken; // Polled periodically while enumerating.
    private Shared<int> m_outputLoopCount; // Left-side iteration counter, kept across MoveNext calls for cancellation polling.

    //---------------------------------------------------------------------------------------
    // Instantiates a new intersection enumerator over one pair of partitions.
    //

    internal IntersectQueryOperatorEnumerator(
        QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> leftSource,
        QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, int> rightSource,
        IEqualityComparer<TInputOutput> comparer, CancellationToken cancellationToken)
    {
        Contract.Assert(leftSource != null);
        Contract.Assert(rightSource != null);

        m_leftSource = leftSource;
        m_rightSource = rightSource;
        m_comparer = comparer;
        m_cancellationToken = cancellationToken;
    }

    //---------------------------------------------------------------------------------------
    // Walks the two data sources, right and then left, to produce the intersection.
    // Returns true and sets currentElement when a match is found; returns false once the
    // left source is exhausted.
    //

    internal override bool MoveNext(ref TInputOutput currentElement, ref int currentKey)
    {
        Contract.Assert(m_leftSource != null);
        Contract.Assert(m_rightSource != null);

        // Build the set out of the right data source, if we haven't already.
        if (m_hashLookup == null)
        {
            m_outputLoopCount = new Shared<int>(0);
            m_hashLookup = new Set<TInputOutput>(m_comparer);

            Pair<TInputOutput, NoKeyMemoizationRequired> rightElement = default(Pair<TInputOutput, NoKeyMemoizationRequired>);
            int rightKeyUnused = default(int);

            int i = 0;
            while (m_rightSource.MoveNext(ref rightElement, ref rightKeyUnused))
            {
                if ((i++ & CancellationState.POLL_INTERVAL) == 0)
                {
                    CancellationState.ThrowIfCanceled(m_cancellationToken);
                }

                m_hashLookup.Add(rightElement.First);
            }
        }

        // Now iterate over the left data source, looking for matches.
        Pair<TInputOutput, NoKeyMemoizationRequired> leftElement = default(Pair<TInputOutput, NoKeyMemoizationRequired>);
        TLeftKey keyUnused = default(TLeftKey);

        while (m_leftSource.MoveNext(ref leftElement, ref keyUnused))
        {
            if ((m_outputLoopCount.Value++ & CancellationState.POLL_INTERVAL) == 0)
            {
                CancellationState.ThrowIfCanceled(m_cancellationToken);
            }

            // If we found the element in our set, we can yield it to the caller. Removing
            // it from the set ensures it is returned once and never again.
            if (m_hashLookup.Contains(leftElement.First))
            {
                m_hashLookup.Remove(leftElement.First);
                currentElement = leftElement.First;
#if DEBUG
                // Every element gets the same placeholder key; the left key is not propagated.
                currentKey = unchecked((int)0xdeadbeef);
#endif
                return true;
            }
        }

        return false;
    }

    //---------------------------------------------------------------------------------------
    // Disposes both underlying sources.
    //

    protected override void Dispose(bool disposing)
    {
        Contract.Assert(m_leftSource != null && m_rightSource != null);
        try
        {
            m_leftSource.Dispose();
        }
        finally
        {
            // Release the right source even if disposing the left one throws.
            m_rightSource.Dispose();
        }
    }
}
//---------------------------------------------------------------------------------------
// Returns an enumerable that represents the query executing sequentially: both
// children are wrapped so they observe the token, then intersected with the
// operator's comparer.
//

internal override IEnumerable<TInputOutput> AsSequentialQuery(CancellationToken token)
{
    IEnumerable<TInputOutput> leftSequence = CancellableEnumerable.Wrap(LeftChild.AsSequentialQuery(token), token);
    IEnumerable<TInputOutput> rightSequence = CancellableEnumerable.Wrap(RightChild.AsSequentialQuery(token), token);

    return Enumerable.Intersect(leftSequence, rightSequence, m_comparer);
}
//---------------------------------------------------------------------------------------
// Order-preserving variant of the intersection enumerator. On the first call to
// MoveNext it drains the left source into a dictionary that maps each distinct element
// to the left occurrence with the smallest order key. It then walks the right source
// and, for each right element present in the dictionary, yields the stored left
// element together with its order key, removing the entry so it is yielded only once.
//

class OrderedIntersectQueryOperatorEnumerator<TLeftKey> : QueryOperatorEnumerator<TInputOutput, TLeftKey>
{
    private readonly QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> m_leftSource; // Left data source.
    private readonly QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, int> m_rightSource; // Right data source.
    private readonly IEqualityComparer<Wrapper<TInputOutput>> m_comparer; // Comparer to use for equality/hash-coding.
    private readonly IComparer<TLeftKey> m_leftKeyComparer; // Comparer to use to determine ordering of order keys.
    private Dictionary<Wrapper<TInputOutput>, Pair<TInputOutput, TLeftKey>> m_hashLookup; // The hash lookup, used to produce the intersection. Built lazily in MoveNext.
    private readonly CancellationToken m_cancellationToken; // Polled periodically while enumerating.

    //---------------------------------------------------------------------------------------
    // Instantiates a new ordered intersection enumerator over one pair of partitions.
    //

    internal OrderedIntersectQueryOperatorEnumerator(
        QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> leftSource,
        QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, int> rightSource,
        IEqualityComparer<TInputOutput> comparer, IComparer<TLeftKey> leftKeyComparer,
        CancellationToken cancellationToken)
    {
        Contract.Assert(leftSource != null);
        Contract.Assert(rightSource != null);

        m_leftSource = leftSource;
        m_rightSource = rightSource;
        m_comparer = new WrapperEqualityComparer<TInputOutput>(comparer);
        m_leftKeyComparer = leftKeyComparer;
        m_cancellationToken = cancellationToken;
    }

    //---------------------------------------------------------------------------------------
    // Walks the two data sources, left and then right, to produce the intersection.
    // Returns true and sets currentElement/currentKey when a match is found; returns
    // false once the right source is exhausted.
    //

    internal override bool MoveNext(ref TInputOutput currentElement, ref TLeftKey currentKey)
    {
        Contract.Assert(m_leftSource != null);
        Contract.Assert(m_rightSource != null);

        // Cancellation polling counter; it is local, so it restarts on every call.
        int i = 0;

        // Build the lookup out of the left data source, if we haven't already.
        if (m_hashLookup == null)
        {
            m_hashLookup = new Dictionary<Wrapper<TInputOutput>, Pair<TInputOutput, TLeftKey>>(m_comparer);

            Pair<TInputOutput, NoKeyMemoizationRequired> leftElement = default(Pair<TInputOutput, NoKeyMemoizationRequired>);
            TLeftKey leftKey = default(TLeftKey);
            while (m_leftSource.MoveNext(ref leftElement, ref leftKey))
            {
                if ((i++ & CancellationState.POLL_INTERVAL) == 0)
                {
                    CancellationState.ThrowIfCanceled(m_cancellationToken);
                }

                // For each element, we track the smallest order key for that element that we saw so far
                Pair<TInputOutput, TLeftKey> oldEntry;
                Wrapper<TInputOutput> wrappedLeftElem = new Wrapper<TInputOutput>(leftElement.First);

                // If this is the first occurrence of this element, or the order key is lower than all keys we saw previously,
                // update the order key for this element.
                if (!m_hashLookup.TryGetValue(wrappedLeftElem, out oldEntry) || m_leftKeyComparer.Compare(leftKey, oldEntry.Second) < 0)
                {
                    // For each "elem" value, we store the smallest key, and the element value that had that key.
                    // Note that even though two element values are "equal" according to the EqualityComparer,
                    // we still cannot choose arbitrarily which of the two to yield.
                    m_hashLookup[wrappedLeftElem] = new Pair<TInputOutput, TLeftKey>(leftElement.First, leftKey);
                }
            }
        }

        // Now iterate over the right data source, looking for matches.
        Pair<TInputOutput, NoKeyMemoizationRequired> rightElement = default(Pair<TInputOutput, NoKeyMemoizationRequired>);
        int rightKeyUnused = default(int);
        while (m_rightSource.MoveNext(ref rightElement, ref rightKeyUnused))
        {
            if ((i++ & CancellationState.POLL_INTERVAL) == 0)
            {
                CancellationState.ThrowIfCanceled(m_cancellationToken);
            }

            // If we found the element in our lookup, we can yield it to the caller.
            // Removing the entry ensures it is returned once and never again.
            Pair<TInputOutput, TLeftKey> entry;
            Wrapper<TInputOutput> wrappedRightElem = new Wrapper<TInputOutput>(rightElement.First);

            if (m_hashLookup.TryGetValue(wrappedRightElem, out entry))
            {
                currentElement = entry.First;
                currentKey = entry.Second;

                // Remove using the key that just matched, rather than re-wrapping the stored element.
                m_hashLookup.Remove(wrappedRightElem);
                return true;
            }
        }

        return false;
    }

    //---------------------------------------------------------------------------------------
    // Disposes both underlying sources.
    //

    protected override void Dispose(bool disposing)
    {
        Contract.Assert(m_leftSource != null && m_rightSource != null);
        try
        {
            m_leftSource.Dispose();
        }
        finally
        {
            // Release the right source even if disposing the left one throws.
            m_rightSource.Dispose();
        }
    }
}