3 // Copyright (c) Microsoft Corporation. All rights reserved.
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
8 // AnyAllSearchOperator.cs
10 // <OWNER>Microsoft</OWNER>
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
14 using System.Collections.Generic;
15 using System.Diagnostics.Contracts;
16 using System.Threading;
18 namespace System.Linq.Parallel
/// <summary>
/// The any/all operators work the same way. They search for the occurrence of a predicate
/// value in the data source, and upon the first occurrence of such a value, yield a
/// particular value. Specifically:
///
///     - Any returns true if the predicate for any element evaluates to true.
///     - All returns false if the predicate for any element evaluates to false.
///
/// This uniformity is used to apply a general purpose algorithm. Both sentences above
/// take the form of "returns XXX if the predicate for any element evaluates to XXX."
/// Therefore, we just parameterize on XXX, called the qualification below, and if we
/// ever find an occurrence of XXX in the input data source, we also return XXX. Otherwise,
/// we return !XXX. Obviously, XXX in this case is a bool.
///
/// This is a search algorithm. So once any single partition finds an element, it will
/// return so that execution can stop. This is done with a "cancelation" flag that is
/// polled by all parallel workers. The first worker to find an answer sets it, and all
/// other workers notice it and quit as quickly as possible.
/// </summary>
/// <typeparam name="TInput">The type of the elements in the child data source.</typeparam>
40 internal sealed class AnyAllSearchOperator<TInput> : UnaryQueryOperator<TInput, bool>
// The predicate used to test membership; evaluated against each input element.
private readonly Func<TInput, bool> m_predicate;

// The predicate result being searched for: true for any, false for all.
private readonly bool m_qualification;
//---------------------------------------------------------------------------------------
// Constructs a new instance of an any/all search operator.
//
// Arguments:
//     child         - the child tree to enumerate.
//     qualification - the predicate value we require for an element to be considered
//                     a member of the set; for any this is true, for all it is false.
//     predicate     - the function evaluated against each element; must not be null.
//

internal AnyAllSearchOperator(IEnumerable<TInput> child, bool qualification, Func<TInput, bool> predicate)
    // NOTE(review): the child was validated but never forwarded, while Open() reads the
    // inherited Child member. This assumes the base class exposes a constructor that
    // accepts the child data source -- confirm against UnaryQueryOperator.
    : base(child)
{
    Contract.Assert(child != null, "child data source cannot be null");
    Contract.Assert(predicate != null, "need a predicate function");

    m_qualification = qualification;
    m_predicate = predicate;
}
//---------------------------------------------------------------------------------------
// Executes the entire query tree, and aggregates the individual partition results to
// form an overall answer to the search operation.
//
// Returns m_qualification as soon as any partition result equals it; otherwise returns
// !m_qualification once every partition result has been examined.
//

internal bool Aggregate()
{
    // Because the final reduction is typically much cheaper than the intermediate
    // reductions over the individual partitions, and because each parallel partition
    // could do a lot of work to produce a single output element, we prefer to turn off
    // pipelining, and process the final reductions serially.
    using (IEnumerator<bool> enumerator = GetEnumerator(ParallelMergeOptions.FullyBuffered, true))
    {
        // Any value equal to our qualification means that we've found an element matching
        // the condition, and so we return the qualification without looking at the
        // remaining results.
        while (enumerator.MoveNext())
        {
            if (enumerator.Current == m_qualification)
            {
                return m_qualification;
            }
        }
    }

    // No result matched the qualification, so the answer is its negation.
    return !m_qualification;
}
//---------------------------------------------------------------------------------------
// Just opens the current operator, including opening the child and wrapping it with
// partitions as needed.
//
// Arguments:
//     settings       - the query settings, forwarded unchanged to the child.
//     preferStriping - forwarded unchanged to the child and to the results wrapper.
//

internal override QueryResults<bool> Open(
    QuerySettings settings, bool preferStriping)
{
    // We just open the child operator and wrap its results with this operator.
    QueryResults<TInput> childQueryResults = Child.Open(settings, preferStriping);
    return new UnaryQueryOperatorResults(childQueryResults, this, settings, preferStriping);
}
//---------------------------------------------------------------------------------------
// Wraps every partition of the input stream in a search enumerator and hands the
// resulting stream of per-partition boolean answers to the recipient.
//
// Arguments:
//     inputStream    - the partitioned child data.
//     recipient      - receives the wrapped output stream.
//     preferStriping - not used by this operator.
//     settings       - supplies the merged cancellation token given to each enumerator.
//

internal override void WrapPartitionedStream<TKey>(
    PartitionedStream<TInput, TKey> inputStream, IPartitionedStreamRecipient<bool> recipient, bool preferStriping, QuerySettings settings)
{
    // Create a shared cancelation variable; the same instance is given to every partition's
    // enumerator so that the first one to find an answer can signal the others.
    Shared<bool> resultFoundFlag = new Shared<bool>(false);

    int partitionCount = inputStream.PartitionCount;
    PartitionedStream<bool, int> outputStream = new PartitionedStream<bool, int>(
        partitionCount, Util.GetDefaultComparer<int>(), OrdinalIndexState.Correct);

    // Output keys are the partition indices, hence the int key type of the output stream.
    for (int i = 0; i < partitionCount; i++)
    {
        outputStream[i] = new AnyAllSearchOperatorEnumerator<TKey>(inputStream[i], m_qualification, m_predicate, i, resultFoundFlag,
            settings.CancellationState.MergedCancellationToken);
    }

    recipient.Receive(outputStream);
}
//---------------------------------------------------------------------------------------
// Returns an enumerable that represents the query executing sequentially.
//
// Not supported by this operator: the method asserts and then always throws
// NotSupportedException.
//

