Updates referencesource to .NET 4.7
[mono.git] / mcs / class / referencesource / System.Core / System / Linq / Parallel / QueryOperators / Inlined / LongCountAggregationOperator.cs
1 // ==++==
2 //
3 //   Copyright (c) Microsoft Corporation.  All rights reserved.
4 // 
5 // ==--==
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
7 //
8 // LongCountAggregationOperator.cs
9 //
10 // <OWNER>Microsoft</OWNER>
11 //
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
13
14 using System.Collections.Generic;
15 using System.Diagnostics.Contracts;
16 using System.Threading;
17 #if SILVERLIGHT
18 using System.Core; // for System.Core.SR
19 #endif
20
21 namespace System.Linq.Parallel
22 {
23     /// <summary>
24     /// An inlined count aggregation and its enumerator. 
25     /// </summary>
26     /// <typeparam name="TSource"></typeparam>
27     internal sealed class LongCountAggregationOperator<TSource> : InlinedAggregationOperator<TSource, long, long>
28     {
29
30         //---------------------------------------------------------------------------------------
31         // Constructs a new instance of the operator.
32         //
33
34         internal LongCountAggregationOperator(IEnumerable<TSource> child) : base(child)
35         {
36         }
37
38         //---------------------------------------------------------------------------------------
39         // Executes the entire query tree, and aggregates the intermediate results into the
40         // final result based on the binary operators and final reduction.
41         //
42         // Return Value:
43         //     The single result of aggregation.
44         //
45
46         protected override long InternalAggregate(ref Exception singularExceptionToThrow)
47         {
48             // Because the final reduction is typically much cheaper than the intermediate 
49             // reductions over the individual partitions, and because each parallel partition
50             // will do a lot of work to produce a single output element, we prefer to turn off
51             // pipelining, and process the final reductions serially.
52             using (IEnumerator<long> enumerator = GetEnumerator(ParallelMergeOptions.FullyBuffered, true))
53             {
54                 // We just reduce the elements in each output partition.
55                 long count = 0;
56                 while (enumerator.MoveNext())
57                 {
58                     checked
59                     {
60                         count += enumerator.Current;
61                     }
62                 }
63
64                 return count;
65             }
66         }
67
68         //---------------------------------------------------------------------------------------
69         // Creates an enumerator that is used internally for the final aggregation step.
70         //
71
72         protected override QueryOperatorEnumerator<long, int> CreateEnumerator<TKey>(
73             int index, int count, QueryOperatorEnumerator<TSource, TKey> source, object sharedData, CancellationToken cancellationToken)
74         {
75             return new LongCountAggregationOperatorEnumerator<TKey>(source, index, cancellationToken);
76         }
77
78         //---------------------------------------------------------------------------------------
79         // This enumerator type encapsulates the intermediary aggregation over the underlying
80         // (possibly partitioned) data source.
81         //
82
83         private class LongCountAggregationOperatorEnumerator<TKey> : InlinedAggregationOperatorEnumerator<long>
84         {
85             private readonly QueryOperatorEnumerator<TSource, TKey> m_source; // The source data.
86
87             //---------------------------------------------------------------------------------------
88             // Instantiates a new aggregation operator.
89             //
90
91             internal LongCountAggregationOperatorEnumerator(QueryOperatorEnumerator<TSource, TKey> source, int partitionIndex,
92                 CancellationToken cancellationToken) :
93                 base(partitionIndex, cancellationToken)
94             {
95                 Contract.Assert(source != null);
96                 m_source = source;
97             }
98
99             //---------------------------------------------------------------------------------------
100             // Counts the elements in the underlying data source, walking the entire thing the first
101             // time MoveNext is called on this object.
102             //
103
104             protected override bool MoveNextCore(ref long currentElement)
105             {
106                 TSource elementUnused = default(TSource);
107                 TKey keyUnused = default(TKey);
108
109                 QueryOperatorEnumerator<TSource, TKey> source = m_source;
110                 if (source.MoveNext(ref elementUnused, ref keyUnused))
111                 {
112                     // We just scroll through the enumerator and keep a running count.
113                     long count = 0;
114                     int i = 0;
115                     do
116                     {
117                         if ((i++ & CancellationState.POLL_INTERVAL) == 0)
118                             CancellationState.ThrowIfCanceled(m_cancellationToken);
119
120                         checked
121                         {
122                             count++;
123                         }
124                     }
125                     while (source.MoveNext(ref elementUnused, ref keyUnused));
126                     
127                     currentElement = count;
128                     return true;
129                 }
130
131                 return false;
132             }
133
134             //---------------------------------------------------------------------------------------
135             // Dispose of resources associated with the underlying enumerator.
136             //
137
138             protected override void Dispose(bool disposing)
139             {
140                 Contract.Assert(m_source != null);
141                 m_source.Dispose();
142             }
143         }
144     }
145 }