Updates referencesource to .NET 4.7
[mono.git] / mcs / class / referencesource / System.Core / System / Linq / Parallel / QueryOperators / Binary / IntersectQueryOperator.cs
1 // ==++==
2 //
3 //   Copyright (c) Microsoft Corporation.  All rights reserved.
4 // 
5 // ==--==
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
7 //
8 // IntersectQueryOperator.cs
9 //
10 // <OWNER>Microsoft</OWNER>
11 //
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
13
14 using System.Collections.Generic;
15 using System.Diagnostics.Contracts;
16 using System.Threading;
17
18 namespace System.Linq.Parallel
19 {
20     /// <summary>
21     /// Operator that yields the intersection of two data sources. 
22     /// </summary>
23     /// <typeparam name="TInputOutput"></typeparam>
24     internal sealed class IntersectQueryOperator<TInputOutput> :
25         BinaryQueryOperator<TInputOutput, TInputOutput, TInputOutput>
26     {
27
28         private readonly IEqualityComparer<TInputOutput> m_comparer; // An equality comparer.
29
30         //---------------------------------------------------------------------------------------
31         // Constructs a new intersection operator.
32         //
33
/// <summary>
/// Builds an intersection operator over a left and a right child query.
/// </summary>
/// <param name="left">The left child data source; asserted to be non-null.</param>
/// <param name="right">The right child data source; asserted to be non-null.</param>
/// <param name="comparer">Equality comparer stored for hashing and comparing elements.</param>
internal IntersectQueryOperator(ParallelQuery<TInputOutput> left, ParallelQuery<TInputOutput> right, IEqualityComparer<TInputOutput> comparer)
    : base(left, right)
{
    Contract.Assert(left != null && right != null, "child data sources cannot be null");

    // Whether the output is ordered is dictated by the left child alone.
    m_outputOrdered = LeftChild.OutputOrdered;
    m_comparer = comparer;

    // The operator always reports a shuffled ordinal index state.
    SetOrdinalIndex(OrdinalIndexState.Shuffled);
}
44
45
/// <summary>
/// Opens the left child and then the right child, and packages both sets of
/// results into a <c>BinaryQueryOperatorResults</c> bound to this operator.
/// </summary>
/// <param name="settings">Query settings forwarded to both children and to the results object.</param>
/// <param name="preferStriping">Ignored; see the comment in the body.</param>
/// <returns>The results object wrapping both opened children.</returns>
internal override QueryResults<TInputOutput> Open(
    QuerySettings settings, bool preferStriping)
{
    // The caller's preferStriping value is intentionally not propagated: false is passed
    // explicitly instead. Regardless of whether the parent prefers striping or range
    // partitioning, the output will be hash-partitioned.
    //
    // Arguments are evaluated left to right, so the left child is still opened before
    // the right child.
    return new BinaryQueryOperatorResults(
        LeftChild.Open(settings, false),
        RightChild.Open(settings, false),
        this,
        settings,
        false);
}
57
/// <summary>
/// Hash-repartitions the left stream (keeping its order keys only when the output is
/// ordered) and hands it, together with the untouched right stream, to
/// <c>WrapPartitionedStreamHelper</c>, which builds the per-partition enumerators.
/// </summary>
/// <param name="leftPartitionedStream">Left input partitions.</param>
/// <param name="rightPartitionedStream">Right input partitions; asserted to have the same partition count as the left.</param>
/// <param name="outputRecipient">Recipient that is eventually given the output stream.</param>
/// <param name="preferStriping">Not used by this method.</param>
/// <param name="settings">Query settings; supplies the merged cancellation token.</param>
public override void WrapPartitionedStream<TLeftKey, TRightKey>(
    PartitionedStream<TInputOutput, TLeftKey> leftPartitionedStream, PartitionedStream<TInputOutput, TRightKey> rightPartitionedStream,
    IPartitionedStreamRecipient<TInputOutput> outputRecipient, bool preferStriping, QuerySettings settings)
{
    Contract.Assert(leftPartitionedStream.PartitionCount == rightPartitionedStream.PartitionCount);

    CancellationToken mergedToken = settings.CancellationState.MergedCancellationToken;

    if (!OutputOrdered)
    {
        // Unordered output: the left order keys are dropped, so the helper is
        // instantiated with int as its left key type.
        WrapPartitionedStreamHelper<int, TRightKey>(
            ExchangeUtilities.HashRepartition<TInputOutput, NoKeyMemoizationRequired, TLeftKey>(
                leftPartitionedStream, null, null, m_comparer, mergedToken),
            rightPartitionedStream, outputRecipient, mergedToken);
        return;
    }

    // Ordered output: the repartitioned left stream keeps TLeftKey as its key type.
    WrapPartitionedStreamHelper<TLeftKey, TRightKey>(
        ExchangeUtilities.HashRepartitionOrdered<TInputOutput, NoKeyMemoizationRequired, TLeftKey>(
            leftPartitionedStream, null, null, m_comparer, mergedToken),
        rightPartitionedStream, outputRecipient, mergedToken);
}
79
80         //---------------------------------------------------------------------------------------
81         // This is a helper method. WrapPartitionedStream decides what type TLeftKey is going
82         // to be, and then calls this method with that key type as a generic parameter.
83         //
84
/// <summary>
/// Hash-repartitions the right stream, pairs every left partition with the matching
/// right partition in an intersection enumerator, and passes the resulting output
/// stream to the recipient.
/// </summary>
/// <param name="leftHashStream">Left stream, already hash-repartitioned by the caller.</param>
/// <param name="rightPartitionedStream">Right stream; hash-repartitioned here.</param>
/// <param name="outputRecipient">Receives the finished output stream.</param>
/// <param name="cancellationToken">Token passed to the repartitioning and to each enumerator.</param>
private void WrapPartitionedStreamHelper<TLeftKey, TRightKey>(
    PartitionedStream<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> leftHashStream, PartitionedStream<TInputOutput, TRightKey> rightPartitionedStream,
    IPartitionedStreamRecipient<TInputOutput> outputRecipient, CancellationToken cancellationToken)
{
    int partitionCount = leftHashStream.PartitionCount;

    PartitionedStream<Pair<TInputOutput, NoKeyMemoizationRequired>, int> rightHashStream =
        ExchangeUtilities.HashRepartition<TInputOutput, NoKeyMemoizationRequired, TRightKey>(
            rightPartitionedStream, null, null, m_comparer, cancellationToken);

    PartitionedStream<TInputOutput, TLeftKey> outputStream =
        new PartitionedStream<TInputOutput, TLeftKey>(partitionCount, leftHashStream.KeyComparer, OrdinalIndexState.Shuffled);

    for (int partition = 0; partition < partitionCount; partition++)
    {
        QueryOperatorEnumerator<TInputOutput, TLeftKey> partitionEnumerator;

        if (!OutputOrdered)
        {
            // The unordered enumerator always produces int keys. WrapPartitionedStream
            // instantiates this helper with TLeftKey = int on the unordered path, so the
            // cast through object is how the int-keyed enumerator is typed as TLeftKey-keyed.
            partitionEnumerator = (QueryOperatorEnumerator<TInputOutput, TLeftKey>)(object)
                new IntersectQueryOperatorEnumerator<TLeftKey>(
                    leftHashStream[partition], rightHashStream[partition], m_comparer, cancellationToken);
        }
        else
        {
            partitionEnumerator = new OrderedIntersectQueryOperatorEnumerator<TLeftKey>(
                leftHashStream[partition], rightHashStream[partition], m_comparer, leftHashStream.KeyComparer, cancellationToken);
        }

        outputStream[partition] = partitionEnumerator;
    }

    outputRecipient.Receive(outputStream);
}
113
114         //---------------------------------------------------------------------------------------
115         // Whether this operator performs a premature merge that would not be performed in
116         // a similar sequential operation (i.e., in LINQ to Objects).
117         //
118
/// <summary>
/// Always <c>false</c> for the intersection operator.
/// </summary>
internal override bool LimitsParallelism
{
    get
    {
        return false;
    }
}
123
124         //---------------------------------------------------------------------------------------
125         // This enumerator performs the intersection operation incrementally. It does this by
126         // maintaining a history -- in the form of a set -- of all data already seen. It then
127         // only returns elements that are seen twice (returning each one only once).
128         //
129
/// <summary>
/// Unordered intersection enumerator for a single pair of partitions. On the first
/// call to MoveNext it drains the right source into a set; it then walks the left
/// source and yields each left element found in that set, removing the element from
/// the set so that it is yielded at most once.
/// </summary>
/// <typeparam name="TLeftKey">Key type of the left source; the keys are read but never used.</typeparam>
class IntersectQueryOperatorEnumerator<TLeftKey> : QueryOperatorEnumerator<TInputOutput, int>
{

    private readonly QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> m_leftSource; // Left data source.
    private readonly QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, int> m_rightSource; // Right data source.
    private readonly IEqualityComparer<TInputOutput> m_comparer; // Comparer to use for equality/hash-coding.
    private Set<TInputOutput> m_hashLookup; // Right-side elements not yet yielded; null until the first MoveNext.
    private readonly CancellationToken m_cancellationToken;
    private Shared<int> m_outputLoopCount; // Left-side iteration counter kept across MoveNext calls for cancellation polling.

    //---------------------------------------------------------------------------------------
    // Instantiates a new intersection enumerator over one left and one right partition.
    //

    internal IntersectQueryOperatorEnumerator(
        QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> leftSource,
        QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, int> rightSource,
        IEqualityComparer<TInputOutput> comparer, CancellationToken cancellationToken)
    {
        Contract.Assert(leftSource != null);
        Contract.Assert(rightSource != null);

        m_leftSource = leftSource;
        m_rightSource = rightSource;
        m_comparer = comparer;
        m_cancellationToken = cancellationToken;
    }

    //---------------------------------------------------------------------------------------
    // Walks the two data sources, right and then left, to produce the intersection.
    // Returns true and sets currentElement when a match is found; returns false once the
    // left source is exhausted. currentKey is only written in DEBUG builds.
    //

    internal override bool MoveNext(ref TInputOutput currentElement, ref int currentKey)
    {
        Contract.Assert(m_leftSource != null);
        Contract.Assert(m_rightSource != null);

        // Build the set out of the right data source, if we haven't already.
        if (m_hashLookup == null)
        {
            m_outputLoopCount = new Shared<int>(0);
            m_hashLookup = new Set<TInputOutput>(m_comparer);

            Pair<TInputOutput, NoKeyMemoizationRequired> rightElement = default(Pair<TInputOutput, NoKeyMemoizationRequired>);
            int rightKeyUnused = default(int);

            int i = 0;
            while (m_rightSource.MoveNext(ref rightElement, ref rightKeyUnused))
            {
                // Poll for cancellation periodically rather than on every element.
                if ((i++ & CancellationState.POLL_INTERVAL) == 0)
                    CancellationState.ThrowIfCanceled(m_cancellationToken);

                m_hashLookup.Add(rightElement.First);
            }
        }

        // Now iterate over the left data source, looking for matches.
        Pair<TInputOutput, NoKeyMemoizationRequired> leftElement = default(Pair<TInputOutput, NoKeyMemoizationRequired>);
        TLeftKey keyUnused = default(TLeftKey);

        while (m_leftSource.MoveNext(ref leftElement, ref keyUnused))
        {
            if ((m_outputLoopCount.Value++ & CancellationState.POLL_INTERVAL) == 0)
                CancellationState.ThrowIfCanceled(m_cancellationToken);

            // A single Remove both tests for membership and marks the element as already
            // returned, avoiding the second hash lookup of a Contains-then-Remove pair.
            // NOTE(review): relies on Set<T>.Remove returning true exactly when the element
            // was present -- confirm against the Set<T> definition.
            if (m_hashLookup.Remove(leftElement.First))
            {
                currentElement = leftElement.First;
#if DEBUG
                // The key carries no meaning for unordered output; use a recognizable sentinel.
                currentKey = unchecked((int)0xdeadbeef);
#endif
                return true;
            }
        }

        return false;
    }

    //---------------------------------------------------------------------------------------
    // Disposes both underlying sources. The disposing flag is not consulted.
    //

    protected override void Dispose(bool disposing)
    {
        Contract.Assert(m_leftSource != null && m_rightSource != null);
        m_leftSource.Dispose();
        m_rightSource.Dispose();
    }
}
220
221         //---------------------------------------------------------------------------------------
222         // Returns an enumerable that represents the query executing sequentially.
223         //
224
/// <summary>
/// Produces the sequential form of this query: both children are converted to
/// sequential queries, wrapped with <c>CancellableEnumerable.Wrap</c>, and intersected
/// using the stored comparer.
/// </summary>
/// <param name="token">Token passed to both children and to both wrappers.</param>
/// <returns>An enumerable yielding the intersection of the two wrapped child sequences.</returns>
internal override IEnumerable<TInputOutput> AsSequentialQuery(CancellationToken token)
{
    // Left child first, then right child -- same evaluation order as the two-local form.
    IEnumerable<TInputOutput> cancellableLeft =
        CancellableEnumerable.Wrap(LeftChild.AsSequentialQuery(token), token);

    return cancellableLeft.Intersect(
        CancellableEnumerable.Wrap(RightChild.AsSequentialQuery(token), token),
        m_comparer);
}
231
232
/// <summary>
/// Order-preserving intersection enumerator for a single pair of partitions. On the
/// first call to MoveNext it drains the left source into a dictionary that records, for
/// each distinct element, the occurrence with the smallest order key. It then walks the
/// right source and yields the recorded left element and key for every right element
/// found in the dictionary, removing the entry so that it is yielded only once.
/// </summary>
/// <typeparam name="TLeftKey">Order key type of the left source and of the output.</typeparam>
class OrderedIntersectQueryOperatorEnumerator<TLeftKey> : QueryOperatorEnumerator<TInputOutput, TLeftKey>
{

    private readonly QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> m_leftSource; // Left data source.
    private readonly QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, int> m_rightSource; // Right data source.
    private readonly IEqualityComparer<Wrapper<TInputOutput>> m_comparer; // Comparer to use for equality/hash-coding.
    private readonly IComparer<TLeftKey> m_leftKeyComparer; // Comparer to use to determine ordering of order keys.
    private Dictionary<Wrapper<TInputOutput>, Pair<TInputOutput, TLeftKey>> m_hashLookup; // The hash lookup, used to produce the intersection.
    private readonly CancellationToken m_cancellationToken;

    //---------------------------------------------------------------------------------------
    // Instantiates a new ordered intersection enumerator. The element comparer is wrapped
    // so that it can compare Wrapper<TInputOutput> dictionary keys.
    //

    internal OrderedIntersectQueryOperatorEnumerator(
        QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> leftSource,
        QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, int> rightSource,
        IEqualityComparer<TInputOutput> comparer, IComparer<TLeftKey> leftKeyComparer,
        CancellationToken cancellationToken)
    {
        Contract.Assert(leftSource != null);
        Contract.Assert(rightSource != null);

        m_leftSource = leftSource;
        m_rightSource = rightSource;
        m_comparer = new WrapperEqualityComparer<TInputOutput>(comparer);
        m_leftKeyComparer = leftKeyComparer;
        m_cancellationToken = cancellationToken;
    }

    //---------------------------------------------------------------------------------------
    // Walks the two data sources, left and then right, to produce the intersection.
    // Returns true with currentElement/currentKey set to the recorded left element and its
    // order key when a right element matches; returns false once the right source is
    // exhausted.
    //

    internal override bool MoveNext(ref TInputOutput currentElement, ref TLeftKey currentKey)
    {
        Contract.Assert(m_leftSource != null);
        Contract.Assert(m_rightSource != null);

        // The poll counter starts at zero on every call; on the first call it continues
        // from wherever the left-side build loop left it.
        int pollCounter = 0;
        if (m_hashLookup == null)
        {
            pollCounter = BuildLeftLookup();
        }

        // Now iterate over the right data source, looking for matches.
        Pair<TInputOutput, NoKeyMemoizationRequired> rightItem = default(Pair<TInputOutput, NoKeyMemoizationRequired>);
        int ignoredRightKey = default(int);
        while (m_rightSource.MoveNext(ref rightItem, ref ignoredRightKey))
        {
            if ((pollCounter++ & CancellationState.POLL_INTERVAL) == 0)
            {
                CancellationState.ThrowIfCanceled(m_cancellationToken);
            }

            Pair<TInputOutput, TLeftKey> match;
            Wrapper<TInputOutput> wrappedRightItem = new Wrapper<TInputOutput>(rightItem.First);

            if (!m_hashLookup.TryGetValue(wrappedRightItem, out match))
            {
                // Not present on the left, or already yielded earlier.
                continue;
            }

            // Yield the recorded left element and its order key, then drop the entry so
            // the element is never returned again.
            currentElement = match.First;
            currentKey = match.Second;

            m_hashLookup.Remove(new Wrapper<TInputOutput>(match.First));
            return true;
        }

        return false;
    }

    //---------------------------------------------------------------------------------------
    // Drains the left source into m_hashLookup and returns the number of left elements
    // consumed, so the caller can keep using it as the cancellation poll counter.
    //

    private int BuildLeftLookup()
    {
        m_hashLookup = new Dictionary<Wrapper<TInputOutput>, Pair<TInputOutput, TLeftKey>>(m_comparer);

        int pollCounter = 0;
        Pair<TInputOutput, NoKeyMemoizationRequired> leftItem = default(Pair<TInputOutput, NoKeyMemoizationRequired>);
        TLeftKey leftKey = default(TLeftKey);

        while (m_leftSource.MoveNext(ref leftItem, ref leftKey))
        {
            if ((pollCounter++ & CancellationState.POLL_INTERVAL) == 0)
            {
                CancellationState.ThrowIfCanceled(m_cancellationToken);
            }

            // For each element, we track the smallest order key seen for it so far.
            Wrapper<TInputOutput> lookupKey = new Wrapper<TInputOutput>(leftItem.First);
            Pair<TInputOutput, TLeftKey> existing;
            bool seenBefore = m_hashLookup.TryGetValue(lookupKey, out existing);

            // If this is the first occurrence of the element, or its order key is lower
            // than the one recorded, store this occurrence. Both the key and the element
            // value are stored: two values that are "equal" according to the equality
            // comparer are still not interchangeable, so the one that had the smallest
            // key is the one that must be yielded.
            if (!seenBefore || m_leftKeyComparer.Compare(leftKey, existing.Second) < 0)
            {
                m_hashLookup[lookupKey] = new Pair<TInputOutput, TLeftKey>(leftItem.First, leftKey);
            }
        }

        return pollCounter;
    }

    //---------------------------------------------------------------------------------------
    // Disposes both underlying sources. The disposing flag is not consulted.
    //

    protected override void Dispose(bool disposing)
    {
        Contract.Assert(m_leftSource != null && m_rightSource != null);
        m_leftSource.Dispose();
        m_rightSource.Dispose();
    }
}
336     }
337 }