internal override IEnumerable<bool> AsSequentialQuery(CancellationToken token)
{
    Contract.Assert(false, "This method should never be called as it is an ending operator with LimitsParallelism=false.");
    throw new NotSupportedException();
}
//---------------------------------------------------------------------------------------
// Whether this operator performs a premature merge that would not be performed in
// a similar sequential operation (i.e., in LINQ to Objects). Always false here.
//

internal override bool LimitsParallelism
{
    get { return false; }
}
//---------------------------------------------------------------------------------------
// This enumerator performs the search over its input data source. It also cancels peer
// enumerators when an answer was found, and polls this cancelation flag so that it can
// stop once a peer has found an answer.
//
152 private class AnyAllSearchOperatorEnumerator<TKey> : QueryOperatorEnumerator<bool, int>
// The source data for this partition.
private readonly QueryOperatorEnumerator<TInput, TKey> m_source;

// The predicate evaluated against each source element.
private readonly Func<TInput, bool> m_predicate;

// Whether this is an any (true) or all (false) operator.
private readonly bool m_qualification;

// The partition's index; reported as the key of the element this enumerator yields.
private readonly int m_partitionIndex;

// Flag shared with peer enumerators; set once some partition has found an answer.
private readonly Shared<bool> m_resultFoundFlag;

// Token polled periodically while searching so that an external cancellation is observed.
private readonly CancellationToken m_cancellationToken;
//---------------------------------------------------------------------------------------
// Instantiates a new any/all search enumerator over a single partition.
//
// Arguments:
//     source            - the partition's input data; must not be null.
//     qualification     - the predicate value being searched for.
//     predicate         - the function evaluated against each element; must not be null.
//     partitionIndex    - the index of the partition, used as the output key.
//     resultFoundFlag   - flag shared with peer enumerators; must not be null.
//     cancellationToken - token polled while searching.
//

internal AnyAllSearchOperatorEnumerator(QueryOperatorEnumerator<TInput, TKey> source, bool qualification,
    Func<TInput, bool> predicate, int partitionIndex, Shared<bool> resultFoundFlag,
    CancellationToken cancellationToken)
{
    Contract.Assert(source != null);
    Contract.Assert(predicate != null);
    Contract.Assert(resultFoundFlag != null);

    // The source must be stored: MoveNext reads from m_source and Dispose asserts that
    // it is non-null.
    m_source = source;
    m_qualification = qualification;
    m_predicate = predicate;
    m_partitionIndex = partitionIndex;
    m_resultFoundFlag = resultFoundFlag;
    m_cancellationToken = cancellationToken;
}
//---------------------------------------------------------------------------------------
// This enumerates the entire input source to perform the search. If another peer
// partition finds an answer before us, we will voluntarily return without producing a
// value of our own.
//
// Arguments:
//     currentElement - receives m_qualification if this partition found a qualifying
//                      element, or !m_qualification if it consumed its input without one.
//     currentKey     - receives this partition's index.
//
// Returns true when a result was written to currentElement; false when there is nothing
// to report (an answer has already been found, or the source yielded no element).
//
// NOTE(review): the early-exit returns, the declaration of the poll counter, the `do`
// keyword, the `break` and the trailing returns were restored from the surrounding
// comments and control flow -- confirm against the upstream file.
//

internal override bool MoveNext(ref bool currentElement, ref int currentKey)
{
    Contract.Assert(m_predicate != null);

    // Avoid enumerating if we've already found an answer.
    if (m_resultFoundFlag.Value)
    {
        return false;
    }

    // We just scroll through the enumerator and accumulate the result.
    TInput element = default(TInput);
    TKey keyUnused = default(TKey);

    if (m_source.MoveNext(ref element, ref keyUnused))
    {
        // Until a qualifying element is seen, this partition's answer is the negation.
        currentElement = !m_qualification;
        currentKey = m_partitionIndex;

        int i = 0;

        // Continue walking the data so long as we haven't found an item that satisfies
        // the condition we are searching for.
        do
        {
            // Poll the external cancellation token only on the iterations where the
            // counter masked by POLL_INTERVAL is zero, rather than for every element.
            if ((i++ & CancellationState.POLL_INTERVAL) == 0)
            {
                CancellationState.ThrowIfCanceled(m_cancellationToken);
            }

            if (m_resultFoundFlag.Value)
            {
                // If cancelation occurred, it's because a successful answer was found.
                return false;
            }

            if (m_predicate(element) == m_qualification)
            {
                // We have found an item that satisfies the search. Tell other
                // workers that are concurrently searching, and return.
                m_resultFoundFlag.Value = true;
                currentElement = m_qualification;
                break;
            }
        }
        while (m_source.MoveNext(ref element, ref keyUnused));

        return true;
    }

    return false;
}
235 protected override void Dispose(bool disposing)
237 Contract.Assert(m_source != null);