[System.Core] Fixes warnings
[mono.git] / mcs / class / referencesource / System.Core / System / Linq / Parallel / QueryOperators / Binary / UnionQueryOperator.cs
1 // ==++==
2 //
3 //   Copyright (c) Microsoft Corporation.  All rights reserved.
4 // 
5 // ==--==
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
7 //
8 // UnionQueryOperator.cs
9 //
10 // <OWNER>[....]</OWNER>
11 //
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
13
14 using System.Collections.Generic;
15 using System.Diagnostics.Contracts;
16 using System.Threading;
17
18 namespace System.Linq.Parallel
19 {
20     /// <summary>
21     /// Operator that yields the union of two data sources. 
22     /// </summary>
23     /// <typeparam name="TInputOutput">The element type shared by both input sources and by the output.</typeparam>
24     internal sealed class UnionQueryOperator<TInputOutput> :
25         BinaryQueryOperator<TInputOutput, TInputOutput, TInputOutput>
26     {
27
28         private readonly IEqualityComparer<TInputOutput> m_comparer; // An equality comparer.
29
30         //---------------------------------------------------------------------------------------
31         // Constructs a new union operator.
32         //
33
34         internal UnionQueryOperator(ParallelQuery<TInputOutput> left, ParallelQuery<TInputOutput> right, IEqualityComparer<TInputOutput> comparer)
35             :base(left, right)
36         {
37             Contract.Assert(left != null && right != null, "child data sources cannot be null");
38
39             m_comparer = comparer;
40             m_outputOrdered = LeftChild.OutputOrdered || RightChild.OutputOrdered;
41         }
42
43         //---------------------------------------------------------------------------------------
44         // Just opens the current operator, including opening the child and wrapping it with
45         // partitions as needed.
46         //
47
48         internal override QueryResults<TInputOutput> Open(
49             QuerySettings settings, bool preferStriping)
50         {
51             // We just open our child operators, left and then right.  Do not propagate the preferStriping value, but 
52             // instead explicitly set it to false. Regardless of whether the parent prefers striping or range
53             // partitioning, the output will be hash-partititioned.
54             QueryResults<TInputOutput> leftChildResults = LeftChild.Open(settings, false);
55             QueryResults<TInputOutput> rightChildResults = RightChild.Open(settings, false);
56
57             return new BinaryQueryOperatorResults(leftChildResults, rightChildResults, this, settings, false);
58         }
59
60         public override void WrapPartitionedStream<TLeftKey, TRightKey>(
61             PartitionedStream<TInputOutput, TLeftKey> leftStream, PartitionedStream<TInputOutput, TRightKey> rightStream, 
62             IPartitionedStreamRecipient<TInputOutput> outputRecipient, bool preferStriping, QuerySettings settings)
63         {
64             Contract.Assert(leftStream.PartitionCount == rightStream.PartitionCount);
65             int partitionCount = leftStream.PartitionCount;
66
67             // Wrap both child streams with hash repartition
68
69             if (LeftChild.OutputOrdered)
70             {
71                 PartitionedStream<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> leftHashStream =
72                     ExchangeUtilities.HashRepartitionOrdered<TInputOutput, NoKeyMemoizationRequired, TLeftKey>(
73                         leftStream, null, null, m_comparer, settings.CancellationState.MergedCancellationToken);
74
75                 WrapPartitionedStreamFixedLeftType<TLeftKey, TRightKey>(
76                     leftHashStream, rightStream, outputRecipient, partitionCount, settings.CancellationState.MergedCancellationToken);
77             }
78             else
79             {
80                 PartitionedStream<Pair<TInputOutput, NoKeyMemoizationRequired>, int> leftHashStream =
81                     ExchangeUtilities.HashRepartition<TInputOutput, NoKeyMemoizationRequired, TLeftKey>(
82                         leftStream, null, null, m_comparer, settings.CancellationState.MergedCancellationToken);
83
84                 WrapPartitionedStreamFixedLeftType<int, TRightKey>(
85                     leftHashStream, rightStream, outputRecipient, partitionCount, settings.CancellationState.MergedCancellationToken);
86             }
87         }
88
89         //---------------------------------------------------------------------------------------
90         // A helper method that allows WrapPartitionedStream to fix the TLeftKey type parameter.
91         //
92
93         private void WrapPartitionedStreamFixedLeftType<TLeftKey, TRightKey>(
94             PartitionedStream<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> leftHashStream, PartitionedStream<TInputOutput, TRightKey> rightStream,
95             IPartitionedStreamRecipient<TInputOutput> outputRecipient, int partitionCount, CancellationToken cancellationToken)
96         {
97             if (RightChild.OutputOrdered)
98             {
99                 PartitionedStream<Pair<TInputOutput, NoKeyMemoizationRequired>, TRightKey> rightHashStream =
100                     ExchangeUtilities.HashRepartitionOrdered<TInputOutput, NoKeyMemoizationRequired, TRightKey>(
101                         rightStream, null, null, m_comparer, cancellationToken);
102
103                 WrapPartitionedStreamFixedBothTypes<TLeftKey, TRightKey>(
104                     leftHashStream, rightHashStream, outputRecipient, partitionCount, cancellationToken);
105             }
106             else
107             {
108                 PartitionedStream<Pair<TInputOutput, NoKeyMemoizationRequired>, int> rightHashStream =
109                     ExchangeUtilities.HashRepartition<TInputOutput, NoKeyMemoizationRequired, TRightKey>(
110                         rightStream, null, null, m_comparer, cancellationToken);
111
112                 WrapPartitionedStreamFixedBothTypes<TLeftKey, int>(
113                     leftHashStream, rightHashStream, outputRecipient, partitionCount, cancellationToken);
114             }
115         }
116
117         //---------------------------------------------------------------------------------------
118         // A helper method that allows WrapPartitionedStreamFixedLeftType to fix the TRightKey type parameter.
119         //
120
121         private void WrapPartitionedStreamFixedBothTypes<TLeftKey, TRightKey>(
122             PartitionedStream<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> leftHashStream,
123             PartitionedStream<Pair<TInputOutput, NoKeyMemoizationRequired>, TRightKey> rightHashStream,
124             IPartitionedStreamRecipient<TInputOutput> outputRecipient, int partitionCount,
125             CancellationToken cancellationToken)
126         {
127             if (LeftChild.OutputOrdered || RightChild.OutputOrdered)
128             {
129                 IComparer<ConcatKey<TLeftKey, TRightKey>> compoundKeyComparer =
130                     ConcatKey<TLeftKey, TRightKey>.MakeComparer(leftHashStream.KeyComparer, rightHashStream.KeyComparer);
131
132                 PartitionedStream<TInputOutput, ConcatKey<TLeftKey, TRightKey>> outputStream =
133                     new PartitionedStream<TInputOutput, ConcatKey<TLeftKey, TRightKey>>(partitionCount, compoundKeyComparer, OrdinalIndexState.Shuffled);
134
135                 for (int i = 0; i < partitionCount; i++)
136                 {
137                     outputStream[i] = new OrderedUnionQueryOperatorEnumerator<TLeftKey, TRightKey>(
138                         leftHashStream[i], rightHashStream[i], LeftChild.OutputOrdered, RightChild.OutputOrdered,
139                         m_comparer, compoundKeyComparer, cancellationToken);
140                 }
141
142                 outputRecipient.Receive(outputStream);
143             }
144             else
145             {
146                 PartitionedStream<TInputOutput, int> outputStream =
147                     new PartitionedStream<TInputOutput, int>(partitionCount, Util.GetDefaultComparer<int>(), OrdinalIndexState.Shuffled);
148
149                 for (int i = 0; i < partitionCount; i++)
150                 {
151                     outputStream[i] = new UnionQueryOperatorEnumerator<TLeftKey, TRightKey>(
152                         leftHashStream[i], rightHashStream[i], i, m_comparer, cancellationToken);
153                 }
154
155                 outputRecipient.Receive(outputStream);
156             }
157         }
158
159
160         //---------------------------------------------------------------------------------------
161         // Returns an enumerable that represents the query executing sequentially.
162         //
163
164         internal override IEnumerable<TInputOutput> AsSequentialQuery(CancellationToken token)
165         {
166             IEnumerable<TInputOutput> wrappedLeftChild = CancellableEnumerable.Wrap(LeftChild.AsSequentialQuery(token), token);
167             IEnumerable<TInputOutput> wrappedRightChild = CancellableEnumerable.Wrap(RightChild.AsSequentialQuery(token), token);
168             return wrappedLeftChild.Union(wrappedRightChild, m_comparer);
169         }
170
171         //---------------------------------------------------------------------------------------
172         // Whether this operator performs a premature merge that would not be performed in
173         // a similar sequential operation (i.e., in LINQ to Objects).
174         //
175
176         internal override bool LimitsParallelism
177         {
178             get { return false; }
179         }
180
181         //---------------------------------------------------------------------------------------
182         // This enumerator performs the union operation incrementally. It does this by maintaining
183         // a history -- in the form of a set -- of all data already seen. It is careful not to
184         // return any duplicates.
185         //
186
187         class UnionQueryOperatorEnumerator<TLeftKey, TRightKey> : QueryOperatorEnumerator<TInputOutput, int>
188         {
189
190             private QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> m_leftSource; // Left data source.
191             private QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TRightKey> m_rightSource; // Right data source.
192 #if !MONO
193             private readonly int m_partitionIndex; // The current partition.
194 #endif
195             private Set<TInputOutput> m_hashLookup; // The hash lookup, used to produce the union.
196             private CancellationToken m_cancellationToken;
197             private Shared<int> m_outputLoopCount;
198             private readonly IEqualityComparer<TInputOutput> m_comparer;
199
200             //---------------------------------------------------------------------------------------
201             // Instantiates a new union operator.
202             //
203
204             internal UnionQueryOperatorEnumerator(
205                 QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> leftSource,
206                 QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TRightKey> rightSource,
207                 int partitionIndex, IEqualityComparer<TInputOutput> comparer,
208                 CancellationToken cancellationToken)
209             {
210                 Contract.Assert(leftSource != null);
211                 Contract.Assert(rightSource != null);
212
213                 m_leftSource = leftSource;
214                 m_rightSource = rightSource;
215 #if !MONO
216                 m_partitionIndex = partitionIndex;
217 #endif
218                 m_comparer = comparer;
219                 m_cancellationToken = cancellationToken;
220             }
221
222             //---------------------------------------------------------------------------------------
223             // Walks the two data sources, left and then right, to produce the union.
224             //
225
226             internal override bool MoveNext(ref TInputOutput currentElement, ref int currentKey)
227             {
228                 if (m_hashLookup == null)
229                 {
230                     m_hashLookup = new Set<TInputOutput>(m_comparer);
231                     m_outputLoopCount = new Shared<int>(0);
232                 }
233
234                 Contract.Assert(m_hashLookup != null);
235
236                 // Enumerate the left and then right data source. When each is done, we set the
237                 // field to null so we will skip it upon subsequent calls to MoveNext.
238                 if (m_leftSource != null)
239                 {
240                     // Iterate over this set's elements until we find a unique element.
241                     TLeftKey keyUnused = default(TLeftKey);
242                     Pair<TInputOutput, NoKeyMemoizationRequired> currentLeftElement = default(Pair<TInputOutput, NoKeyMemoizationRequired>);
243
244                     int i = 0;
245                     while (m_leftSource.MoveNext(ref currentLeftElement, ref keyUnused))
246                     {
247                         if ((i++ & CancellationState.POLL_INTERVAL) == 0)
248                             CancellationState.ThrowIfCanceled(m_cancellationToken);
249
250                         // We ensure we never return duplicates by tracking them in our set.
251                         if (m_hashLookup.Add(currentLeftElement.First))
252                         {
253 #if DEBUG
254                             currentKey = unchecked((int)0xdeadbeef);
255 #endif
256                             currentElement = currentLeftElement.First;
257                             return true;
258                         }
259                     }
260
261                     m_leftSource.Dispose();
262                     m_leftSource = null;
263                 }
264                 
265                 
266                 if (m_rightSource != null)
267                 {
268                     // Iterate over this set's elements until we find a unique element.
269                     TRightKey keyUnused = default(TRightKey);
270                     Pair<TInputOutput, NoKeyMemoizationRequired> currentRightElement = default(Pair<TInputOutput, NoKeyMemoizationRequired>);
271
272                     while (m_rightSource.MoveNext(ref currentRightElement, ref keyUnused))
273                     {
274                         if ((m_outputLoopCount.Value++ & CancellationState.POLL_INTERVAL) == 0)
275                             CancellationState.ThrowIfCanceled(m_cancellationToken);
276
277                         // We ensure we never return duplicates by tracking them in our set.
278                         if (m_hashLookup.Add(currentRightElement.First))
279                         {
280 #if DEBUG
281                             currentKey = unchecked((int)0xdeadbeef);
282 #endif
283                             currentElement = currentRightElement.First;
284                             return true;
285                         }
286                     }
287
288                     m_rightSource.Dispose();
289                     m_rightSource = null;
290                 }
291
292                 return false;
293             }
294
295             protected override void Dispose(bool disposing)
296             {
297                 if (m_leftSource != null)
298                 {
299                     m_leftSource.Dispose();
300                 }
301                 if (m_rightSource != null)
302                 {
303                     m_rightSource.Dispose();
304                 }
305             }
306         }
307
308         class OrderedUnionQueryOperatorEnumerator<TLeftKey, TRightKey> : QueryOperatorEnumerator<TInputOutput, ConcatKey<TLeftKey, TRightKey>>
309         {
310             private QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> m_leftSource; // Left data source.
311             private QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TRightKey> m_rightSource; // Right data source.
312             private IComparer<ConcatKey<TLeftKey, TRightKey>> m_keyComparer; // Comparer for compound order keys.
313             private IEnumerator<KeyValuePair<Wrapper<TInputOutput>, Pair<TInputOutput, ConcatKey<TLeftKey, TRightKey>>>> m_outputEnumerator; // Enumerator over the output of the union.
314             private bool m_leftOrdered; // Whether the left data source is ordered.
315             private bool m_rightOrdered; // Whether the right data source is ordered.
316             private IEqualityComparer<TInputOutput> m_comparer; // Comparer for the elements.
317             private CancellationToken m_cancellationToken;
318
319             //---------------------------------------------------------------------------------------
320             // Instantiates a new union operator.
321             //
322
323             internal OrderedUnionQueryOperatorEnumerator(
324                 QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> leftSource,
325                 QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TRightKey> rightSource,
326                 bool leftOrdered, bool rightOrdered, IEqualityComparer<TInputOutput> comparer, IComparer<ConcatKey<TLeftKey, TRightKey>> keyComparer,
327                 CancellationToken cancellationToken)
328             {
329                 Contract.Assert(leftSource != null);
330                 Contract.Assert(rightSource != null);
331
332                 m_leftSource = leftSource;
333                 m_rightSource = rightSource;
334                 m_keyComparer = keyComparer;
335
336                 m_leftOrdered = leftOrdered;
337                 m_rightOrdered = rightOrdered;
338                 m_comparer = comparer;
339
340                 if (m_comparer == null)
341                 {
342                     m_comparer = EqualityComparer<TInputOutput>.Default;
343                 }
344
345                 m_cancellationToken = cancellationToken;
346             }
347
348             //---------------------------------------------------------------------------------------
349             // Walks the two data sources, left and then right, to produce the union.
350             //
351
352             internal override bool MoveNext(ref TInputOutput currentElement, ref ConcatKey<TLeftKey, TRightKey> currentKey)
353             {
354                 Contract.Assert(m_leftSource != null);
355                 Contract.Assert(m_rightSource != null);
356
357                 if (m_outputEnumerator == null)
358                 {
359                     IEqualityComparer<Wrapper<TInputOutput>> wrapperComparer = new WrapperEqualityComparer<TInputOutput>(m_comparer);
360                     Dictionary<Wrapper<TInputOutput>, Pair<TInputOutput, ConcatKey<TLeftKey, TRightKey>>> union =
361                         new Dictionary<Wrapper<TInputOutput>, Pair<TInputOutput, ConcatKey<TLeftKey, TRightKey>>>(wrapperComparer);
362
363                     Pair<TInputOutput, NoKeyMemoizationRequired> elem = default(Pair<TInputOutput, NoKeyMemoizationRequired>);
364                     TLeftKey leftKey = default(TLeftKey);
365
366                     int i = 0;
367                     while (m_leftSource.MoveNext(ref elem, ref leftKey))
368                     {
369                         if ((i++ & CancellationState.POLL_INTERVAL) == 0)
370                             CancellationState.ThrowIfCanceled(m_cancellationToken);
371
372                         ConcatKey<TLeftKey, TRightKey> key =
373                             ConcatKey<TLeftKey, TRightKey>.MakeLeft(m_leftOrdered ? leftKey : default(TLeftKey));
374                         Pair<TInputOutput, ConcatKey<TLeftKey, TRightKey>> oldEntry;
375                         Wrapper<TInputOutput> wrappedElem = new Wrapper<TInputOutput>(elem.First);
376
377                         if (!union.TryGetValue(wrappedElem, out oldEntry) || m_keyComparer.Compare(key, oldEntry.Second) < 0)
378                         {
379                             union[wrappedElem] = new Pair<TInputOutput, ConcatKey<TLeftKey, TRightKey>>(elem.First, key);
380                         }
381                     }
382
383                     TRightKey rightKey = default(TRightKey);
384                     while (m_rightSource.MoveNext(ref elem, ref rightKey))
385                     {
386                         if ((i++ & CancellationState.POLL_INTERVAL) == 0)
387                             CancellationState.ThrowIfCanceled(m_cancellationToken);
388
389                         ConcatKey<TLeftKey, TRightKey> key =
390                             ConcatKey<TLeftKey, TRightKey>.MakeRight(m_rightOrdered ? rightKey : default(TRightKey));
391                         Pair<TInputOutput, ConcatKey<TLeftKey, TRightKey>> oldEntry;
392                         Wrapper<TInputOutput> wrappedElem = new Wrapper<TInputOutput>(elem.First);
393
394                         if (!union.TryGetValue(wrappedElem, out oldEntry) || m_keyComparer.Compare(key, oldEntry.Second) < 0)
395                         {
396                             union[wrappedElem] = new Pair<TInputOutput, ConcatKey<TLeftKey, TRightKey>>(elem.First, key); ;
397                         }
398                     }
399
400                     m_outputEnumerator = union.GetEnumerator();
401                 }
402
403                 if (m_outputEnumerator.MoveNext())
404                 {
405                     Pair<TInputOutput, ConcatKey<TLeftKey, TRightKey>> current = m_outputEnumerator.Current.Value;
406                     currentElement = current.First;
407                     currentKey = current.Second;
408                     return true;
409                 }
410
411                 return false;
412             }
413
414             protected override void Dispose(bool disposing)
415             {
416                 Contract.Assert(m_leftSource != null && m_rightSource != null);
417                 m_leftSource.Dispose();
418                 m_rightSource.Dispose();
419             }
420         }
421     }
422 }