Update Reference Sources to .NET Framework 4.6.1
[mono.git] / mcs / class / referencesource / System.Core / System / Linq / Parallel / QueryOperators / Unary / GroupByQueryOperator.cs
1 // ==++==
2 //
3 //   Copyright (c) Microsoft Corporation.  All rights reserved.
4 // 
5 // ==--==
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
7 //
8 // GroupByQueryOperator.cs
9 //
10 // <OWNER>[....]</OWNER>
11 //
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
13
14 using System.Collections;
15 using System.Collections.Generic;
16 using System.Diagnostics.Contracts;
17 using System.Threading;
18 using IEnumerator=System.Collections.IEnumerator;
19
20 namespace System.Linq.Parallel
21 {
/// <summary>
/// The query operator behind GroupBy. The input is hash-repartitioned on the group key
/// and every resulting partition is wrapped in an enumerator that yields groupings of
/// key-to-elements. As the original notes put it, the implementation resembles the hash
/// join operator: a lookup of the input is built first, and then each unique key is
/// yielded together with all of the elements that share it.
/// </summary>
/// <typeparam name="TSource">Type of the input elements.</typeparam>
/// <typeparam name="TGroupKey">Type of the key produced by the key selector.</typeparam>
/// <typeparam name="TElement">Type of the elements placed in each grouping.</typeparam>
internal sealed class GroupByQueryOperator<TSource, TGroupKey, TElement> :
    UnaryQueryOperator<TSource, IGrouping<TGroupKey, TElement>>
{
    // Produces the grouping key for a source element.
    private readonly Func<TSource, TGroupKey> m_keySelector;

    // Optional projection from source element to grouped element; null means "identity".
    private readonly Func<TSource, TElement> m_elementSelector;

    // Optional equality comparer for grouping keys.
    private readonly IEqualityComparer<TGroupKey> m_keyComparer;

    //---------------------------------------------------------------------------------------
    // Creates a new group by operator.
    //
    // Arguments:
    //    child                - the child operator or data source from which to pull data
    //    keySelector          - delegate computing the grouping key of an element
    //    elementSelector      - delegate computing the grouped element; may be null only
    //                           when TSource and TElement are the same type
    //    keyComparer          - optional key comparison routine
    //
    // The operator always marks its ordinal index state as Shuffled.
    //

    internal GroupByQueryOperator(IEnumerable<TSource> child,
                                  Func<TSource, TGroupKey> keySelector,
                                  Func<TSource, TElement> elementSelector,
                                  IEqualityComparer<TGroupKey> keyComparer)
        : base(child)
    {
        Contract.Assert(child != null, "child data source cannot be null");
        Contract.Assert(keySelector != null, "need a selector function");
        Contract.Assert(elementSelector != null ||
                        typeof(TSource) == typeof(TElement), "need an element function if TSource!=TElement");

        m_keyComparer = keyComparer;
        m_elementSelector = elementSelector;
        m_keySelector = keySelector;

        SetOrdinalIndexState(OrdinalIndexState.Shuffled);
    }

    //---------------------------------------------------------------------------------------
    // Hash-repartitions the input stream on the group key and hands the wrapped result
    // to the recipient. The ordered repartitioning path is taken when the child reports
    // ordered output; otherwise the plain one is used.
    //

    internal override void WrapPartitionedStream<TKey>(
        PartitionedStream<TSource, TKey> inputStream, IPartitionedStreamRecipient<IGrouping<TGroupKey, TElement>> recipient,
        bool preferStriping, QuerySettings settings)
    {
        CancellationToken mergedToken = settings.CancellationState.MergedCancellationToken;

        if (!Child.OutputOrdered)
        {
            var unorderedHashStream = ExchangeUtilities.HashRepartition<TSource, TGroupKey, TKey>(
                inputStream, m_keySelector, m_keyComparer, null, mergedToken);

            WrapPartitionedStreamHelper<TKey, int>(unorderedHashStream, recipient, mergedToken);
            return;
        }

        var orderedHashStream = ExchangeUtilities.HashRepartitionOrdered<TSource, TGroupKey, TKey>(
            inputStream, m_keySelector, m_keyComparer, null, mergedToken);

        WrapPartitionedStreamHelperOrdered<TKey>(orderedHashStream, recipient, mergedToken);
    }

    //---------------------------------------------------------------------------------------
    // Helper for the unordered case. WrapPartitionedStream decides what TKey is and then
    // calls this method with that key type as a generic parameter.
    //

    private void WrapPartitionedStreamHelper<TIgnoreKey, TKey>(
        PartitionedStream<Pair<TSource, TGroupKey>, TKey> hashStream,
        IPartitionedStreamRecipient<IGrouping<TGroupKey, TElement>> recipient,
        CancellationToken cancellationToken)
    {
        int partitions = hashStream.PartitionCount;
        var groupedStream = new PartitionedStream<IGrouping<TGroupKey, TElement>, TKey>(
            partitions, hashStream.KeyComparer, OrdinalIndexState.Shuffled);

        for (int p = 0; p < partitions; p++)
        {
            QueryOperatorEnumerator<IGrouping<TGroupKey, TElement>, TKey> partitionEnumerator;

            if (m_elementSelector != null)
            {
                // An element selector exists: apply it while the lookup is being built.
                partitionEnumerator =
                    new GroupByElementSelectorQueryOperatorEnumerator<TSource, TGroupKey, TElement, TKey>(
                        hashStream[p], m_keyComparer, m_elementSelector, cancellationToken);
            }
            else
            {
                // No element selector: use the identity enumerator. The cast through
                // object is needed because the compiler cannot see that TSource and
                // TElement are the same type here.
                Contract.Assert(typeof(TSource) == typeof(TElement));

                var identityEnumerator = new GroupByIdentityQueryOperatorEnumerator<TSource, TGroupKey, TKey>(
                    hashStream[p], m_keyComparer, cancellationToken);

                partitionEnumerator =
                    (QueryOperatorEnumerator<IGrouping<TGroupKey, TElement>, TKey>)(object)identityEnumerator;
            }

            groupedStream[p] = partitionEnumerator;
        }

        recipient.Receive(groupedStream);
    }

    //---------------------------------------------------------------------------------------
    // Helper for the ordered case. WrapPartitionedStream decides what TKey is and then
    // calls this method with that key type as a generic parameter.
    //

    private void WrapPartitionedStreamHelperOrdered<TKey>(
        PartitionedStream<Pair<TSource, TGroupKey>, TKey> hashStream,
        IPartitionedStreamRecipient<IGrouping<TGroupKey, TElement>> recipient,
        CancellationToken cancellationToken)
    {
        int partitions = hashStream.PartitionCount;
        var groupedStream = new PartitionedStream<IGrouping<TGroupKey, TElement>, TKey>(
            partitions, hashStream.KeyComparer, OrdinalIndexState.Shuffled);

        // The order-key comparer of the incoming stream is shared by every partition.
        IComparer<TKey> orderComparer = hashStream.KeyComparer;

        for (int p = 0; p < partitions; p++)
        {
            QueryOperatorEnumerator<IGrouping<TGroupKey, TElement>, TKey> partitionEnumerator;

            if (m_elementSelector != null)
            {
                // An element selector exists: apply it while the lookup is being built.
                partitionEnumerator =
                    new OrderedGroupByElementSelectorQueryOperatorEnumerator<TSource, TGroupKey, TElement, TKey>(
                        hashStream[p], m_keySelector, m_elementSelector, m_keyComparer, orderComparer,
                        cancellationToken);
            }
            else
            {
                // No element selector: use the identity enumerator (see the unordered
                // helper for why the cast goes through object).
                Contract.Assert(typeof(TSource) == typeof(TElement));

                var identityEnumerator = new OrderedGroupByIdentityQueryOperatorEnumerator<TSource, TGroupKey, TKey>(
                    hashStream[p], m_keySelector, m_keyComparer, orderComparer, cancellationToken);

                partitionEnumerator =
                    (QueryOperatorEnumerator<IGrouping<TGroupKey, TElement>, TKey>)(object)identityEnumerator;
            }

            groupedStream[p] = partitionEnumerator;
        }

        recipient.Receive(groupedStream);
    }

    //-----------------------------------------------------------------------------------
    // Override of the query operator base class's Open method.
    //
    // Only the child operator is opened. The preferStriping value is deliberately not
    // propagated; false is passed instead because, regardless of whether the parent
    // prefers striping or range partitioning, the output will be hash-partitioned.
    //
    internal override QueryResults<IGrouping<TGroupKey, TElement>> Open(QuerySettings settings, bool preferStriping)
    {
        return new UnaryQueryOperatorResults(Child.Open(settings, false), this, settings, false);
    }


    //---------------------------------------------------------------------------------------
    // Returns an enumerable that represents the query executing sequentially, built on
    // the cancellable wrapper around the child's sequential query.
    //
    internal override IEnumerable<IGrouping<TGroupKey, TElement>> AsSequentialQuery(CancellationToken token)
    {
        IEnumerable<TSource> cancellableChild = CancellableEnumerable.Wrap(Child.AsSequentialQuery(token), token);

        if (m_elementSelector != null)
        {
            return cancellableChild.GroupBy(m_keySelector, m_elementSelector, m_keyComparer);
        }

        // Without an element selector the groupings hold TSource values, which are
        // TElement values by the assertion below; the cast goes through object.
        Contract.Assert(typeof(TElement) == typeof(TSource));
        return (IEnumerable<IGrouping<TGroupKey, TElement>>)(object)(cancellableChild.GroupBy(m_keySelector, m_keyComparer));
    }


    //---------------------------------------------------------------------------------------
    // Whether this operator performs a premature merge that would not be performed in
    // a similar sequential operation (i.e., in LINQ to Objects). It never does.
    //

    internal override bool LimitsParallelism
    {
        get { return false; }
    }
}
218
219
//---------------------------------------------------------------------------------------
// Base enumerator for the unordered GroupBy: it groups the elements of one partition
// and yields the resulting key-value sets. How source elements become grouped elements
// is left to the derived classes through BuildHashLookup.
//
// Assumptions:
//     Just like the Join operator, this won't work properly at all if the analysis engine
//     didn't choose to hash partition. We will simply not yield correct groupings.
//
internal abstract class GroupByQueryOperatorEnumerator<TSource, TGroupKey, TElement, TOrderKey> :
    QueryOperatorEnumerator<IGrouping<TGroupKey, TElement>, TOrderKey>
{
    // The data source to enumerate.
    protected readonly QueryOperatorEnumerator<Pair<TSource, TGroupKey>, TOrderKey> m_source;

    // Comparer used for the grouping keys.
    protected readonly IEqualityComparer<TGroupKey> m_keyComparer;

    // Token polled by the derived classes while the lookup is being built.
    protected readonly CancellationToken m_cancellationToken;

    // All of the mutable state; stays null until the first MoveNext call.
    private Mutables m_mutables;

    private class Mutables
    {
        // The lookup with key-value mappings.
        internal HashLookup<Wrapper<TGroupKey>, ListChunk<TElement>> m_hashLookup;

        // The current index within the lookup.
        internal int m_hashLookupIndex;
    }

    //---------------------------------------------------------------------------------------
    // Stores the source enumerator, key comparer and cancellation token.
    //

    protected GroupByQueryOperatorEnumerator(
        QueryOperatorEnumerator<Pair<TSource, TGroupKey>, TOrderKey> source,
        IEqualityComparer<TGroupKey> keyComparer, CancellationToken cancellationToken)
    {
        Contract.Assert(source != null);

        m_cancellationToken = cancellationToken;
        m_keyComparer = keyComparer;
        m_source = source;
    }

    //---------------------------------------------------------------------------------------
    // The first call builds the whole lookup via BuildHashLookup. That call and every
    // later one then step through the lookup by index, wrapping each entry in a
    // grouping. currentKey is not written by this method.
    //

    internal override bool MoveNext(ref IGrouping<TGroupKey, TElement> currentElement, ref TOrderKey currentKey)
    {
        Contract.Assert(m_source != null);

        Mutables state = m_mutables;
        if (state == null)
        {
            // Publish the state object before building the lookup, then position the
            // index just before the first entry.
            state = new Mutables();
            m_mutables = state;

            state.m_hashLookup = BuildHashLookup();
            Contract.Assert(state.m_hashLookup != null);
            state.m_hashLookupIndex = -1;
        }

        // Advance; once the index runs past the last entry there is nothing left.
        int position = ++state.m_hashLookupIndex;
        if (position >= state.m_hashLookup.Count)
        {
            return false;
        }

        currentElement = new GroupByGrouping<TGroupKey, TElement>(state.m_hashLookup[position]);
        return true;
    }

    //-----------------------------------------------------------------------------------
    // Builds the hash lookup, transforming from TSource to TElement through whatever
    // means is appropriate for the derived class.
    //

    protected abstract HashLookup<Wrapper<TGroupKey>, ListChunk<TElement>> BuildHashLookup();

    //-----------------------------------------------------------------------------------
    // Disposes the wrapped source enumerator.
    //

    protected override void Dispose(bool disposing)
    {
        m_source.Dispose();
    }
}
303
//---------------------------------------------------------------------------------------
// Specialization of the unordered group by enumerator for the identity case: the
// source elements themselves are stored in the groupings.
//

internal sealed class GroupByIdentityQueryOperatorEnumerator<TSource, TGroupKey, TOrderKey> :
    GroupByQueryOperatorEnumerator<TSource, TGroupKey, TSource, TOrderKey>
{
    //---------------------------------------------------------------------------------------
    // Forwards everything to the base enumerator.
    //

    internal GroupByIdentityQueryOperatorEnumerator(
        QueryOperatorEnumerator<Pair<TSource, TGroupKey>, TOrderKey> source,
        IEqualityComparer<TGroupKey> keyComparer, CancellationToken cancellationToken)
        : base(source, keyComparer, cancellationToken)
    {
    }

    //-----------------------------------------------------------------------------------
    // Drains the source and builds a lookup from group key to the chunk list of source
    // elements carrying that key.
    //

    protected override HashLookup<Wrapper<TGroupKey>, ListChunk<TSource>> BuildHashLookup()
    {
        const int InitialChunkSize = 2;

        var groups = new HashLookup<Wrapper<TGroupKey>, ListChunk<TSource>>(
            new WrapperEqualityComparer<TGroupKey>(m_keyComparer));

        Pair<TSource, TGroupKey> pair = default(Pair<TSource, TGroupKey>);
        TOrderKey ignoredOrderKey = default(TOrderKey);

        for (int consumed = 0; m_source.MoveNext(ref pair, ref ignoredOrderKey); consumed++)
        {
            // Poll for cancellation whenever the masked element counter is zero.
            if ((consumed & CancellationState.POLL_INTERVAL) == 0)
            {
                CancellationState.ThrowIfCanceled(m_cancellationToken);
            }

            // The pair already carries the group key in its second slot.
            Wrapper<TGroupKey> wrappedKey = new Wrapper<TGroupKey>(pair.Second);

            // Reuse the chunk list of a known key; otherwise start a new one.
            ListChunk<TSource> chunk = null;
            bool known = groups.TryGetValue(wrappedKey, ref chunk);
            if (!known)
            {
                chunk = new ListChunk<TSource>(InitialChunkSize);
                groups.Add(wrappedKey, chunk);
            }
            Contract.Assert(chunk != null);

            // Append the source element itself to its group.
            chunk.Add(pair.First);
        }

        return groups;
    }
}
363
//---------------------------------------------------------------------------------------
// Specialization of the unordered group by enumerator that runs an arbitrary element
// selection function on each source element before storing it in its grouping.
//

internal sealed class GroupByElementSelectorQueryOperatorEnumerator<TSource, TGroupKey, TElement, TOrderKey> :
    GroupByQueryOperatorEnumerator<TSource, TGroupKey, TElement, TOrderKey>
{
    // Function to select elements.
    private readonly Func<TSource, TElement> m_elementSelector;

    //---------------------------------------------------------------------------------------
    // Forwards the shared arguments to the base enumerator and keeps the (non-null)
    // element selector.
    //

    internal GroupByElementSelectorQueryOperatorEnumerator(
        QueryOperatorEnumerator<Pair<TSource, TGroupKey>, TOrderKey> source,
        IEqualityComparer<TGroupKey> keyComparer, Func<TSource, TElement> elementSelector, CancellationToken cancellationToken)
        : base(source, keyComparer, cancellationToken)
    {
        Contract.Assert(elementSelector != null);
        m_elementSelector = elementSelector;
    }

    //-----------------------------------------------------------------------------------
    // Drains the source and builds a lookup from group key to the chunk list of
    // selected elements for that key.
    //

    protected override HashLookup<Wrapper<TGroupKey>, ListChunk<TElement>> BuildHashLookup()
    {
        const int InitialChunkSize = 2;

        var groups = new HashLookup<Wrapper<TGroupKey>, ListChunk<TElement>>(
            new WrapperEqualityComparer<TGroupKey>(m_keyComparer));

        Pair<TSource, TGroupKey> pair = default(Pair<TSource, TGroupKey>);
        TOrderKey ignoredOrderKey = default(TOrderKey);

        for (int consumed = 0; m_source.MoveNext(ref pair, ref ignoredOrderKey); consumed++)
        {
            // Poll for cancellation whenever the masked element counter is zero.
            if ((consumed & CancellationState.POLL_INTERVAL) == 0)
            {
                CancellationState.ThrowIfCanceled(m_cancellationToken);
            }

            // The pair already carries the group key in its second slot.
            Wrapper<TGroupKey> wrappedKey = new Wrapper<TGroupKey>(pair.Second);

            // Reuse the chunk list of a known key; otherwise start a new one.
            ListChunk<TElement> chunk = null;
            bool known = groups.TryGetValue(wrappedKey, ref chunk);
            if (!known)
            {
                chunk = new ListChunk<TElement>(InitialChunkSize);
                groups.Add(wrappedKey, chunk);
            }
            Contract.Assert(chunk != null);

            // Project the source element and append the result to its group.
            chunk.Add(m_elementSelector(pair.First));
        }

        return groups;
    }
}
426
427
//---------------------------------------------------------------------------------------
// Ordered version of the GroupBy enumerator. Besides the grouping itself, every lookup
// entry carries an order key that MoveNext reports alongside the grouping.
//

internal abstract class OrderedGroupByQueryOperatorEnumerator<TSource, TGroupKey, TElement, TOrderKey> :
    QueryOperatorEnumerator<IGrouping<TGroupKey, TElement>, TOrderKey>
{
    // The data source to enumerate.
    protected readonly QueryOperatorEnumerator<Pair<TSource, TGroupKey>, TOrderKey> m_source;

    // The key selection routine.
    private readonly Func<TSource, TGroupKey> m_keySelector;

    // The key comparison routine.
    protected readonly IEqualityComparer<TGroupKey> m_keyComparer;

    // The comparison routine for order keys.
    protected readonly IComparer<TOrderKey> m_orderComparer;

    // Token polled by the derived classes while the lookup is being built.
    protected readonly CancellationToken m_cancellationToken;

    // All the mutable state; stays null until the first MoveNext call.
    private Mutables m_mutables;

    private class Mutables
    {
        // The lookup with key-value mappings.
        internal HashLookup<Wrapper<TGroupKey>, GroupKeyData> m_hashLookup;

        // The current index within the lookup.
        internal int m_hashLookupIndex;
    }

    //---------------------------------------------------------------------------------------
    // Stores the source enumerator, selectors, comparers and cancellation token.
    //

    protected OrderedGroupByQueryOperatorEnumerator(QueryOperatorEnumerator<Pair<TSource, TGroupKey>, TOrderKey> source,
        Func<TSource, TGroupKey> keySelector, IEqualityComparer<TGroupKey> keyComparer, IComparer<TOrderKey> orderComparer,
        CancellationToken cancellationToken)
    {
        Contract.Assert(source != null);
        Contract.Assert(keySelector != null);

        m_cancellationToken = cancellationToken;
        m_orderComparer = orderComparer;
        m_keyComparer = keyComparer;
        m_keySelector = keySelector;
        m_source = source;
    }

    //---------------------------------------------------------------------------------------
    // The first call builds the whole lookup via BuildHashLookup. That call and every
    // later one then step through the lookup by index, reporting the stored grouping
    // as the current element and its stored order key as the current key.
    //

    internal override bool MoveNext(ref IGrouping<TGroupKey, TElement> currentElement, ref TOrderKey currentKey)
    {
        Contract.Assert(m_source != null);
        Contract.Assert(m_keySelector != null);

        Mutables state = m_mutables;
        if (state == null)
        {
            // Publish the state object before building the lookup, then position the
            // index just before the first entry.
            state = new Mutables();
            m_mutables = state;

            state.m_hashLookup = BuildHashLookup();
            Contract.Assert(state.m_hashLookup != null);
            state.m_hashLookupIndex = -1;
        }

        // Advance; once the index runs past the last entry there is nothing left.
        int position = ++state.m_hashLookupIndex;
        if (position >= state.m_hashLookup.Count)
        {
            return false;
        }

        GroupKeyData entry = state.m_hashLookup[position].Value;
        currentElement = entry.m_grouping;
        currentKey = entry.m_orderKey;
        return true;
    }

    //-----------------------------------------------------------------------------------
    // Builds the hash lookup, transforming from TSource to TElement through whatever
    // means is appropriate for the derived class.
    //

    protected abstract HashLookup<Wrapper<TGroupKey>, GroupKeyData> BuildHashLookup();

    //-----------------------------------------------------------------------------------
    // Disposes the wrapped source enumerator.
    //

    protected override void Dispose(bool disposing)
    {
        m_source.Dispose();
    }

    //-----------------------------------------------------------------------------------
    // Per-key record stored in the lookup. It holds two parts:
    //     - An order key for the grouping.
    //     - The grouping itself, which consists of the elements and the grouping key.
    //

    protected class GroupKeyData
    {
        internal TOrderKey m_orderKey;
        internal OrderedGroupByGrouping<TGroupKey, TOrderKey, TElement> m_grouping;

        // Records the initial order key and creates an empty grouping for hashKey.
        internal GroupKeyData(TOrderKey orderKey, TGroupKey hashKey, IComparer<TOrderKey> orderComparer)
        {
            m_grouping = new OrderedGroupByGrouping<TGroupKey, TOrderKey, TElement>(hashKey, orderComparer);
            m_orderKey = orderKey;
        }
    }
}
535
536
//---------------------------------------------------------------------------------------
// Specialization of the ordered GroupBy enumerator for the identity case: the source
// elements themselves are stored in the groupings.
//

internal sealed class OrderedGroupByIdentityQueryOperatorEnumerator<TSource, TGroupKey, TOrderKey> :
    OrderedGroupByQueryOperatorEnumerator<TSource, TGroupKey, TSource, TOrderKey>
{
    //---------------------------------------------------------------------------------------
    // Forwards everything to the base enumerator.
    //

    internal OrderedGroupByIdentityQueryOperatorEnumerator(QueryOperatorEnumerator<Pair<TSource, TGroupKey>, TOrderKey> source,
        Func<TSource, TGroupKey> keySelector, IEqualityComparer<TGroupKey> keyComparer, IComparer<TOrderKey> orderComparer,
        CancellationToken cancellationToken)
        : base(source, keySelector, keyComparer, orderComparer, cancellationToken)
    {
    }

    //-----------------------------------------------------------------------------------
    // Drains the source and builds a lookup from group key to GroupKeyData. Each entry
    // ends up with the smallest order key (per m_orderComparer) seen for its group.
    //

    protected override HashLookup<Wrapper<TGroupKey>, GroupKeyData> BuildHashLookup()
    {
        var groups = new HashLookup<Wrapper<TGroupKey>, GroupKeyData>(
            new WrapperEqualityComparer<TGroupKey>(m_keyComparer));

        Pair<TSource, TGroupKey> pair = default(Pair<TSource, TGroupKey>);
        TOrderKey orderKey = default(TOrderKey);

        for (int consumed = 0; m_source.MoveNext(ref pair, ref orderKey); consumed++)
        {
            // Poll for cancellation whenever the masked element counter is zero.
            if ((consumed & CancellationState.POLL_INTERVAL) == 0)
            {
                CancellationState.ThrowIfCanceled(m_cancellationToken);
            }

            // The pair already carries the group key in its second slot.
            Wrapper<TGroupKey> wrappedKey = new Wrapper<TGroupKey>(pair.Second);

            GroupKeyData entry = null;
            if (!groups.TryGetValue(wrappedKey, ref entry))
            {
                // First element of this group: it supplies the initial order key.
                entry = new GroupKeyData(orderKey, wrappedKey.Value, m_orderComparer);
                groups.Add(wrappedKey, entry);
            }
            else if (m_orderComparer.Compare(orderKey, entry.m_orderKey) < 0)
            {
                // Known group: keep the smaller of the two order keys.
                entry.m_orderKey = orderKey;
            }

            Contract.Assert(entry != null);

            // Append the source element, together with its own order key, to the group.
            entry.m_grouping.Add(pair.First, orderKey);
        }

        // Tell every grouping that no more elements will arrive (the original comment
        // describes this step as sorting the elements within each group).
        int index = 0;
        while (index < groups.Count)
        {
            groups[index].Value.m_grouping.DoneAdding();
            index++;
        }

        return groups;
    }
}
608
//---------------------------------------------------------------------------------------
// Ordered GroupBy enumerator variant that runs every source element through an element
// selection function before storing the result in the element's group.
//

internal sealed class OrderedGroupByElementSelectorQueryOperatorEnumerator<TSource, TGroupKey, TElement, TOrderKey> :
    OrderedGroupByQueryOperatorEnumerator<TSource, TGroupKey, TElement, TOrderKey>
{

    private readonly Func<TSource, TElement> m_elementSelector; // Projection applied to each source element.

    //---------------------------------------------------------------------------------------
    // Creates the enumerator. Every argument except the element selector is handed to the
    // base class constructor; the element selector is kept here and must not be null.
    //

    internal OrderedGroupByElementSelectorQueryOperatorEnumerator(QueryOperatorEnumerator<Pair<TSource, TGroupKey>, TOrderKey> source,
        Func<TSource, TGroupKey> keySelector, Func<TSource, TElement> elementSelector, IEqualityComparer<TGroupKey> keyComparer, IComparer<TOrderKey> orderComparer,
        CancellationToken cancellationToken) :
        base(source, keySelector, keyComparer, orderComparer, cancellationToken)
    {
        Contract.Assert(elementSelector != null);
        m_elementSelector = elementSelector;
    }

    //-----------------------------------------------------------------------------------
    // Drains the source into a hash lookup keyed by the (wrapped) group key. Each entry
    // tracks the smallest order key seen for its group and collects the selected elements
    // together with their order keys. Once the source is exhausted, every group is told
    // that no more elements will arrive.
    //

    protected override HashLookup<Wrapper<TGroupKey>, GroupKeyData> BuildHashLookup()
    {
        WrapperEqualityComparer<TGroupKey> wrappedComparer = new WrapperEqualityComparer<TGroupKey>(m_keyComparer);
        HashLookup<Wrapper<TGroupKey>, GroupKeyData> groups =
            new HashLookup<Wrapper<TGroupKey>, GroupKeyData>(wrappedComparer);

        Pair<TSource, TGroupKey> current = default(Pair<TSource, TGroupKey>);
        TOrderKey currentOrderKey = default(TOrderKey);

        for (int consumed = 0; m_source.MoveNext(ref current, ref currentOrderKey); consumed++)
        {
            // Check for cancellation only when the masked element counter is zero.
            if ((consumed & CancellationState.POLL_INTERVAL) == 0)
            {
                CancellationState.ThrowIfCanceled(m_cancellationToken);
            }

            // The pair's second component is the group key; wrap it for the lookup.
            Wrapper<TGroupKey> wrappedKey = new Wrapper<TGroupKey>(current.Second);

            GroupKeyData keyData = null;
            if (!groups.TryGetValue(wrappedKey, ref keyData))
            {
                // First element for this key: start a new group.
                keyData = new GroupKeyData(currentOrderKey, wrappedKey.Value, m_orderComparer);
                groups.Add(wrappedKey, keyData);
            }
            else if (m_orderComparer.Compare(currentOrderKey, keyData.m_orderKey) < 0)
            {
                // Known key: keep the smallest order key seen so far for the group.
                keyData.m_orderKey = currentOrderKey;
            }

            Contract.Assert(keyData != null);

            // Project the source element and append the result to the group.
            TElement projected = m_elementSelector(current.First);
            keyData.m_grouping.Add(projected, currentOrderKey);
        }

        // All input consumed: let each group sort its elements.
        int position = 0;
        while (position < groups.Count)
        {
            groups[position].Value.m_grouping.DoneAdding();
            position++;
        }

        return groups;
    }
}
685
686
//---------------------------------------------------------------------------------------
// IGrouping<K,T> implementation backed by a single key/value pair: the wrapped group
// key together with the chunked list of elements that share it.
//

internal class GroupByGrouping<TGroupKey, TElement> : IGrouping<TGroupKey, TElement>
{

    private KeyValuePair<Wrapper<TGroupKey>, ListChunk<TElement>> m_keyValues; // Wrapped key and its elements.

    //---------------------------------------------------------------------------------------
    // Builds a grouping around the given key/value pair. The value must not be null.
    //

    internal GroupByGrouping(KeyValuePair<Wrapper<TGroupKey>, ListChunk<TElement>> keyValues)
    {
        Contract.Assert(keyValues.Value != null);
        m_keyValues = keyValues;
    }

    //---------------------------------------------------------------------------------------
    // The unwrapped group key.
    //

    TGroupKey IGrouping<TGroupKey, TElement>.Key
    {
        get
        {
            Wrapper<TGroupKey> wrappedKey = m_keyValues.Key;
            return wrappedKey.Value;
        }
    }

    //---------------------------------------------------------------------------------------
    // Enumerators over the elements of this group.
    //

    IEnumerator<TElement> IEnumerable<TElement>.GetEnumerator()
    {
        ListChunk<TElement> elements = m_keyValues.Value;
        Contract.Assert(elements != null);
        return elements.GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        // Delegate to the generic enumerator.
        IEnumerable<TElement> typedSelf = this;
        return typedSelf.GetEnumerator();
    }

}
735
/// <summary>
/// Grouping used by ordered GroupBy. Holds the elements that share one group key, plus a
/// parallel array of order keys that is used to sort the elements once the group is complete.
/// </summary>
internal class OrderedGroupByGrouping<TGroupKey, TOrderKey, TElement> : IGrouping<TGroupKey, TElement>
{
    private TGroupKey m_groupKey; // Key shared by every element in this group
    private GrowingArray<TElement> m_values; // Elements of the group
    private GrowingArray<TOrderKey> m_orderKeys; // Order key of the element at the same index in m_values
    private IComparer<TOrderKey> m_orderComparer; // Ordering applied to the order keys


    /// <summary>
    /// Creates an empty grouping for the given key.
    /// </summary>
    /// <param name="groupKey">The key exposed through <see cref="IGrouping{TKey,TElement}.Key"/>.</param>
    /// <param name="orderComparer">Comparer used to sort the order keys in <see cref="DoneAdding"/>.</param>
    internal OrderedGroupByGrouping(
        TGroupKey groupKey,
        IComparer<TOrderKey> orderComparer)
    {
        m_orderComparer = orderComparer;
        m_groupKey = groupKey;
        m_orderKeys = new GrowingArray<TOrderKey>();
        m_values = new GrowingArray<TElement>();
    }

    /// <summary>
    /// The key shared by all elements of this grouping.
    /// </summary>
    TGroupKey IGrouping<TGroupKey, TElement>.Key
    {
        get
        {
            return m_groupKey;
        }
    }

    /// <summary>
    /// Yields the stored elements in their current array order.
    /// </summary>
    IEnumerator<TElement> IEnumerable<TElement>.GetEnumerator()
    {
        Contract.Assert(m_values != null);

        // Capture the element count and the backing array once, up front. The backing
        // array may be longer than the count, so only the first 'total' slots are yielded.
        int total = m_values.Count;
        TElement[] buffer = m_values.InternalArray;
        Contract.Assert(buffer.Length >= total);

        int index = 0;
        while (index < total)
        {
            yield return buffer[index];
            index++;
        }
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        // Delegate to the generic enumerator.
        IEnumerable<TElement> typedSelf = this;
        return typedSelf.GetEnumerator();
    }

    /// <summary>
    /// Appends an element and its order key; both are stored at the same index.
    /// </summary>
    /// <param name="value">The element to store.</param>
    /// <param name="orderKey">The order key associated with <paramref name="value"/>.</param>
    internal void Add(TElement value, TOrderKey orderKey)
    {
        Contract.Assert(m_values != null);
        Contract.Assert(m_orderKeys != null);

        m_values.Add(value);
        m_orderKeys.Add(orderKey);
    }

    /// <summary>
    /// Signals that the group is complete and sorts its elements by their order keys.
    /// </summary>
    internal void DoneAdding()
    {
        Contract.Assert(m_values != null);
        Contract.Assert(m_orderKeys != null);

        // Sort the order keys and move the matching elements along with them. Only the
        // populated prefix [0, Count) of the backing arrays takes part in the sort.
        TOrderKey[] keyBuffer = m_orderKeys.InternalArray;
        TElement[] valueBuffer = m_values.InternalArray;
        int populated = m_values.Count;

        Array.Sort(keyBuffer, valueBuffer, 0, populated, m_orderComparer);

#if DEBUG
        m_orderKeys = null; // Any future calls to Add() or DoneAdding() will fail
#endif
    }
}
822 }