3 // Copyright (c) Microsoft Corporation. All rights reserved.
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
8 // DoubleAverageAggregationOperator.cs
10 // <OWNER>Microsoft</OWNER>
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
14 using System.Collections.Generic;
15 using System.Diagnostics.Contracts;
16 using System.Threading;
18 using System.Core; // for System.Core.SR
20 namespace System.Linq.Parallel
23 /// An inlined average aggregation operator and its enumerator, for doubles.
25 internal sealed class DoubleAverageAggregationOperator : InlinedAggregationOperator<double, Pair<double, long>, double>
27 //---------------------------------------------------------------------------------------
28 // Constructs a new instance of an average associative operator.
31 internal DoubleAverageAggregationOperator(IEnumerable<double> child) : base(child)
35 //---------------------------------------------------------------------------------------
36 // Executes the entire query tree, and aggregates the intermediate results into the
37 // final result based on the binary operators and final reduction.
40 // The single result of aggregation.
43 protected override double InternalAggregate(ref Exception singularExceptionToThrow)
45 // Because the final reduction is typically much cheaper than the intermediate
46 // reductions over the individual partitions, and because each parallel partition
47 // will do a lot of work to produce a single output element, we prefer to turn off
48 // pipelining, and process the final reductions serially.
49 using (IEnumerator<Pair<double, long>> enumerator = GetEnumerator(ParallelMergeOptions.FullyBuffered, true))
51 // Throw an error for empty results.
52 if (!enumerator.MoveNext())
54 singularExceptionToThrow = new InvalidOperationException(SR.GetString(SR.NoElements));
55 return default(double);
58 Pair<double, long> result = enumerator.Current;
60 // Simply add together the sums and totals.
61 while (enumerator.MoveNext())
65 result.First += enumerator.Current.First;
66 result.Second += enumerator.Current.Second;
70 // And divide the sum by the total to obtain the final result.
71 return result.First / result.Second;
75 //---------------------------------------------------------------------------------------
76 // Creates an enumerator that is used internally for the final aggregation step.
79 protected override QueryOperatorEnumerator<Pair<double, long>, int> CreateEnumerator<TKey>(
80 int index, int count, QueryOperatorEnumerator<double, TKey> source, object sharedData,
81 CancellationToken cancellationToken)
83 return new DoubleAverageAggregationOperatorEnumerator<TKey>(source, index, cancellationToken);
86 //---------------------------------------------------------------------------------------
87 // This enumerator type encapsulates the intermediary aggregation over the underlying
88 // (possibly partitioned) data source.
91 private class DoubleAverageAggregationOperatorEnumerator<TKey> : InlinedAggregationOperatorEnumerator<Pair<double, long>>
93 private QueryOperatorEnumerator<double, TKey> m_source; // The source data.
95 //---------------------------------------------------------------------------------------
96 // Instantiates a new aggregation operator.
99 internal DoubleAverageAggregationOperatorEnumerator(QueryOperatorEnumerator<double, TKey> source, int partitionIndex,
100 CancellationToken cancellationToken) :
101 base(partitionIndex, cancellationToken)
103 Contract.Assert(source != null);
107 //---------------------------------------------------------------------------------------
108 // Tallies up the average of the underlying data source, walking the entire thing the first
109 // time MoveNext is called on this object.
112 protected override bool MoveNextCore(ref Pair<double, long> currentElement)
114 // The temporary result contains the running sum and count, respectively.
118 QueryOperatorEnumerator<double, TKey> source = m_source;
119 double current = default(double);
120 TKey keyUnused = default(TKey);
122 if (source.MoveNext(ref current, ref keyUnused))
127 if ((i++ & CancellationState.POLL_INTERVAL) == 0)
128 CancellationState.ThrowIfCanceled(m_cancellationToken);
136 while (source.MoveNext(ref current, ref keyUnused));
138 currentElement = new Pair<double, long>(sum, count);
146 //---------------------------------------------------------------------------------------
147 // Dispose of resources associated with the underlying enumerator.
150 protected override void Dispose(bool disposing)
152 Contract.Assert(m_source != null);