Updates referencesource to .NET 4.7
[mono.git] / mcs / class / referencesource / System.Core / System / Linq / Parallel / QueryOperators / Unary / AnyAllSearchOperator.cs
1 // ==++==
2 //
3 //   Copyright (c) Microsoft Corporation.  All rights reserved.
4 // 
5 // ==--==
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
7 //
8 // AnyAllSearchOperator.cs
9 //
10 // <OWNER>Microsoft</OWNER>
11 //
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
13
14 using System.Collections.Generic;
15 using System.Diagnostics.Contracts;
16 using System.Threading;
17
18 namespace System.Linq.Parallel
19 {
20     /// <summary>
21     /// The any/all operators work the same way. They search for the occurrence of a predicate
22     /// value in the data source, and upon the first occurrence of such a value, yield a
23     /// particular value. Specifically:
24     ///
25     ///     - Any returns true if the predicate for any element evaluates to true.
26     ///     - All returns false if the predicate for any element evaluates to false.
27     ///
28     /// This uniformity is used to apply a general purpose algorithm. Both sentences above
29     /// take the form of "returns XXX if the predicate for any element evaluates to XXX."
30     /// Therefore, we just parameterize on XXX, called the qualifciation below, and if we
31     /// ever find an occurrence of XXX in the input data source, we also return XXX. Otherwise,
32     /// we return !XXX. Obviously, XXX in this case is a bool.
33     ///
34     /// This is a search algorithm. So once any single partition finds an element, it will
35     /// return so that execution can stop. This is done with a "cancelation" flag that is
36     /// polled by all parallel workers. The first worker to find an answer sets it, and all
37     /// other workers notice it and quit as quickly as possible.
38     /// </summary>
39     /// <typeparam name="TInput"></typeparam>
40     internal sealed class AnyAllSearchOperator<TInput> : UnaryQueryOperator<TInput, bool>
41     {
42
43         private readonly Func<TInput, bool> m_predicate; // The predicate used to test membership.
44         private readonly bool m_qualification; // Whether we're looking for true (any) or false (all).
45
46         //---------------------------------------------------------------------------------------
47         // Constructs a new instance of an any/all search operator.
48         //
49         // Arguments:
50         //     child         - the child tree to enumerate.
51         //     qualification - the predicate value we require for an element to be considered
52         //                     a member of the set; for any this is true, for all it is false.
53         //
54
55         internal AnyAllSearchOperator(IEnumerable<TInput> child, bool qualification, Func<TInput, bool> predicate)
56             :base(child)
57         {
58             Contract.Assert(child != null, "child data source cannot be null");
59             Contract.Assert(predicate != null, "need a predicate function");
60
61             m_qualification = qualification;
62             m_predicate = predicate;
63         }
64
65         //---------------------------------------------------------------------------------------
66         // Executes the entire query tree, and aggregates the individual partition results to
67         // form an overall answer to the search operation.
68         //
69
70         internal bool Aggregate()
71         {
72             // Because the final reduction is typically much cheaper than the intermediate 
73             // reductions over the individual partitions, and because each parallel partition
74             // could do a lot of work to produce a single output element, we prefer to turn off
75             // pipelining, and process the final reductions serially.
76             using (IEnumerator<bool> enumerator = GetEnumerator(ParallelMergeOptions.FullyBuffered, true))
77             {
78                 // Any value equal to our qualification means that we've found an element matching
79                 // the condition, and so we return the qualification without looking in subsequent
80                 // partitions.
81                 while (enumerator.MoveNext())
82                 {
83                     if (enumerator.Current == m_qualification)
84                     {
85                         return m_qualification;
86                     }
87                 }
88             }
89
90             return !m_qualification;
91         }
92
93         //---------------------------------------------------------------------------------------
94         // Just opens the current operator, including opening the child and wrapping it with
95         // partitions as needed.
96         //
97
98         internal override QueryResults<bool> Open(
99             QuerySettings settings, bool preferStriping)
100         {
101             // We just open the child operator.
102             QueryResults<TInput> childQueryResults = Child.Open(settings, preferStriping);
103             return new UnaryQueryOperatorResults(childQueryResults, this, settings, preferStriping);
104         }
105
106         internal override void  WrapPartitionedStream<TKey>(
107             PartitionedStream<TInput,TKey> inputStream, IPartitionedStreamRecipient<bool> recipient, bool preferStriping, QuerySettings settings)
108         {
109             // Create a shared cancelation variable and then return a possibly wrapped new enumerator.
110             Shared<bool> resultFoundFlag = new Shared<bool>(false);
111
112             int partitionCount = inputStream.PartitionCount;
113             PartitionedStream<bool, int> outputStream = new PartitionedStream<bool, int>(
114                 partitionCount, Util.GetDefaultComparer<int>(), OrdinalIndexState.Correct);
115             
116             for (int i = 0; i < partitionCount; i++)
117             {
118                 outputStream[i] = new AnyAllSearchOperatorEnumerator<TKey>(inputStream[i], m_qualification, m_predicate, i, resultFoundFlag, 
119                     settings.CancellationState.MergedCancellationToken);
120             }
121
122             recipient.Receive(outputStream);
123         }
124
125
126         //---------------------------------------------------------------------------------------
127         // Returns an enumerable that represents the query executing sequentially.
128         //
129
130         internal override IEnumerable<bool> AsSequentialQuery(CancellationToken token)
131         {
132             Contract.Assert(false, "This method should never be called as it is an ending operator with LimitsParallelism=false.");
133             throw new NotSupportedException();
134         }
135
136         //---------------------------------------------------------------------------------------
137         // Whether this operator performs a premature merge that would not be performed in
138         // a similar sequential operation (i.e., in LINQ to Objects).
139         //
140
141         internal override bool LimitsParallelism
142         {
143             get { return false; }
144         }
145
146         //---------------------------------------------------------------------------------------
147         // This enumerator performs the search over its input data source. It also cancels peer
148         // enumerators when an answer was found, and polls this cancelation flag to stop when
149         // requested.
150         //
151
152         private class AnyAllSearchOperatorEnumerator<TKey> : QueryOperatorEnumerator<bool, int>
153         {
154             private readonly QueryOperatorEnumerator<TInput, TKey> m_source; // The source data.
155             private readonly Func<TInput, bool> m_predicate; // The predicate.
156             private readonly bool m_qualification; // Whether this is an any (true) or all (false) operator.
157             private readonly int m_partitionIndex; // The partition's index.
158             private readonly Shared<bool> m_resultFoundFlag; // Whether to cancel the search for elements.
159             private readonly CancellationToken m_cancellationToken;
160
161             //---------------------------------------------------------------------------------------
162             // Instantiates a new any/all search operator.
163             //
164
165             internal AnyAllSearchOperatorEnumerator(QueryOperatorEnumerator<TInput, TKey> source, bool qualification,
166                                                     Func<TInput, bool> predicate, int partitionIndex, Shared<bool> resultFoundFlag,
167                                                     CancellationToken cancellationToken)
168             {
169                 Contract.Assert(source != null);
170                 Contract.Assert(predicate != null);
171                 Contract.Assert(resultFoundFlag != null);
172
173                 m_source = source;
174                 m_qualification = qualification;
175                 m_predicate = predicate;
176                 m_partitionIndex = partitionIndex;
177                 m_resultFoundFlag = resultFoundFlag;
178                 m_cancellationToken = cancellationToken;
179             }
180
181             //---------------------------------------------------------------------------------------
182             // This enumerates the entire input source to perform the search. If another peer
183             // partition finds an answer before us, we will voluntarily return (propagating the
184             // peer's result).
185             //
186
187             internal override bool MoveNext(ref bool currentElement, ref int currentKey)
188             {
189                 Contract.Assert(m_predicate != null);
190
191                 // Avoid enumerating if we've already found an answer.
192                 if (m_resultFoundFlag.Value)
193                     return false;
194
195                 // We just scroll through the enumerator and accumulate the result.
196                 TInput element = default(TInput);
197                 TKey keyUnused = default(TKey);
198                 
199                 if (m_source.MoveNext(ref element, ref keyUnused))
200                 {
201                     currentElement = !m_qualification;
202                     currentKey = m_partitionIndex;
203
204                     int i = 0;
205                     // Continue walking the data so long as we haven't found an item that satisfies
206                     // the condition we are searching for.
207                     do
208                     {
209                         if ((i++ & CancellationState.POLL_INTERVAL) == 0)
210                             CancellationState.ThrowIfCanceled(m_cancellationToken);
211
212                         if (m_resultFoundFlag.Value)
213                         {
214                             // If cancelation occurred, it's because a successful answer was found.
215                             return false;
216                         }
217
218                         if (m_predicate(element) == m_qualification)
219                         {
220                             // We have found an item that satisfies the search. Tell other
221                             // workers that are concurrently searching, and return.
222                             m_resultFoundFlag.Value = true;
223                             currentElement = m_qualification;
224                             break;
225                         }
226                     }
227                     while (m_source.MoveNext(ref element, ref keyUnused));
228
229                     return true;
230                 }
231
232                 return false;
233             }
234
235             protected override void Dispose(bool disposing)
236             {
237                 Contract.Assert(m_source != null);
238                 m_source.Dispose();
239             }
240         }
241     }
242 }