3 // Copyright (c) Microsoft Corporation. All rights reserved.
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
8 // HashJoinQueryOperatorEnumerator.cs
10 // <OWNER>Microsoft</OWNER>
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
14 using System.Collections.Generic;
15 using System.Diagnostics.Contracts;
16 using System.Threading;
18 namespace System.Linq.Parallel
21 /// This enumerator implements the hash-join algorithm as noted earlier.
24 /// This enumerator type won't work properly at all if the analysis engine didn't
25 /// ensure a proper hash-partition. We expect that inner and outer elements with equal
26 /// keys are ALWAYS in the same partition. If they aren't (e.g. if the analysis is
27 /// broken), we'll silently drop items. :(
30 /// This is the enumerator class for two operators: the inner join and the group (outer) join.
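34 /// <typeparam name="TLeftInput">Element type of the left (outer, probing) source.</typeparam>
35 /// <typeparam name="TLeftKey">Key type of the left source's elements; also the key type of this enumerator's output.</typeparam>
36 /// <typeparam name="TRightInput">Element type of the right (inner, building) source.</typeparam>
37 /// <typeparam name="THashKey">Type of the join key used to build and probe the hash table.</typeparam>
38 /// <typeparam name="TOutput">Type of the elements produced by the result selector.</typeparam>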
// NOTE(review): this listing omits short source lines (braces, blank lines, preprocessor
// directives, short statements), so the class body shown below is incomplete.
39 internal class HashJoinQueryOperatorEnumerator<TLeftInput, TLeftKey, TRightInput, THashKey, TOutput>
40 : QueryOperatorEnumerator<TOutput,TLeftKey>
// Both sources yield Pair<element, hash key>. The left side's TLeftKey becomes the key of each
// output element; the right side's int key is read but never used.
42 private readonly QueryOperatorEnumerator<Pair<TLeftInput, THashKey>, TLeftKey> m_leftSource; // Left (outer) data source. For probing.
43 private readonly QueryOperatorEnumerator<Pair<TRightInput, THashKey>, int> m_rightSource; // Right (inner) data source. For building.
// At least one of the two selectors must be non-null (asserted in the constructor). The
// single selector produces one output per (left, right) match; the group selector produces
// one output from a left element and the sequence of its right matches.
44 private readonly Func<TLeftInput, TRightInput, TOutput> m_singleResultSelector; // Single result selector.
45 private readonly Func<TLeftInput, IEnumerable<TRightInput>, TOutput> m_groupResultSelector; // Group result selector.
46 private readonly IEqualityComparer<THashKey> m_keyComparer; // An optional key comparison object.
// Checked periodically while draining the right source and while probing with the left source.
47 private readonly CancellationToken m_cancellationToken;
// Created lazily by the first MoveNext call; null means the hash table has not been built yet.
48 private Mutables m_mutables;
// Per-enumeration mutable state. MoveNext allocates it on its first call (the BUILD phase)
// and keeps it in m_mutables for the PROBE phase.
50 private class Mutables
52 internal TLeftInput m_currentLeft; // The current matching left element.
53 internal TLeftKey m_currentLeftKey; // The key of the current matching left element.
// Maps each non-null right hash key to a pair: the first right element seen for that key,
// plus a chunk list. For inner joins the list holds all but the first element. For group
// joins it holds every element.
54 internal HashLookup<THashKey, Pair<TRightInput, ListChunk<TRightInput>>> m_rightHashLookup; // The hash lookup.
55 internal ListChunk<TRightInput> m_currentRightMatches; // Current right matches (if any).
56 internal int m_currentRightMatchesIndex; // Current index in the set of right matches.
// Counts left elements pulled during probing; masked with CancellationState.POLL_INTERVAL
// to decide when to check the cancellation token.
57 internal int m_outputLoopCount;
60 //---------------------------------------------------------------------------------------
61 // Instantiates a new hash-join enumerator.
//
// leftSource:           outer (probe) side; pairs of element and hash key, keyed by TLeftKey.
// rightSource:          inner (build) side; drained into a hash table on the first MoveNext.
// singleResultSelector: used for inner joins; at least one selector must be non-null.
// groupResultSelector:  used for group joins; at least one selector must be non-null.
// keyComparer:          handed to the hash lookup; the field is documented as optional.
// cancellationToken:    checked periodically while building and probing.
64 internal HashJoinQueryOperatorEnumerator(
65 QueryOperatorEnumerator<Pair<TLeftInput, THashKey>, TLeftKey> leftSource,
66 QueryOperatorEnumerator<Pair<TRightInput, THashKey>, int> rightSource,
67 Func<TLeftInput, TRightInput, TOutput> singleResultSelector,
68 Func<TLeftInput, IEnumerable<TRightInput>, TOutput> groupResultSelector,
69 IEqualityComparer<THashKey> keyComparer,
70 CancellationToken cancellationToken)
72 Contract.Assert(leftSource != null);
73 Contract.Assert(rightSource != null);
74 Contract.Assert(singleResultSelector != null || groupResultSelector != null);
76 m_leftSource = leftSource;
77 m_rightSource = rightSource;
78 m_singleResultSelector = singleResultSelector;
79 m_groupResultSelector = groupResultSelector;
80 m_keyComparer = keyComparer;
81 m_cancellationToken = cancellationToken;
84 //---------------------------------------------------------------------------------------
85 // MoveNext implements all the hash-join logic noted earlier. When it is called first, it
86 // will execute the entire inner query tree, and build a hash-table lookup. This is the
87 // Building phase. Then for the first call and all subsequent calls to MoveNext, we will
88 // incrementally perform the Probing phase. We'll keep getting elements from the outer
89 // data source, looking into the hash-table we built, and enumerating the full results.
91 // This routine supports both inner and outer (group) joins. An outer join will yield a
92 // (possibly empty) list of matching elements from the inner instead of one-at-a-time,
93 // as we do for inner joins.
//
// Right elements with a null hash key are never stored, and left elements with a null hash
// key are never looked up, so null keys never produce a match.
//
// NOTE(review): this listing omits short lines. No declaration of the counter `i` used below
// and no `return` statement appear here, so the control flow shown is incomplete.
96 internal override bool MoveNext(ref TOutput currentElement, ref TLeftKey currentKey)
98 Contract.Assert(m_singleResultSelector != null || m_groupResultSelector != null, "expected a compiled result selector");
99 Contract.Assert(m_leftSource != null);
100 Contract.Assert(m_rightSource != null);
102 // BUILD phase: If we haven't built the hash-table yet, create that first.
103 Mutables mutables = m_mutables;
104 if (mutables == null)
106 mutables = m_mutables = new Mutables();
// Diagnostic counters reported by the TraceInfo call below (no increments are visible here).
108 int hashLookupCount = 0;
109 int hashKeyCollisions = 0;
111 mutables.m_rightHashLookup = new HashLookup<THashKey, Pair<TRightInput, ListChunk<TRightInput>>>(m_keyComparer);
113 Pair<TRightInput, THashKey> rightPair = default(Pair<TRightInput, THashKey>);
114 int rightKeyUnused = default(int);
// Drain the entire right source into the hash table, polling for cancellation periodically.
116 while (m_rightSource.MoveNext(ref rightPair, ref rightKeyUnused))
118 if ((i++ & CancellationState.POLL_INTERVAL) == 0)
119 CancellationState.ThrowIfCanceled(m_cancellationToken);
121 TRightInput rightElement = rightPair.First;
122 THashKey rightHashKey = rightPair.Second;
124 // We ignore null keys.
125 if (rightHashKey != null)
131 // See if we've already stored an element under the current key. If not, we
132 // lazily allocate a pair to hold the elements mapping to the same key.
133 const int INITIAL_CHUNK_SIZE = 2;
134 Pair<TRightInput, ListChunk<TRightInput>> currentValue = default(Pair<TRightInput, ListChunk<TRightInput>>);
135 if (!mutables.m_rightHashLookup.TryGetValue(rightHashKey, ref currentValue))
// First element for this key: keep it in the pair's First slot.
137 currentValue = new Pair<TRightInput, ListChunk<TRightInput>>(rightElement, null);
139 if (m_groupResultSelector != null)
141 // For group joins, we also add the element to the list. This makes
142 // it easier later to yield the list as-is.
143 currentValue.Second = new ListChunk<TRightInput>(INITIAL_CHUNK_SIZE);
144 currentValue.Second.Add(rightElement);
147 mutables.m_rightHashLookup.Add(rightHashKey, currentValue);
// Key already present (the branching lines are not shown in this listing): append the
// element to the key's chunk list.
151 if (currentValue.Second == null)
153 // Lazily allocate a list to hold all but the 1st value. We need to
154 // re-store this element because the pair is a value type.
155 currentValue.Second = new ListChunk<TRightInput>(INITIAL_CHUNK_SIZE);
156 mutables.m_rightHashLookup[rightHashKey] = currentValue;
// The stored pair already refers to this same chunk object, so appending needs no re-store.
159 currentValue.Second.Add(rightElement);
168 TraceHelpers.TraceInfo("ParallelJoinQueryOperator::MoveNext - built hash table [count = {0}, collisions = {1}]",
169 hashLookupCount, hashKeyCollisions);
173 // PROBE phase: So long as the source has a next element, return the match.
// If the current chunk of right matches is exhausted, move on to the next chunk (if any).
174 ListChunk<TRightInput> currentRightChunk = mutables.m_currentRightMatches;
175 if (currentRightChunk != null && mutables.m_currentRightMatchesIndex == currentRightChunk.Count)
177 currentRightChunk = mutables.m_currentRightMatches = currentRightChunk.Next;
178 mutables.m_currentRightMatchesIndex = 0;
// No pending right matches: pull left elements until one produces an output.
181 if (mutables.m_currentRightMatches == null)
183 // We have to look up the next list of matches in the hash-table.
184 Pair<TLeftInput, THashKey> leftPair = default(Pair<TLeftInput, THashKey>);
185 TLeftKey leftKey = default(TLeftKey);
186 while (m_leftSource.MoveNext(ref leftPair, ref leftKey))
188 if ((mutables.m_outputLoopCount++ & CancellationState.POLL_INTERVAL) == 0)
189 CancellationState.ThrowIfCanceled(m_cancellationToken);
191 // Find the match in the hash table.
192 Pair<TRightInput, ListChunk<TRightInput>> matchValue = default(Pair<TRightInput, ListChunk<TRightInput>>);
193 TLeftInput leftElement = leftPair.First;
194 THashKey leftHashKey = leftPair.Second;
// Null left keys are never looked up, so they never match.
197 if (leftHashKey != null)
199 if (mutables.m_rightHashLookup.TryGetValue(leftHashKey, ref matchValue))
201 // We found a new match. For inner joins, we remember the list in case
202 // there are multiple values under this same key -- the next iteration will pick
203 // them up. For outer joins, we will use the list momentarily.
204 if (m_singleResultSelector != null)
206 mutables.m_currentRightMatches = matchValue.Second;
// NOTE(review): this condition accepts a null or NON-empty list, but the message below
// says "null or empty"; the message text appears to be wrong.
207 Contract.Assert(mutables.m_currentRightMatches == null || mutables.m_currentRightMatches.Count > 0,
208 "we were expecting that the list would be either null or empty");
209 mutables.m_currentRightMatchesIndex = 0;
// For inner joins the first right element lives in matchValue.First, outside the chunk
// list, so it is paired with the left element right away; chunked elements come later.
212 currentElement = m_singleResultSelector(leftElement, matchValue.First);
213 currentKey = leftKey;
215 // If there is a list of matches, remember the left values for next time.
216 if (matchValue.Second != null)
218 mutables.m_currentLeft = leftElement;
219 mutables.m_currentLeftKey = leftKey;
227 // For outer joins, we always yield a result.
228 if (m_groupResultSelector != null)
230 // Grab the matches, or create an empty list if there are none.
231 IEnumerable<TRightInput> matches = matchValue.Second;
// NOTE(review): presumably guarded by a null check on `matches` that is not shown here.
234 matches = ParallelEnumerable.Empty<TRightInput>();
237 // Generate the current value.
238 currentElement = m_groupResultSelector(leftElement, matches);
239 currentKey = leftKey;
244 // If we've reached the end of the data source, we're done.
248 // Produce the next element and increment our index within the matches.
// Pending inner-join matches: pair the remembered left element with the next chunk entry.
249 Contract.Assert(m_singleResultSelector != null);
250 Contract.Assert(mutables.m_currentRightMatches != null);
251 Contract.Assert(0 <= mutables.m_currentRightMatchesIndex && mutables.m_currentRightMatchesIndex < mutables.m_currentRightMatches.Count);
253 currentElement = m_singleResultSelector(
254 mutables.m_currentLeft, mutables.m_currentRightMatches.m_chunk[mutables.m_currentRightMatchesIndex]);
255 currentKey = mutables.m_currentLeftKey;
257 mutables.m_currentRightMatchesIndex++;
// Releases this enumerator by disposing both the left (probe) and right (build) sources.
262 protected override void Dispose(bool disposing)
264 Contract.Assert(m_leftSource != null && m_rightSource != null);
265 m_leftSource.Dispose();
266 m_rightSource.Dispose();