3 // Copyright (c) Microsoft Corporation. All rights reserved.
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
8 // LastQueryOperator.cs
10 // <OWNER>Microsoft</OWNER>
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
14 using System.Collections.Generic;
15 using System.Threading;
16 using System.Diagnostics.Contracts;
18 namespace System.Linq.Parallel
21 /// Last tries to discover the last element in the source, optionally matching a
22 /// predicate. All partitions search in parallel, publish the greatest index for a
23 /// candidate match, and reach a barrier. Only the partition that "wins" the race,
24 /// i.e. the one that found the candidate with the largest index, will yield an element.
27 /// <typeparam name="TSource"></typeparam>
28 internal sealed class LastQueryOperator<TSource> : UnaryQueryOperator<TSource, TSource>
31 private readonly Func<TSource, bool> m_predicate; // The optional predicate used during the search; null means every element qualifies.
32 private readonly bool m_prematureMergeNeeded; // Whether to prematurely merge the input of this operator (true when the child's ordinal index state is worse than Increasing).
34 //---------------------------------------------------------------------------------------
35 // Initializes a new last operator.
38 // child - the child query whose last (optionally predicate-matching) element we will find
41 internal LastQueryOperator(IEnumerable<TSource> child, Func<TSource, bool> predicate)
// NOTE(review): the base-constructor call and braces are not visible in this listing; Child is used below, so it is presumably set by the base class from 'child' -- confirm.
44 Contract.Assert(child != null, "child data source cannot be null");
// predicate is stored as-is and may be null (the enumerator treats null as "match every element").
45 m_predicate = predicate;
// Decide once, up front, whether the input must be merged and re-indexed: that is required whenever the
// child's ordinal index state is worse than Increasing.
46 m_prematureMergeNeeded = Child.OrdinalIndexState.IsWorseThan(OrdinalIndexState.Increasing);
49 //---------------------------------------------------------------------------------------
50 // Just opens the current operator, including opening the child and wrapping it with
51 // partitions as needed.
54 internal override QueryResults<TSource> Open(QuerySettings settings, bool preferStriping)
// The child is always opened with 'false' as its second argument; the caller's preferStriping value is
// forwarded only to the results wrapper created below.
56 QueryResults<TSource> childQueryResults = Child.Open(settings, false);
// Wrap the child's results so that this operator is applied on top of them.
57 return new UnaryQueryOperatorResults(childQueryResults, this, settings, preferStriping);
60 internal override void WrapPartitionedStream<TKey>(
61 PartitionedStream<TSource, TKey> inputStream, IPartitionedStreamRecipient<TSource> recipient, bool preferStriping, QuerySettings settings)
63 // If the index is not at least increasing, we need to reindex.
64 if (m_prematureMergeNeeded)
// The collected results are exposed as a partitioned stream keyed by int, with the same partition count
// as the input; that int-keyed stream is wrapped instead of the original one.
66 PartitionedStream<TSource, int> intKeyStream =
67 ExecuteAndCollectResults(inputStream, inputStream.PartitionCount, Child.OutputOrdered, preferStriping, settings).GetPartitionedStream();
68 WrapHelper<int>(intKeyStream, recipient, settings);
// NOTE(review): the braces/else of this conditional are not visible in this listing; presumably the call
// below is the alternative path that wraps the input stream with its original key type -- confirm.
72 WrapHelper<TKey>(inputStream, recipient, settings);
76 private void WrapHelper<TKey>(PartitionedStream<TSource, TKey> inputStream, IPartitionedStreamRecipient<TSource> recipient, QuerySettings settings)
// Builds one LastQueryOperatorEnumerator per input partition and hands the resulting stream to the recipient.
78 int partitionCount = inputStream.PartitionCount;
80 // Generate the shared data.
// A single state object and a single countdown event (initial count = number of partitions) are shared
// by every enumerator created below.
81 LastQueryOperatorState<TKey> operatorState = new LastQueryOperatorState<TKey>();
82 CountdownEvent sharedBarrier = new CountdownEvent(partitionCount);
// The output stream is always keyed by int, uses the default int comparer, and is declared as Shuffled.
84 PartitionedStream<TSource, int> outputStream =
85 new PartitionedStream<TSource, int>(partitionCount, Util.GetDefaultComparer<int>(), OrdinalIndexState.Shuffled);
86 for (int i = 0; i < partitionCount; i++)
// Each enumerator wraps input partition i, receives i as its partition id, and compares keys with the
// input stream's key comparer.
88 outputStream[i] = new LastQueryOperatorEnumerator<TKey>(
89 inputStream[i], m_predicate, operatorState, sharedBarrier, settings.CancellationState.MergedCancellationToken,
90 inputStream.KeyComparer, i);
92 recipient.Receive(outputStream);
95 //---------------------------------------------------------------------------------------
96 // Returns an enumerable that represents the query executing sequentially.
98 internal override IEnumerable<TSource> AsSequentialQuery(CancellationToken token)
// This operator is never expected to be converted to a sequential query: any call is flagged as a
// programming error by the assert and then rejected unconditionally with NotSupportedException.
// The assert message names Last(), the operator this class implements (it previously named First()).
100 Contract.Assert(false, "This method should never be called as fallback to sequential is handled in ParallelEnumerable.Last().");
101 throw new NotSupportedException();
104 //---------------------------------------------------------------------------------------
105 // Whether this operator performs a premature merge that would not be performed in
106 // a similar sequential operation (i.e., in LINQ to Objects).
109 internal override bool LimitsParallelism
// Always reports false for this operator.
111 get { return false; }
114 //---------------------------------------------------------------------------------------
115 // The enumerator type responsible for executing the last operation.
118 class LastQueryOperatorEnumerator<TKey> : QueryOperatorEnumerator<TSource, int>
121 private QueryOperatorEnumerator<TSource, TKey> m_source; // The data source to enumerate.
122 private Func<TSource, bool> m_predicate; // The optional predicate used during the search; null matches every element.
123 private bool m_alreadySearched; // Set once the enumerator has performed the search.
124 private int m_partitionId; // ID of this partition
126 // Data shared among partitions.
127 private LastQueryOperatorState<TKey> m_operatorState; // The greatest candidate key published so far and the id of the partition that published it.
128 private CountdownEvent m_sharedBarrier; // Shared barrier, signaled by each partition once it has finished its search.
129 private CancellationToken m_cancellationToken; // Token used to cancel this operator.
130 private IComparer<TKey> m_keyComparer; // Comparer for the order keys
132 //---------------------------------------------------------------------------------------
133 // Instantiates a new enumerator.
136 internal LastQueryOperatorEnumerator(
137 QueryOperatorEnumerator<TSource, TKey> source, Func<TSource, bool> predicate,
138 LastQueryOperatorState<TKey> operatorState, CountdownEvent sharedBarrier, CancellationToken cancelToken,
139 IComparer<TKey> keyComparer, int partitionId)
// Every reference argument except predicate is asserted non-null; predicate is optional
// (MoveNext treats a null predicate as "match every element").
141 Contract.Assert(source != null);
142 Contract.Assert(operatorState != null);
143 Contract.Assert(sharedBarrier != null);
144 Contract.Assert(keyComparer != null);
// NOTE(review): no assignment of 'source' to m_source is visible in this listing, although MoveNext
// asserts m_source != null -- confirm the field is assigned here.
147 m_predicate = predicate;
148 m_operatorState = operatorState;
149 m_sharedBarrier = sharedBarrier;
150 m_cancellationToken = cancelToken;
151 m_keyComparer = keyComparer;
152 m_partitionId = partitionId;
155 //---------------------------------------------------------------------------------------
156 // Straightforward IEnumerator<T> methods.
159 internal override bool MoveNext(ref TSource currentElement, ref int currentKey)
// Searches this partition for its last matching element, publishes the candidate's key to the shared
// state if it is the greatest seen so far, signals the shared barrier, and yields the element only if
// this partition still owns the published candidate after all partitions have signaled.
161 Contract.Assert(m_source != null);
// NOTE(review): the body of the guard below is not visible in this listing; presumably a call made after
// the search has already run returns false without searching again -- confirm.
163 if (m_alreadySearched)
168 // Look for the greatest element.
169 TSource candidate = default(TSource);
170 TKey candidateKey = default(TKey);
171 bool candidateFound = false;
174 int loopCount = 0; //counter to help with cancellation
175 TSource value = default(TSource);
176 TKey key = default(TKey);
177 while (m_source.MoveNext(ref value, ref key))
// NOTE(review): no increment of loopCount is visible in this listing; if it is never incremented, the
// cancellation check below runs on every iteration rather than once per poll interval -- confirm.
179 if ((loopCount & CancellationState.POLL_INTERVAL) == 0)
180 CancellationState.ThrowIfCanceled(m_cancellationToken);
182 // If the predicate is null or the current element satisfies it, we will remember
183 // it as the current partition's candidate for the last element, and move on.
// NOTE(review): the assignments that copy value/key into candidate/candidateKey are not visible in this
// listing; only the flag update below is -- confirm they precede it.
184 if (m_predicate == null || m_predicate(value))
188 candidateFound = true;
194 // If we found a candidate element, try to publish it, so long as it's greater.
// Publication happens under a lock on the shared state: the candidate is recorded when no partition has
// published yet (partition id is still -1) or when its key compares greater than the published key.
197 lock (m_operatorState)
199 if (m_operatorState.m_partitionId == -1 || m_keyComparer.Compare(candidateKey, m_operatorState.m_key) > 0)
201 m_operatorState.m_partitionId = m_partitionId;
202 m_operatorState.m_key = candidateKey;
209 // No matter whether we exit due to an exception or normal completion, we must ensure
210 // that we signal other partitions that we have completed. Otherwise, we can cause deadlocks.
211 m_sharedBarrier.Signal();
214 m_alreadySearched = true;
216 // Only if this partition currently owns the published candidate do we wait for the other partitions.
217 if (m_partitionId == m_operatorState.m_partitionId)
// The wait observes the cancellation token, so a canceled query does not block here indefinitely.
219 m_sharedBarrier.Wait(m_cancellationToken);
221 // Now re-read the shared index. If it's the same as ours, we won and return true.
222 if (m_operatorState.m_partitionId == m_partitionId)
224 currentElement = candidate;
225 currentKey = 0; // 1st (and only) element, so we hardcode the output index to 0.
230 // If we got here, we didn't win. Return false.
234 protected override void Dispose(bool disposing)
// NOTE(review): the body of this override is not visible in this listing; presumably it releases the
// wrapped source enumerator (m_source) -- confirm against the full file.
241 class LastQueryOperatorState<TKey>
244 internal int m_partitionId = -1; // Id of the partition that published the current candidate; -1 means no candidate has been published yet.