3 // Copyright (c) Microsoft Corporation. All rights reserved.
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
8 // NullableDoubleSumAggregationOperator.cs
10 // <OWNER>Microsoft</OWNER>
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
14 using System.Collections.Generic;
15 using System.Diagnostics.Contracts;
16 using System.Threading;
18 namespace System.Linq.Parallel
21 /// An inlined sum aggregation and its enumerator, for nullable doubles.
23 internal sealed class NullableDoubleSumAggregationOperator : InlinedAggregationOperator<double?, double?, double?>
26 //---------------------------------------------------------------------------------------
27 // Constructs a new instance of a sum associative operator.
//
// Arguments:
//     child - the source sequence of nullable doubles; it is handed unchanged to the
//             base InlinedAggregationOperator constructor.
//
30 internal NullableDoubleSumAggregationOperator(IEnumerable<double?> child) : base(child)
34 //---------------------------------------------------------------------------------------
35 // Executes the entire query tree, and aggregates the intermediate results into the
36 // final result based on the binary operators and final reduction.
//
// Arguments:
//     singularExceptionToThrow - by-ref exception slot required by the overridden
//                                signature; no visible line of this method reads or
//                                writes it.
//
// Return Value:
39 // The single result of aggregation.
//
// NOTE(review): the declaration of `sum` and the method's return statement are not
// visible in this excerpt -- confirm their exact form against the full file.
//
42 protected override double? InternalAggregate(ref Exception singularExceptionToThrow)
44 // Because the final reduction is typically much cheaper than the intermediate
45 // reductions over the individual partitions, and because each parallel partition
46 // will do a lot of work to produce a single output element, we prefer to turn off
47 // pipelining, and process the final reductions serially.
// The using statement guarantees the merged enumerator is disposed once the loop ends.
48 using (IEnumerator<double?> enumerator = GetEnumerator(ParallelMergeOptions.FullyBuffered, true))
50 // We just reduce the elements in each output partition.
52 while (enumerator.MoveNext())
// GetValueOrDefault() yields 0.0 when Current is null, so a null element leaves the
// running total unchanged instead of turning the whole sum into null.
54 sum += enumerator.Current.GetValueOrDefault();
61 //---------------------------------------------------------------------------------------
62 // Creates an enumerator that is used internally for the final aggregation step.
//
// Arguments:
//     index             - forwarded to the new enumerator as its partition index.
//     count             - not used by the visible return statement.
//     source            - the enumerator over the source data that the new enumerator wraps.
//     sharedData        - not used by the visible return statement.
//     cancellationToken - forwarded to the new enumerator.
//
// Return Value:
//     A new NullableDoubleSumAggregationOperatorEnumerator<TKey> over the given source.
//
65 protected override QueryOperatorEnumerator<double?,int> CreateEnumerator<TKey>(
66 int index, int count, QueryOperatorEnumerator<double?, TKey> source, object sharedData, CancellationToken cancellationToken)
68 return new NullableDoubleSumAggregationOperatorEnumerator<TKey>(source, index, cancellationToken);
71 //---------------------------------------------------------------------------------------
72 // This enumerator type encapsulates the intermediary aggregation over the underlying
73 // (possibly partitioned) data source.
76 private class NullableDoubleSumAggregationOperatorEnumerator<TKey> : InlinedAggregationOperatorEnumerator<double?>
78 private readonly QueryOperatorEnumerator<double?, TKey> m_source; // The source data.
80 //---------------------------------------------------------------------------------------
81 // Instantiates a new aggregation operator enumerator.
//
// Arguments:
//     source            - the enumerator over the source data; asserted to be non-null.
//     partitionIndex    - passed through to the base enumerator constructor.
//     cancellationToken - passed through to the base enumerator constructor.
//
// NOTE(review): the statement that stores `source` into m_source is not visible in this
// excerpt -- presumably it follows the assert; confirm against the full file.
//
84 internal NullableDoubleSumAggregationOperatorEnumerator(QueryOperatorEnumerator<double?, TKey> source, int partitionIndex,
85 CancellationToken cancellationToken) :
86 base(partitionIndex, cancellationToken)
88 Contract.Assert(source != null);
92 //---------------------------------------------------------------------------------------
93 // Tallies up the sum of the underlying data source, walking the entire thing the first
94 // time MoveNext is called on this object.
//
// Arguments:
//     currentElement - receives the accumulated sum, wrapped as a double?, once the
//                      source has been fully consumed.
//
// NOTE(review): the declaration of the counter `i`, the opening `do` of the loop and the
// method's return statements are not visible in this excerpt -- confirm against the full
// file.
//
97 protected override bool MoveNextCore(ref double? currentElement)
99 double? element = default(double?);
// The key produced by the source is required by its MoveNext signature but is never
// read in the visible code.
100 TKey keyUnused = default(TKey);
102 QueryOperatorEnumerator<double?, TKey> source = m_source;
// The first MoveNext call doubles as the emptiness check; summation only starts if the
// source produced at least one element.
103 if (source.MoveNext(ref element, ref keyUnused))
105 // We just scroll through the enumerator and accumulate the sum.
106 double tempSum = 0.0;
// Cancellation is only polled when the counter masked with POLL_INTERVAL is zero,
// rather than on every element.
110 if ((i++ & CancellationState.POLL_INTERVAL) == 0)
111 CancellationState.ThrowIfCanceled(m_cancellationToken);
// GetValueOrDefault() yields 0.0 for a null element, so nulls do not affect the sum.
113 tempSum += element.GetValueOrDefault();
// Trailing condition of the loop: keep accumulating until the source is exhausted.
115 while (source.MoveNext(ref element, ref keyUnused));
117 // The sum has been calculated. Now just return.
118 currentElement = new double?(tempSum);
126 //---------------------------------------------------------------------------------------
127 // Dispose of resources associated with the underlying enumerator.
130 protected override void Dispose(bool disposing)
132 Contract.Assert(m_source != null);