Updates referencesource to .NET 4.7
[mono.git] / mcs / class / referencesource / System.Core / System / Linq / Parallel / QueryOperators / Inlined / NullableFloatAverageAggregationOperator.cs
1 // ==++==
2 //
3 //   Copyright (c) Microsoft Corporation.  All rights reserved.
4 // 
5 // ==--==
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
7 //
8 // NullableFloatAverageAggregationOperator.cs
9 //
10 // <OWNER>Microsoft</OWNER>
11 //
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
13
14 using System.Collections.Generic;
15 using System.Diagnostics.Contracts;
16 using System.Threading;
17
namespace System.Linq.Parallel
{
    /// <summary>
    /// Inlined aggregation operator, together with its per-partition enumerator, that
    /// computes the average of a sequence of nullable floats. Null elements are skipped;
    /// the running total is accumulated as a double and the number of non-null elements
    /// as a long.
    /// </summary>
    internal sealed class NullableFloatAverageAggregationOperator : InlinedAggregationOperator<float?, Pair<double, long>, float?>
    {
        //---------------------------------------------------------------------------------------
        // Builds the average operator on top of the given child data source.
        //

        internal NullableFloatAverageAggregationOperator(IEnumerable<float?> child) : base(child)
        {
        }

        //---------------------------------------------------------------------------------------
        // Runs the query and folds the intermediate (sum, count) pairs into the final average.
        //
        // Arguments:
        //     singularExceptionToThrow - part of the base class contract; this implementation
        //                                neither reads nor writes it.
        //
        // Return Value:
        //     null when no intermediate results are produced, otherwise the combined sum
        //     divided by the combined count, narrowed to a float.
        //

        protected override float? InternalAggregate(ref Exception singularExceptionToThrow)
        {
            // The final merge is cheap next to the per-partition work, and every partition
            // boils down to a single output element, so pipelining is turned off and the
            // partial results are combined serially.
            using (IEnumerator<Pair<double, long>> partials = GetEnumerator(ParallelMergeOptions.FullyBuffered, true))
            {
                bool hasPartial = partials.MoveNext();
                if (!hasPartial)
                {
                    // Nothing to average.
                    return null;
                }

                // Seed the totals with the first partial result.
                Pair<double, long> seed = partials.Current;
                double totalSum = seed.First;
                long totalCount = seed.Second;

                // Fold the remaining partial results into the totals.
                while (partials.MoveNext())
                {
                    Pair<double, long> partial = partials.Current;

                    totalSum += partial.First;

                    // Only the integral count can trip an overflow check; the floating-point
                    // sum is unaffected by a checked context.
                    totalCount = checked(totalCount + partial.Second);
                }

                // Average = sum / count, computed in double precision and then narrowed.
                return (float)(totalSum / totalCount);
            }
        }

        //---------------------------------------------------------------------------------------
        // Creates the enumerator that performs the intermediate aggregation over one source
        // enumerator. The count and sharedData arguments are not used here.
        //

        protected override QueryOperatorEnumerator<Pair<double, long>,int> CreateEnumerator<TKey>(
            int index, int count, QueryOperatorEnumerator<float?, TKey> source, object sharedData, CancellationToken cancellationToken)
        {
            NullableFloatAverageAggregationOperatorEnumerator<TKey> partitionEnumerator =
                new NullableFloatAverageAggregationOperatorEnumerator<TKey>(source, index, cancellationToken);

            return partitionEnumerator;
        }

        //---------------------------------------------------------------------------------------
        // Enumerator that reduces an underlying (possibly partitioned) source of nullable
        // floats to a single (sum, count) pair.
        //

        private class NullableFloatAverageAggregationOperatorEnumerator<TKey> : InlinedAggregationOperatorEnumerator<Pair<double, long>>
        {
            private QueryOperatorEnumerator<float?, TKey> m_source; // Elements to be averaged.

            //---------------------------------------------------------------------------------------
            // Wraps the given source enumerator; the partition index and cancellation token
            // are handed to the base class.
            //

            internal NullableFloatAverageAggregationOperatorEnumerator(QueryOperatorEnumerator<float?, TKey> source, int partitionIndex,
                CancellationToken cancellationToken) :
                base(partitionIndex, cancellationToken)
            {
                Contract.Assert(source != null);
                m_source = source;
            }

            //---------------------------------------------------------------------------------------
            // Drains the underlying source, accumulating the sum and the number of its non-null
            // elements, and stores them in currentElement as a (sum, count) pair.
            //
            // Return Value:
            //     true if at least one non-null element was seen, false otherwise.
            //

            protected override bool MoveNextCore(ref Pair<double, long> currentElement)
            {
                QueryOperatorEnumerator<float?, TKey> input = m_source;

                // Running sum and number of the non-null elements seen so far.
                double runningSum = 0.0;
                long valueCount = 0;

                float? element = default(float?);
                TKey keyUnused = default(TKey);

                int iteration = 0;
                while (input.MoveNext(ref element, ref keyUnused))
                {
                    // Check for cancellation only when the masked iteration counter is zero,
                    // rather than on every element.
                    bool timeToPoll = (iteration++ & CancellationState.POLL_INTERVAL) == 0;
                    if (timeToPoll)
                    {
                        CancellationState.ThrowIfCanceled(m_cancellationToken);
                    }

                    // Null elements contribute to neither the sum nor the count.
                    if (!element.HasValue)
                    {
                        continue;
                    }

                    runningSum += element.GetValueOrDefault();
                    valueCount++;
                }

                currentElement = new Pair<double, long>(runningSum, valueCount);

                // Report a result only when there was something to average.
                return valueCount > 0;
            }

            //---------------------------------------------------------------------------------------
            // Releases the underlying source enumerator. The source is disposed regardless of
            // the value of the disposing flag.
            //

            protected override void Dispose(bool disposing)
            {
                Contract.Assert(m_source != null);
                m_source.Dispose();
            }
        }
    }
}