3 // Copyright (c) Microsoft Corporation. All rights reserved.
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
8 // NullableDecimalSumAggregationOperator.cs
10 // <OWNER>[....]</OWNER>
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
14 using System.Collections.Generic;
15 using System.Diagnostics.Contracts;
16 using System.Threading;
18 namespace System.Linq.Parallel
21 /// An inlined sum aggregation and its enumerator, for nullable decimals.
/// The operator is internal and sealed. It derives from
/// InlinedAggregationOperator&lt;decimal?, decimal?, decimal?&gt;, i.e. all three of the
/// base class's type arguments are nullable decimals.
23 internal sealed class NullableDecimalSumAggregationOperator : InlinedAggregationOperator<decimal?, decimal?, decimal?>
26 //---------------------------------------------------------------------------------------
27 // Constructs a new instance of a sum associative operator.
//
// Arguments:
//     child - the sequence of nullable decimals this operator consumes; it is handed
//             to the base-class constructor unchanged.
//
// NOTE(review): no constructor body is visible in this listing. Presumably it is empty
// and all setup happens in the base constructor -- confirm against the full file.
30 internal NullableDecimalSumAggregationOperator(IEnumerable<decimal?> child) : base(child)
34 //---------------------------------------------------------------------------------------
35 // Executes the entire query tree, and aggregates the intermediate results into the
36 // final result based on the binary operators and final reduction.
//
// Arguments:
//     singularExceptionToThrow - passed by reference; none of the lines visible in this
//                                listing read or assign it.
//
// Return Value:
39 // The single result of aggregation.
//
// NOTE(review): the declaration of `sum` and the statement that returns it are not
// visible in this listing. Presumably `sum` is a local decimal starting at zero that is
// returned after the loop -- confirm against the full file.
42 protected override decimal? InternalAggregate(ref Exception singularExceptionToThrow)
44 // Because the final reduction is typically much cheaper than the intermediate
45 // reductions over the individual partitions, and because each parallel partition
46 // will do a lot of work to produce a single output element, we prefer to turn off
47 // pipelining, and process the final reductions serially.
// The enumerator is requested with ParallelMergeOptions.FullyBuffered and is held in a
// using statement, so it is disposed when the statement exits. The meaning of the second
// argument (true) is defined by GetEnumerator, which is not visible here.
48 using (IEnumerator<decimal?> enumerator = GetEnumerator(ParallelMergeOptions.FullyBuffered, true))
50 // We just reduce the elements in each output partition.
52 while (enumerator.MoveNext())
// GetValueOrDefault() yields 0 for a null element, so nulls do not change the sum.
54 sum += enumerator.Current.GetValueOrDefault();
61 //---------------------------------------------------------------------------------------
62 // Creates an enumerator that is used internally for the final aggregation step.
//
// Arguments:
//     index             - forwarded to the new enumerator as its partition index.
//     count             - not used by the visible line.
//     source            - the enumerator over nullable decimals that the new enumerator
//                         wraps.
//     sharedData        - not used by the visible line.
//     cancellationToken - forwarded to the new enumerator.
//
// Return Value:
//     A new NullableDecimalSumAggregationOperatorEnumerator<TKey> over `source`.
65 protected override QueryOperatorEnumerator<decimal?,int> CreateEnumerator<TKey>(
66 int index, int count, QueryOperatorEnumerator<decimal?, TKey> source, object sharedData, CancellationToken cancellationToken)
68 return new NullableDecimalSumAggregationOperatorEnumerator<TKey>(source, index, cancellationToken);
71 //---------------------------------------------------------------------------------------
72 // This enumerator type encapsulates the intermediary aggregation over the underlying
73 // (possibly partitioned) data source.
//
// TKey is the key type of the wrapped source enumerator. The only visible use of a key
// in this listing is a local named keyUnused in MoveNextCore.
76 private class NullableDecimalSumAggregationOperatorEnumerator<TKey> : InlinedAggregationOperatorEnumerator<decimal?>
78 private readonly QueryOperatorEnumerator<decimal?, TKey> m_source; // The source data.
80 //---------------------------------------------------------------------------------------
81 // Instantiates a new aggregation operator.
//
// Arguments:
//     source            - the enumerator to aggregate over; asserted to be non-null.
//     partitionIndex    - forwarded to the base-class constructor.
//     cancellationToken - forwarded to the base-class constructor.
//
// NOTE(review): the assignment of `source` to m_source is not visible in this listing.
// The field is readonly and has no initializer, so it presumably happens in this
// constructor -- confirm against the full file.
84 internal NullableDecimalSumAggregationOperatorEnumerator(QueryOperatorEnumerator<decimal?, TKey> source, int partitionIndex,
85 CancellationToken cancellationToken) :
86 base(partitionIndex, cancellationToken)
// Contract.Assert is a conditional call (compiled only when DEBUG or CONTRACTS_FULL is
// defined), so this is a development-time sanity check, not runtime argument validation.
88 Contract.Assert(source != null);
92 //---------------------------------------------------------------------------------------
93 // Tallies up the sum of the underlying data source, walking the entire thing the first
94 // time MoveNext is called on this object.
//
// Arguments:
//     currentElement - by-ref output; set to the accumulated sum (wrapped in a decimal?)
//                      once the source has been drained.
//
// NOTE(review): several lines are not visible in this listing: the declaration of the
// counter `i`, the loop header that pairs with the trailing `while (...);` (presumably a
// `do`), and the return statements. Presumably the method returns true after storing
// the sum and false when the source is empty -- confirm against the full file.
97 protected override bool MoveNextCore(ref decimal? currentElement)
99 decimal? element = default(decimal?);
// The key is required by the source's MoveNext signature; it is not used by any
// visible line.
100 TKey keyUnused = default(TKey);
// Work on a local copy of the field reference.
102 QueryOperatorEnumerator<decimal?, TKey> source = m_source;
// The first MoveNext also acts as the emptiness test: nothing below runs unless the
// source produced at least one element.
103 if (source.MoveNext(ref element, ref keyUnused))
105 // We just scroll through the enumerator and accumulate the sum.
106 decimal tempSum = 0.0m;
// Cancellation is only checked on iterations where the masked counter is zero, rather
// than on every element.
110 if ((i++ & CancellationState.POLL_INTERVAL) == 0)
111 CancellationState.ThrowIfCanceled(m_cancellationToken);
// GetValueOrDefault() yields 0 for a null element, so nulls do not change the sum.
113 tempSum += element.GetValueOrDefault();
115 while (source.MoveNext(ref element, ref keyUnused));
117 // The sum has been calculated. Now just return.
// The result is always a non-null decimal?, even if every element was null.
118 currentElement = new decimal?(tempSum);
125 //---------------------------------------------------------------------------------------
126 // Dispose of resources associated with the underlying enumerator.
129 protected override void Dispose(bool disposing)
131 Contract.Assert(m_source != null);