3 // Copyright (c) Microsoft Corporation. All rights reserved.
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
8 // LongMinMaxAggregationOperator.cs
10 // <OWNER>[....]</OWNER>
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
14 using System.Collections.Generic;
15 using System.Diagnostics.Contracts;
16 using System.Threading;
18 using System.Core; // for System.Core.SR
21 namespace System.Linq.Parallel
24 /// An inlined min/max aggregation and its enumerator, for longs.
26 internal sealed class LongMinMaxAggregationOperator : InlinedAggregationOperator<long, long, long>
28 private readonly int m_sign; // The sign (-1 for min, 1 for max).
30 //---------------------------------------------------------------------------------------
31 // Constructs a new instance of a min/max associative operator.
34 internal LongMinMaxAggregationOperator(IEnumerable<long> child, int sign) : base(child)
36 Contract.Assert(sign == -1 || sign == 1, "invalid sign");
40 //---------------------------------------------------------------------------------------
41 // Executes the entire query tree, and aggregates the intermediate results into the
42 // final result based on the binary operators and final reduction.
45 // The single result of aggregation.
48 protected override long InternalAggregate(ref Exception singularExceptionToThrow)
50 // Because the final reduction is typically much cheaper than the intermediate
51 // reductions over the individual partitions, and because each parallel partition
52 // will do a lot of work to produce a single output element, we prefer to turn off
53 // pipelining, and process the final reductions serially.
54 using (IEnumerator<long> enumerator = GetEnumerator(ParallelMergeOptions.FullyBuffered, true))
56 // Throw an error for empty results.
57 if (!enumerator.MoveNext())
59 singularExceptionToThrow = new InvalidOperationException(SR.GetString(SR.NoElements));
63 long best = enumerator.Current;
65 // Based on the sign, do either a min or max reduction.
68 while (enumerator.MoveNext())
70 long current = enumerator.Current;
79 while (enumerator.MoveNext())
81 long current = enumerator.Current;
93 //---------------------------------------------------------------------------------------
94 // Creates an enumerator that is used internally for the final aggregation step.
97 protected override QueryOperatorEnumerator<long, int> CreateEnumerator<TKey>(
98 int index, int count, QueryOperatorEnumerator<long, TKey> source, object sharedData,
99 CancellationToken cancellationToken)
101 return new LongMinMaxAggregationOperatorEnumerator<TKey>(source, index, m_sign, cancellationToken);
104 //---------------------------------------------------------------------------------------
105 // This enumerator type encapsulates the intermediary aggregation over the underlying
106 // (possibly partitioned) data source.
109 private class LongMinMaxAggregationOperatorEnumerator<TKey> : InlinedAggregationOperatorEnumerator<long>
111 private QueryOperatorEnumerator<long, TKey> m_source; // The source data.
112 private int m_sign; // The sign for comparisons (-1 means min, 1 means max).
114 //---------------------------------------------------------------------------------------
115 // Instantiates a new aggregation operator.
118 internal LongMinMaxAggregationOperatorEnumerator(QueryOperatorEnumerator<long, TKey> source, int partitionIndex, int sign,
119 CancellationToken cancellationToken) :
120 base(partitionIndex, cancellationToken)
122 Contract.Assert(source != null);
127 //---------------------------------------------------------------------------------------
128 // Tallies up the min/max of the underlying data source, walking the entire thing the first
129 // time MoveNext is called on this object.
132 protected override bool MoveNextCore(ref long currentElement)
134 // Based on the sign, do either a min or max reduction.
135 QueryOperatorEnumerator<long, TKey> source = m_source;
136 TKey keyUnused = default(TKey);
138 if (source.MoveNext(ref currentElement, ref keyUnused))
141 // We just scroll through the enumerator and find the min or max.
144 long elem = default(long);
145 while (source.MoveNext(ref elem, ref keyUnused))
147 if ((i++ & CancellationState.POLL_INTERVAL) == 0)
148 CancellationState.ThrowIfCanceled(m_cancellationToken);
150 if (elem < currentElement)
152 currentElement = elem;
158 long elem = default(long);
159 while (source.MoveNext(ref elem, ref keyUnused))
161 if ((i++ & CancellationState.POLL_INTERVAL) == 0)
162 CancellationState.ThrowIfCanceled(m_cancellationToken);
164 if (elem > currentElement)
166 currentElement = elem;
171 // The sum has been calculated. Now just return.
178 //---------------------------------------------------------------------------------------
179 // Dispose of resources associated with the underlying enumerator.
182 protected override void Dispose(bool disposing)
184 Contract.Assert(m_source != null);