Updates referencesource to .NET 4.7
[mono.git] / mcs / class / referencesource / System.Core / System / Linq / Parallel / QueryOperators / Unary / ElementAtQueryOperator.cs
1 // ==++==
2 //
3 //   Copyright (c) Microsoft Corporation.  All rights reserved.
4 // 
5 // ==--==
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
7 //
8 // ElementAtQueryOperator.cs
9 //
10 // <OWNER>Microsoft</OWNER>
11 //
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
13
14 using System.Collections.Generic;
15 using System.Diagnostics.Contracts;
16 using System.Threading;
17
18 namespace System.Linq.Parallel
19 {
20     /// <summary>
21     /// ElementAt just retrieves an element at a specific index.  There is some cross-partition
22     /// coordination to force partitions to stop looking once a partition has found the
23     /// sought-after element.
24     /// </summary>
25     /// <typeparam name="TSource"></typeparam>
26     internal sealed class ElementAtQueryOperator<TSource> : UnaryQueryOperator<TSource, TSource>
27     {
28
29         private readonly int m_index; // The index that we're looking for.
30         private readonly bool m_prematureMerge = false; // Whether to prematurely merge the input of this operator.
31         private readonly bool m_limitsParallelism = false; // Whether this operator limits parallelism
32
33         //---------------------------------------------------------------------------------------
34         // Constructs a new instance of the contains search operator.
35         //
36         // Arguments:
37         //     child       - the child tree to enumerate.
38         //     index       - index we are searching for.
39         //
40
41         internal ElementAtQueryOperator(IEnumerable<TSource> child, int index)
42             :base(child)
43         {
44             Contract.Assert(child != null, "child data source cannot be null");
45             Contract.Assert(index >= 0, "index can't be less than 0");
46             m_index = index;
47
48             OrdinalIndexState childIndexState = Child.OrdinalIndexState;
49             if (ExchangeUtilities.IsWorseThan(childIndexState, OrdinalIndexState.Correct))
50             {
51                 m_prematureMerge = true;
52                 m_limitsParallelism = childIndexState != OrdinalIndexState.Shuffled;
53             }
54         }
55
56         //---------------------------------------------------------------------------------------
57         // Just opens the current operator, including opening the child and wrapping it with
58         // partitions as needed.
59         //
60
61         internal override QueryResults<TSource> Open(
62             QuerySettings settings, bool preferStriping)
63         {
64             // We just open the child operator.
65             QueryResults<TSource> childQueryResults = Child.Open(settings, false);
66             return new UnaryQueryOperatorResults(childQueryResults, this, settings, preferStriping);
67         }
68
69         internal override void  WrapPartitionedStream<TKey>(
70             PartitionedStream<TSource,TKey> inputStream, IPartitionedStreamRecipient<TSource> recipient, bool preferStriping, QuerySettings settings)
71         {
72             // If the child OOP index is not correct, reindex.
73             int partitionCount = inputStream.PartitionCount;
74
75             PartitionedStream<TSource, int> intKeyStream;
76             if (m_prematureMerge)
77             {
78                 intKeyStream = ExecuteAndCollectResults(inputStream, partitionCount, Child.OutputOrdered, preferStriping, settings).GetPartitionedStream();
79                 Contract.Assert(intKeyStream.OrdinalIndexState == OrdinalIndexState.Indexible);
80             }
81             else
82             {
83                 intKeyStream = (PartitionedStream<TSource, int>)(object)inputStream;
84             }
85
86             // Create a shared cancelation variable and then return a possibly wrapped new enumerator.
87             Shared<bool> resultFoundFlag = new Shared<bool>(false);
88
89             PartitionedStream<TSource, int> outputStream = new PartitionedStream<TSource, int>(
90                 partitionCount, Util.GetDefaultComparer<int>(), OrdinalIndexState.Correct);
91
92             for (int i = 0; i < partitionCount; i++)
93             {
94                 outputStream[i] = new ElementAtQueryOperatorEnumerator(intKeyStream[i], m_index, resultFoundFlag, settings.CancellationState.MergedCancellationToken);
95             }
96
97             recipient.Receive(outputStream);
98         }
99
100         //---------------------------------------------------------------------------------------
101         // Returns an enumerable that represents the query executing sequentially.
102         //
103
104         internal override IEnumerable<TSource> AsSequentialQuery(CancellationToken token)
105         {
106             Contract.Assert(false, "This method should never be called as fallback to sequential is handled in Aggregate().");
107             throw new NotSupportedException();
108         }
109
110         //---------------------------------------------------------------------------------------
111         // Whether this operator performs a premature merge that would not be performed in
112         // a similar sequential operation (i.e., in LINQ to Objects).
113         //
114
115         internal override bool LimitsParallelism
116         {
117             get { return m_limitsParallelism; }
118         }
119
120         
121         /// <summary>
122         /// Executes the query, either sequentially or in parallel, depending on the query execution mode and
123         /// whether a premature merge was inserted by this ElementAt operator.
124         /// </summary>
125         /// <param name="result">result</param>
126         /// <param name="withDefaultValue">withDefaultValue</param>
127         /// <returns>whether an element with this index exists</returns>
128         internal bool Aggregate(out TSource result, bool withDefaultValue)
129         {
130             // If we were to insert a premature merge before this ElementAt, and we are executing in conservative mode, run the whole query
131             // sequentially.
132             if (LimitsParallelism && SpecifiedQuerySettings.WithDefaults().ExecutionMode.Value != ParallelExecutionMode.ForceParallelism)
133             {
134                 CancellationState cancelState = SpecifiedQuerySettings.CancellationState;
135                 if (withDefaultValue)
136                 {
137                     IEnumerable<TSource> childAsSequential = Child.AsSequentialQuery(cancelState.ExternalCancellationToken);
138                     IEnumerable<TSource> childWithCancelChecks = CancellableEnumerable.Wrap(childAsSequential, cancelState.ExternalCancellationToken);
139                     result = ExceptionAggregator.WrapEnumerable(childWithCancelChecks, cancelState).ElementAtOrDefault(m_index);
140                 }
141                 else
142                 {
143                     IEnumerable<TSource> childAsSequential = Child.AsSequentialQuery(cancelState.ExternalCancellationToken);
144                     IEnumerable<TSource> childWithCancelChecks = CancellableEnumerable.Wrap(childAsSequential, cancelState.ExternalCancellationToken);
145                     result = ExceptionAggregator.WrapEnumerable(childWithCancelChecks, cancelState).ElementAt(m_index);
146                 }
147                 return true;
148             }
149
150             using (IEnumerator<TSource> e = GetEnumerator(ParallelMergeOptions.FullyBuffered))
151             {
152                 if (e.MoveNext())
153                 {
154                     TSource current = e.Current;
155                     Contract.Assert(!e.MoveNext(), "expected enumerator to be empty");
156                     result = current;
157                     return true;
158                 }
159             }
160
161             result = default(TSource);
162             return false;
163         }
164
165
166         //---------------------------------------------------------------------------------------
167         // This enumerator performs the search for the element at the specified index.
168         //
169
170         class ElementAtQueryOperatorEnumerator : QueryOperatorEnumerator<TSource, int>
171         {
172             private QueryOperatorEnumerator<TSource, int> m_source; // The source data.
173             private int m_index; // The index of the element to seek.
174             private Shared<bool> m_resultFoundFlag; // Whether to cancel the operation.
175             private CancellationToken m_cancellationToken;
176
177             //---------------------------------------------------------------------------------------
178             // Instantiates a new any/all search operator.
179             //
180
181             internal ElementAtQueryOperatorEnumerator(QueryOperatorEnumerator<TSource, int> source,
182                                                       int index, Shared<bool> resultFoundFlag,
183                 CancellationToken cancellationToken)
184             {
185                 Contract.Assert(source != null);
186                 Contract.Assert(index >= 0);
187                 Contract.Assert(resultFoundFlag != null);
188
189                 m_source = source;
190                 m_index = index;
191                 m_resultFoundFlag = resultFoundFlag;
192                 m_cancellationToken = cancellationToken;
193             }
194
195             //---------------------------------------------------------------------------------------
196             // Enumerates the entire input until the element with the specified is found or another
197             // partition has signaled that it found the element.
198             //
199
200             internal override bool MoveNext(ref TSource currentElement, ref int currentKey)
201             {
202                 // Just walk the enumerator until we've found the element.
203                 int i = 0;
204                 while (m_source.MoveNext(ref currentElement, ref currentKey))
205                 {
206                     if ((i++ & CancellationState.POLL_INTERVAL) == 0)
207                         CancellationState.ThrowIfCanceled(m_cancellationToken);
208
209                     if (m_resultFoundFlag.Value)
210                     {
211                         // Another partition found the element.
212                         break;
213                     }
214
215                     if (currentKey == m_index)
216                     {
217                         // We have found the element. Cancel other searches and return true.
218                         m_resultFoundFlag.Value = true;
219                         return true;
220                     }
221                 }
222
223                 return false;
224             }
225
226             protected override void Dispose(bool disposing)
227             {
228                 Contract.Assert(m_source != null);
229                 m_source.Dispose();
230             }
231         }
232     }
233 }