Updates referencesource to .NET 4.7
[mono.git] / mcs / class / referencesource / System.Core / System / Linq / Parallel / QueryOperators / Inlined / FloatAverageAggregationOperator.cs
1 // ==++==
2 //
3 //   Copyright (c) Microsoft Corporation.  All rights reserved.
4 // 
5 // ==--==
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
7 //
8 // FloatAverageAggregationOperator.cs
9 //
10 // <OWNER>Microsoft</OWNER>
11 //
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
13
14 using System.Collections.Generic;
15 using System.Diagnostics.Contracts;
16 using System.Threading;
17 #if SILVERLIGHT
18 using System.Core; // for System.Core.SR
19 #endif
20 namespace System.Linq.Parallel
21 {
22     /// <summary>
23     /// An inlined average aggregation operator and its enumerator, for floats. 
24     /// </summary>
25     internal sealed class FloatAverageAggregationOperator : InlinedAggregationOperator<float, Pair<double, long>, float>
26     {
27         //---------------------------------------------------------------------------------------
28         // Constructs a new instance of an average associative operator.
29         //
30
31         internal FloatAverageAggregationOperator(IEnumerable<float> child) : base(child)
32         {
33         }
34
35         //---------------------------------------------------------------------------------------
36         // Executes the entire query tree, and aggregates the intermediate results into the
37         // final result based on the binary operators and final reduction.
38         //
39         // Return Value:
40         //     The single result of aggregation.
41         //
42
43         protected override float InternalAggregate(ref Exception singularExceptionToThrow)
44         {
45             // Because the final reduction is typically much cheaper than the intermediate 
46             // reductions over the individual partitions, and because each parallel partition
47             // will do a lot of work to produce a single output element, we prefer to turn off
48             // pipelining, and process the final reductions serially.
49             using (IEnumerator<Pair<double, long>> enumerator = GetEnumerator(ParallelMergeOptions.FullyBuffered, true))
50             {
51                 // Throw an error for empty results.
52                 if (!enumerator.MoveNext())
53                 {
54                     singularExceptionToThrow = new InvalidOperationException(SR.GetString(SR.NoElements));
55                     return default(float);
56                 }
57
58                 Pair<double, long> result = enumerator.Current;
59
60                 // Simply add together the sums and totals.
61                 while (enumerator.MoveNext())
62                 {
63                     checked
64                     {
65                         result.First += enumerator.Current.First;
66                         result.Second += enumerator.Current.Second;
67                     }
68                 }
69
70                 // And divide the sum by the total to obtain the final result.
71                 return (float)(result.First / result.Second);
72             }
73         }
74
75         //---------------------------------------------------------------------------------------
76         // Creates an enumerator that is used internally for the final aggregation step.
77         //
78
79         protected override QueryOperatorEnumerator<Pair<double, long>, int> CreateEnumerator<TKey>(
80             int index, int count, QueryOperatorEnumerator<float, TKey> source, object sharedData,
81             CancellationToken cancellationToken)
82         {
83             return new FloatAverageAggregationOperatorEnumerator<TKey>(source, index, cancellationToken);
84         }
85
86         //---------------------------------------------------------------------------------------
87         // This enumerator type encapsulates the intermediary aggregation over the underlying
88         // (possibly partitioned) data source.
89         //
90
91         private class FloatAverageAggregationOperatorEnumerator<TKey> : InlinedAggregationOperatorEnumerator<Pair<double, long>>
92         {
93             private QueryOperatorEnumerator<float, TKey> m_source; // The source data.
94
95             //---------------------------------------------------------------------------------------
96             // Instantiates a new aggregation operator.
97             //
98
99             internal FloatAverageAggregationOperatorEnumerator(QueryOperatorEnumerator<float, TKey> source, int partitionIndex,
100                 CancellationToken cancellationToken) :
101                 base(partitionIndex, cancellationToken)
102             {
103                 Contract.Assert(source != null);
104                 m_source = source;
105             }
106
107             //---------------------------------------------------------------------------------------
108             // Tallies up the average of the underlying data source, walking the entire thing the first
109             // time MoveNext is called on this object.
110             //
111
112             protected override bool MoveNextCore(ref Pair<double, long> currentElement)
113             {
114                 // The temporary result contains the running sum and count, respectively.
115                 double sum = 0.0;
116                 long count = 0;
117
118                 QueryOperatorEnumerator<float, TKey> source = m_source;
119                 float current = default(float);
120                 TKey keyUnused = default(TKey);
121
122                 if (source.MoveNext(ref current, ref keyUnused))
123                 {
124                     int i = 0;
125                     do
126                     {
127                         if ((i++ & CancellationState.POLL_INTERVAL) == 0)
128                             CancellationState.ThrowIfCanceled(m_cancellationToken);
129
130                         checked
131                         {
132                             sum += current;
133                             count++;
134                         }
135                     }
136                     while (source.MoveNext(ref current, ref keyUnused));
137
138                     currentElement = new Pair<double, long>(sum, count);
139
140                     return true;
141                 }
142
143                 return false;
144             }
145
146             //---------------------------------------------------------------------------------------
147             // Dispose of resources associated with the underlying enumerator.
148             //
149
150             protected override void Dispose(bool disposing)
151             {
152                 Contract.Assert(m_source != null);
153                 m_source.Dispose();
154             }
155         }
156     }
157 }