Updates referencesource to .NET 4.7
[mono.git] / mcs / class / referencesource / System.Core / System / Linq / Parallel / QueryOperators / Unary / ContainsSearchOperator.cs
1 // ==++==
2 //
3 //   Copyright (c) Microsoft Corporation.  All rights reserved.
4 // 
5 // ==--==
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
7 //
8 // ContainsSearchOperator.cs
9 //
10 // <OWNER>Microsoft</OWNER>
11 //
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
13
14 using System.Collections.Generic;
15 using System.Diagnostics.Contracts;
16 using System.Threading;
17
18 namespace System.Linq.Parallel
19 {
20     /// <summary>
21     /// Contains is quite similar to the any/all operator above. Each partition searches a
22     /// subset of elements for a match, and the first one to find a match signals to the rest
23     /// of the partititons to stop searching.
24     /// </summary>
25     /// <typeparam name="TInput"></typeparam>
26     internal sealed class ContainsSearchOperator<TInput> : UnaryQueryOperator<TInput, bool>
27     {
28
29         private readonly TInput m_searchValue; // The value for which we are searching.
30         private readonly IEqualityComparer<TInput> m_comparer; // The comparer to use for equality tests.
31
32         //---------------------------------------------------------------------------------------
33         // Constructs a new instance of the contains search operator.
34         //
35         // Arguments:
36         //     child       - the child tree to enumerate.
37         //     searchValue - value we are searching for.
38         //     comparer    - a comparison routine used to test equality.
39         //
40
41         internal ContainsSearchOperator(IEnumerable<TInput> child, TInput searchValue, IEqualityComparer<TInput> comparer)
42             :base(child)
43         {
44             Contract.Assert(child != null, "child data source cannot be null");
45
46             m_searchValue = searchValue;
47
48             if (comparer == null)
49             {
50                 m_comparer = EqualityComparer<TInput>.Default;
51             }
52             else
53             {
54                 m_comparer = comparer;
55             }
56         }
57
58         //---------------------------------------------------------------------------------------
59         // Executes the entire query tree, and aggregates the individual partition results to
60         // form an overall answer to the search operation.
61         //
62
63         internal bool Aggregate()
64         {
65             // Because the final reduction is typically much cheaper than the intermediate 
66             // reductions over the individual partitions, and because each parallel partition
67             // could do a lot of work to produce a single output element, we prefer to turn off
68             // pipelining, and process the final reductions serially.
69             using (IEnumerator<bool> enumerator = GetEnumerator(ParallelMergeOptions.FullyBuffered, true))
70             {
71                 // Any value of true means the element was found. We needn't consult all partitions
72                 while (enumerator.MoveNext())
73                 {
74                     if (enumerator.Current)
75                     {
76                         return true;
77                     }
78                 }
79             }
80
81             return false;
82         }
83
84         //---------------------------------------------------------------------------------------
85         // Just opens the current operator, including opening the child and wrapping it with
86         // partitions as needed.
87         //
88
89         internal override QueryResults<bool> Open(QuerySettings settings, bool preferStriping)
90         {
91             QueryResults<TInput> childQueryResults = Child.Open(settings, preferStriping);
92             return new UnaryQueryOperatorResults(childQueryResults, this, settings, preferStriping);
93         }
94
95         internal override void WrapPartitionedStream<TKey>(
96             PartitionedStream<TInput, TKey> inputStream, IPartitionedStreamRecipient<bool> recipient, bool preferStriping, QuerySettings settings)
97         {
98
99             int partitionCount = inputStream.PartitionCount;
100             PartitionedStream<bool, int> outputStream = new PartitionedStream<bool, int>(partitionCount, Util.GetDefaultComparer<int>(), OrdinalIndexState.Correct);
101
102             // Create a shared cancelation variable
103             Shared<bool> resultFoundFlag = new Shared<bool>(false);
104             for (int i = 0; i < partitionCount; i++)
105             {
106                 outputStream[i] = new ContainsSearchOperatorEnumerator<TKey>(inputStream[i], m_searchValue, m_comparer, i, resultFoundFlag, 
107                     settings.CancellationState.MergedCancellationToken);
108             }
109
110             recipient.Receive(outputStream);
111         }
112
113         //---------------------------------------------------------------------------------------
114         // Returns an enumerable that represents the query executing sequentially.
115         //
116
117         internal override IEnumerable<bool> AsSequentialQuery(CancellationToken token)
118         {
119             Contract.Assert(false, "This method should never be called as it is an ending operator with LimitsParallelism=false.");
120             throw new NotSupportedException();
121         }
122
123         //---------------------------------------------------------------------------------------
124         // Whether this operator performs a premature merge that would not be performed in
125         // a similar sequential operation (i.e., in LINQ to Objects).
126         //
127
128         internal override bool LimitsParallelism
129         {
130             get { return false; }
131         }
132
133         //---------------------------------------------------------------------------------------
134         // This enumerator performs the search over its input data source. It also cancels peer
135         // enumerators when an answer was found, and polls this cancelation flag to stop when
136         // requested.
137         //
138
139         class ContainsSearchOperatorEnumerator<TKey> : QueryOperatorEnumerator<bool, int>
140         {
141             private readonly QueryOperatorEnumerator<TInput, TKey> m_source; // The source data.
142             private readonly TInput m_searchValue; // The value for which we are searching.
143             private readonly IEqualityComparer<TInput> m_comparer; // The comparer to use for equality tests.
144             private readonly int m_partitionIndex; // This partition's unique index.
145             private readonly Shared<bool> m_resultFoundFlag; // Whether to cancel the operation.
146             private CancellationToken m_cancellationToken;
147
148             //---------------------------------------------------------------------------------------
149             // Instantiates a new any/all search operator.
150             //
151
152             internal ContainsSearchOperatorEnumerator(QueryOperatorEnumerator<TInput, TKey> source, TInput searchValue,
153                                                       IEqualityComparer<TInput> comparer, int partitionIndex, Shared<bool> resultFoundFlag,
154                 CancellationToken cancellationToken)
155             {
156                 Contract.Assert(source != null);
157                 Contract.Assert(comparer != null);
158                 Contract.Assert(resultFoundFlag != null);
159
160                 m_source = source;
161                 m_searchValue = searchValue;
162                 m_comparer = comparer;
163                 m_partitionIndex = partitionIndex;
164                 m_resultFoundFlag = resultFoundFlag;
165                 m_cancellationToken = cancellationToken;
166             }
167
168             //---------------------------------------------------------------------------------------
169             // This enumerates the entire input source to perform the search. If another peer
170             // partition finds an answer before us, we will voluntarily return (propagating the
171             // peer's result).
172             //
173
174             internal override bool MoveNext(ref bool currentElement, ref int currentKey)
175             {
176                 Contract.Assert(m_comparer != null);
177
178                 // Avoid enumerating if we've already found an answer.
179                 if (m_resultFoundFlag.Value)
180                     return false;
181
182                 // We just scroll through the enumerator and accumulate the result.
183                 TInput element = default(TInput);
184                 TKey keyUnused = default(TKey);
185                 if (m_source.MoveNext(ref element, ref keyUnused))
186                 {
187                     currentElement = false;
188                     currentKey = m_partitionIndex;
189
190                     // Continue walking the data so long as we haven't found an item that satisfies
191                     // the condition we are searching for.
192                     int i = 0;
193                     do
194                     {
195                         if ((i++ & CancellationState.POLL_INTERVAL) == 0)
196                             CancellationState.ThrowIfCanceled(m_cancellationToken);
197
198                         if (m_resultFoundFlag.Value)
199                         {
200                             // If cancelation occurred, it's because a successful answer was found.
201                             return false;
202                         }
203
204                         if (m_comparer.Equals(element, m_searchValue))
205                         {
206                             // We have found an item that satisfies the search. Cancel other
207                             // workers that are concurrently searching, and return.
208                             m_resultFoundFlag.Value = true;
209                             currentElement = true;
210                             break;
211                         }
212                     }
213                     while (m_source.MoveNext(ref element, ref keyUnused));
214
215                     return true;
216                 }
217
218                 return false;
219             }
220
221             protected override void Dispose(bool disposing)
222             {
223                 Contract.Assert(m_source != null);
224                 m_source.Dispose();
225             }
226         }
227     }
228 }