e09d4d92b33702025cc03dfa1784a38d29783d77
[mono.git] / mcs / class / referencesource / System.Core / System / Linq / Parallel / QueryOperators / Binary / HashJoinQueryOperatorEnumerator.cs
1 // ==++==
2 //
3 //   Copyright (c) Microsoft Corporation.  All rights reserved.
4 // 
5 // ==--==
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
7 //
8 // HashJoinQueryOperatorEnumerator.cs
9 //
10 // <OWNER>Microsoft</OWNER>
11 //
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
13
14 using System.Collections.Generic;
15 using System.Diagnostics.Contracts;
16 using System.Threading;
17
namespace System.Linq.Parallel
{
    /// <summary>
    /// This enumerator implements the hash-join algorithm. On the first call to MoveNext it
    /// drains the entire right (inner) source into a hash lookup keyed by THashKey (the build
    /// phase). That call and every later call then stream elements from the left (outer)
    /// source and look their keys up in that table (the probe phase). Elements whose hash key
    /// is null are ignored on both sides.
    ///
    /// Assumptions:
    ///     This enumerator type won't work properly at all if the analysis engine didn't
    ///     ensure a proper hash-partition. We expect inner and outer elements with equal
    ///     keys to ALWAYS be in the same partition. If they aren't (e.g. if the analysis is
    ///     busted) we'll silently drop items on the floor. :(
    ///
    ///  This is the enumerator class for two operators:
    ///   - Join      (uses the single result selector: one output per matching pair)
    ///   - GroupJoin (uses the group result selector: one output per left element)
    /// </summary>
    /// <typeparam name="TLeftInput">Element type of the left (outer, probing) source.</typeparam>
    /// <typeparam name="TLeftKey">
    /// Key type reported by the left source alongside each element; every output element is
    /// reported with the key of the left element that produced it.
    /// </typeparam>
    /// <typeparam name="TRightInput">Element type of the right (inner, building) source.</typeparam>
    /// <typeparam name="THashKey">Join key type carried by both sources and used for hashing.</typeparam>
    /// <typeparam name="TOutput">Type of the elements produced by the result selector.</typeparam>
    internal class HashJoinQueryOperatorEnumerator<TLeftInput, TLeftKey, TRightInput, THashKey, TOutput>
        : QueryOperatorEnumerator<TOutput, TLeftKey>
    {
        private readonly QueryOperatorEnumerator<Pair<TLeftInput, THashKey>, TLeftKey> m_leftSource; // Left (outer) data source. For probing.
        private readonly QueryOperatorEnumerator<Pair<TRightInput, THashKey>, int> m_rightSource; // Right (inner) data source. For building.
        private readonly Func<TLeftInput, TRightInput, TOutput> m_singleResultSelector; // Single result selector (Join).
        private readonly Func<TLeftInput, IEnumerable<TRightInput>, TOutput> m_groupResultSelector; // Group result selector (GroupJoin).
        private readonly IEqualityComparer<THashKey> m_keyComparer; // An optional key comparison object.
        private readonly CancellationToken m_cancellationToken;
        private Mutables m_mutables; // Created on the first MoveNext call; null until then.

        // State that changes across MoveNext calls.
        private class Mutables
        {
            internal TLeftInput m_currentLeft; // The current matching left element.
            internal TLeftKey m_currentLeftKey; // The key (as reported by the left source) of the current matching left element.
            internal HashLookup<THashKey, Pair<TRightInput, ListChunk<TRightInput>>> m_rightHashLookup; // The hash lookup built from the right source.
            internal ListChunk<TRightInput> m_currentRightMatches; // Current chunk of remaining right matches (if any); only used by inner joins.
            internal int m_currentRightMatchesIndex; // Current index within m_currentRightMatches.
            internal int m_outputLoopCount; // Counter used to poll for cancellation during the probe phase.
        }

        //---------------------------------------------------------------------------------------
        // Instantiates a new hash-join enumerator. At least one of the two result selectors
        // must be non-null; the single selector drives Join semantics, the group selector
        // drives GroupJoin semantics.
        //

        internal HashJoinQueryOperatorEnumerator(
            QueryOperatorEnumerator<Pair<TLeftInput, THashKey>, TLeftKey> leftSource,
            QueryOperatorEnumerator<Pair<TRightInput, THashKey>, int> rightSource,
            Func<TLeftInput, TRightInput, TOutput> singleResultSelector,
            Func<TLeftInput, IEnumerable<TRightInput>, TOutput> groupResultSelector,
            IEqualityComparer<THashKey> keyComparer,
            CancellationToken cancellationToken)
        {
            Contract.Assert(leftSource != null);
            Contract.Assert(rightSource != null);
            Contract.Assert(singleResultSelector != null || groupResultSelector != null);

            m_leftSource = leftSource;
            m_rightSource = rightSource;
            m_singleResultSelector = singleResultSelector;
            m_groupResultSelector = groupResultSelector;
            m_keyComparer = keyComparer;
            m_cancellationToken = cancellationToken;
        }

        //---------------------------------------------------------------------------------------
        // MoveNext implements the hash-join logic. When it is called first, it executes the
        // entire inner query tree and builds a hash-table lookup. This is the Building phase.
        // Then, on the first call and all subsequent calls to MoveNext, we incrementally
        // perform the Probing phase. We keep getting elements from the outer data source,
        // looking into the hash-table we built, and enumerating the full results.
        //
        // This routine supports both inner and outer (group) joins. An outer join yields a
        // (possibly empty) list of matching elements from the inner source, instead of one
        // match at a time as we do for inner joins.
        //

        internal override bool MoveNext(ref TOutput currentElement, ref TLeftKey currentKey)
        {
            Contract.Assert(m_singleResultSelector != null || m_groupResultSelector != null, "expected a compiled result selector");
            Contract.Assert(m_leftSource != null);
            Contract.Assert(m_rightSource != null);

            // BUILD phase: If we haven't built the hash-table yet, create that first.
            Mutables mutables = m_mutables;
            if (mutables == null)
            {
                // NOTE(review): m_mutables and the (still empty) lookup are published before the
                // build runs, so if the build throws (e.g. on cancellation) a later MoveNext call
                // would probe a partially built table.
                mutables = m_mutables = new Mutables();
                mutables.m_rightHashLookup = new HashLookup<THashKey, Pair<TRightInput, ListChunk<TRightInput>>>(m_keyComparer);
                BuildRightHashLookup(mutables.m_rightHashLookup);
            }

            // PROBE phase: So long as the source has a next element, return the match.
            // First, if the current chunk of right matches is exhausted, move on to the next chunk.
            ListChunk<TRightInput> currentRightChunk = mutables.m_currentRightMatches;
            if (currentRightChunk != null && mutables.m_currentRightMatchesIndex == currentRightChunk.Count)
            {
                currentRightChunk = mutables.m_currentRightMatches = currentRightChunk.Next;
                mutables.m_currentRightMatchesIndex = 0;
            }

            if (mutables.m_currentRightMatches == null)
            {
                // We have to look up the next list of matches in the hash-table.
                Pair<TLeftInput, THashKey> leftPair = default(Pair<TLeftInput, THashKey>);
                TLeftKey leftKey = default(TLeftKey);
                while (m_leftSource.MoveNext(ref leftPair, ref leftKey))
                {
                    if ((mutables.m_outputLoopCount++ & CancellationState.POLL_INTERVAL) == 0)
                        CancellationState.ThrowIfCanceled(m_cancellationToken);

                    // Find the match in the hash table.
                    Pair<TRightInput, ListChunk<TRightInput>> matchValue = default(Pair<TRightInput, ListChunk<TRightInput>>);
                    TLeftInput leftElement = leftPair.First;
                    THashKey leftHashKey = leftPair.Second;

                    // Ignore null keys.
                    if (leftHashKey != null)
                    {
                        if (mutables.m_rightHashLookup.TryGetValue(leftHashKey, ref matchValue))
                        {
                            // We found a new match. For inner joins, we remember the list in case
                            // there are multiple values under this same key -- the next iteration
                            // will pick them up. For outer joins, we will use the list momentarily.
                            if (m_singleResultSelector != null)
                            {
                                mutables.m_currentRightMatches = matchValue.Second;
                                Contract.Assert(mutables.m_currentRightMatches == null || mutables.m_currentRightMatches.Count > 0,
                                                "we were expecting that the list would be either null or non-empty");
                                mutables.m_currentRightMatchesIndex = 0;

                                // Yield the value.
                                currentElement = m_singleResultSelector(leftElement, matchValue.First);
                                currentKey = leftKey;

                                // If there is a list of matches, remember the left values for next time.
                                if (matchValue.Second != null)
                                {
                                    mutables.m_currentLeft = leftElement;
                                    mutables.m_currentLeftKey = leftKey;
                                }

                                return true;
                            }
                        }
                    }

                    // For outer joins, we always yield a result.
                    if (m_groupResultSelector != null)
                    {
                        // Grab the matches, or use an empty sequence if there are none.
                        IEnumerable<TRightInput> matches = matchValue.Second;
                        if (matches == null)
                        {
                            matches = ParallelEnumerable.Empty<TRightInput>();
                        }

                        // Generate the current value.
                        currentElement = m_groupResultSelector(leftElement, matches);
                        currentKey = leftKey;
                        return true;
                    }
                }

                // If we've reached the end of the data source, we're done.
                return false;
            }

            // Produce the next element and increment our index within the matches.
            Contract.Assert(m_singleResultSelector != null);
            Contract.Assert(mutables.m_currentRightMatches != null);
            Contract.Assert(0 <= mutables.m_currentRightMatchesIndex && mutables.m_currentRightMatchesIndex < mutables.m_currentRightMatches.Count);

            currentElement = m_singleResultSelector(
                mutables.m_currentLeft, mutables.m_currentRightMatches.m_chunk[mutables.m_currentRightMatchesIndex]);
            currentKey = mutables.m_currentLeftKey;

            mutables.m_currentRightMatchesIndex++;

            return true;
        }

        //---------------------------------------------------------------------------------------
        // BUILD phase helper: drains the right (inner) source into the supplied hash lookup,
        // polling for cancellation periodically. Elements with null keys are skipped. For each
        // key, the first element is stored in the pair's First slot. For inner joins, later
        // elements with the same key go into a lazily allocated list in the Second slot (so
        // the list holds all but the first). For group joins, every element, including the
        // first, is appended to the list so it can later be yielded as-is.
        //

        private void BuildRightHashLookup(HashLookup<THashKey, Pair<TRightInput, ListChunk<TRightInput>>> rightHashLookup)
        {
#if DEBUG
            int hashLookupCount = 0;
            int hashKeyCollisions = 0;
#endif
            const int INITIAL_CHUNK_SIZE = 2;

            Pair<TRightInput, THashKey> rightPair = default(Pair<TRightInput, THashKey>);
            int rightKeyUnused = default(int);
            int i = 0;
            while (m_rightSource.MoveNext(ref rightPair, ref rightKeyUnused))
            {
                if ((i++ & CancellationState.POLL_INTERVAL) == 0)
                    CancellationState.ThrowIfCanceled(m_cancellationToken);

                TRightInput rightElement = rightPair.First;
                THashKey rightHashKey = rightPair.Second;

                // We ignore null keys.
                if (rightHashKey == null)
                {
                    continue;
                }

#if DEBUG
                hashLookupCount++;
#endif

                // See if we've already stored an element under the current key. If not, we
                // lazily allocate a pair to hold the elements mapping to the same key.
                Pair<TRightInput, ListChunk<TRightInput>> currentValue = default(Pair<TRightInput, ListChunk<TRightInput>>);
                if (!rightHashLookup.TryGetValue(rightHashKey, ref currentValue))
                {
                    currentValue = new Pair<TRightInput, ListChunk<TRightInput>>(rightElement, null);

                    if (m_groupResultSelector != null)
                    {
                        // For group joins, we also add the element to the list. This makes
                        // it easier later to yield the list as-is.
                        currentValue.Second = new ListChunk<TRightInput>(INITIAL_CHUNK_SIZE);
                        currentValue.Second.Add(rightElement);
                    }

                    rightHashLookup.Add(rightHashKey, currentValue);
                }
                else
                {
                    if (currentValue.Second == null)
                    {
                        // Lazily allocate a list to hold all but the 1st value. We need to
                        // re-store this element because the pair is a value type.
                        currentValue.Second = new ListChunk<TRightInput>(INITIAL_CHUNK_SIZE);
                        rightHashLookup[rightHashKey] = currentValue;
                    }

                    currentValue.Second.Add(rightElement);
#if DEBUG
                    hashKeyCollisions++;
#endif
                }
            }

#if DEBUG
            TraceHelpers.TraceInfo("ParallelJoinQueryOperator::MoveNext - built hash table [count = {0}, collisions = {1}]",
                hashLookupCount, hashKeyCollisions);
#endif
        }

        //---------------------------------------------------------------------------------------
        // Disposes both underlying sources. The right source is released in a finally block
        // so that it is still disposed if disposing the left source throws.
        //

        protected override void Dispose(bool disposing)
        {
            Contract.Assert(m_leftSource != null && m_rightSource != null);
            try
            {
                m_leftSource.Dispose();
            }
            finally
            {
                m_rightSource.Dispose();
            }
        }
    }

}