3 // Copyright (c) Microsoft Corporation. All rights reserved.
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
8 // FloatAverageAggregationOperator.cs
10 // <OWNER>Microsoft</OWNER>
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
14 using System.Collections.Generic;
15 using System.Diagnostics.Contracts;
16 using System.Threading;
18 using System.Core; // for System.Core.SR
20 namespace System.Linq.Parallel
23 /// An inlined average aggregation operator and its enumerator, for floats.
25 internal sealed class FloatAverageAggregationOperator : InlinedAggregationOperator<float, Pair<double, long>, float>
27 //---------------------------------------------------------------------------------------
28 // Constructs a new instance of an average associative operator.
31 internal FloatAverageAggregationOperator(IEnumerable<float> child) : base(child)
35 //---------------------------------------------------------------------------------------
36 // Executes the entire query tree, and aggregates the intermediate results into the
37 // final result based on the binary operators and final reduction.
40 // The single result of aggregation.
43 protected override float InternalAggregate(ref Exception singularExceptionToThrow)
45 // Because the final reduction is typically much cheaper than the intermediate
46 // reductions over the individual partitions, and because each parallel partition
47 // will do a lot of work to produce a single output element, we prefer to turn off
48 // pipelining, and process the final reductions serially.
49 using (IEnumerator<Pair<double, long>> enumerator = GetEnumerator(ParallelMergeOptions.FullyBuffered, true))
51 // Throw an error for empty results.
52 if (!enumerator.MoveNext())
54 singularExceptionToThrow = new InvalidOperationException(SR.GetString(SR.NoElements));
55 return default(float);
58 Pair<double, long> result = enumerator.Current;
60 // Simply add together the sums and totals.
61 while (enumerator.MoveNext())
65 result.First += enumerator.Current.First;
66 result.Second += enumerator.Current.Second;
70 // And divide the sum by the total to obtain the final result.
71 return (float)(result.First / result.Second);
75 //---------------------------------------------------------------------------------------
76 // Creates an enumerator that is used internally for the final aggregation step.
79 protected override QueryOperatorEnumerator<Pair<double, long>, int> CreateEnumerator<TKey>(
80 int index, int count, QueryOperatorEnumerator<float, TKey> source, object sharedData,
81 CancellationToken cancellationToken)
83 return new FloatAverageAggregationOperatorEnumerator<TKey>(source, index, cancellationToken);
86 //---------------------------------------------------------------------------------------
87 // This enumerator type encapsulates the intermediary aggregation over the underlying
88 // (possibly partitioned) data source.
91 private class FloatAverageAggregationOperatorEnumerator<TKey> : InlinedAggregationOperatorEnumerator<Pair<double, long>>
93 private QueryOperatorEnumerator<float, TKey> m_source; // The source data.
95 //---------------------------------------------------------------------------------------
96 // Instantiates a new aggregation operator.
99 internal FloatAverageAggregationOperatorEnumerator(QueryOperatorEnumerator<float, TKey> source, int partitionIndex,
100 CancellationToken cancellationToken) :
101 base(partitionIndex, cancellationToken)
103 Contract.Assert(source != null);
107 //---------------------------------------------------------------------------------------
108 // Tallies up the average of the underlying data source, walking the entire thing the first
109 // time MoveNext is called on this object.
112 protected override bool MoveNextCore(ref Pair<double, long> currentElement)
114 // The temporary result contains the running sum and count, respectively.
118 QueryOperatorEnumerator<float, TKey> source = m_source;
119 float current = default(float);
120 TKey keyUnused = default(TKey);
122 if (source.MoveNext(ref current, ref keyUnused))
127 if ((i++ & CancellationState.POLL_INTERVAL) == 0)
128 CancellationState.ThrowIfCanceled(m_cancellationToken);
136 while (source.MoveNext(ref current, ref keyUnused));
138 currentElement = new Pair<double, long>(sum, count);
146 //---------------------------------------------------------------------------------------
147 // Dispose of resources associated with the underlying enumerator.
150 protected override void Dispose(bool disposing)
152 Contract.Assert(m_source != null);