Updates referencesource to .NET 4.7
[mono.git] / mcs / class / referencesource / System.Core / System / Linq / Parallel / QueryOperators / Unary / TakeOrSkipQueryOperator.cs
1 // ==++==
2 //
3 //   Copyright (c) Microsoft Corporation.  All rights reserved.
4 // 
5 // ==--==
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
7 //
8 // TakeOrSkipQueryOperator.cs
9 //
10 // <OWNER>Microsoft</OWNER>
11 //
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
13
14 using System.Collections.Generic;
15 using System.Threading;
16 using System.Diagnostics.Contracts;
17
18 namespace System.Linq.Parallel
19 {
20     /// <summary>
21     /// Take and Skip either take or skip a specified number of elements, captured in the
22     /// count argument.  These will work a little bit like TakeWhile and SkipWhile: there
23     /// are two phases, (1) Search and (2) Yield.  In the search phase, our goal is to
24     /// find the 'count'th index from the input.  We do this in parallel by sharing a count-
25     /// sized array.  Each thread ----s to populate the array with indices in ascending
26     /// order.  This requires synchronization for inserts.  We use a simple heap, for decent
27     /// worst case performance.  After a thread has scanned \91count\92 elements, or its current
28     /// index is greater than or equal to the maximum index in the array (and the array is
29     /// fully populated), the thread can stop searching.  All threads issue a barrier before
30     /// moving to the Yield phase.  When the Yield phase is entered, the count-1th element
31     /// of the array contains: in the case of Take, the maximum index (exclusive) to be
32     /// returned; or in the case of Skip, the minimum index (inclusive) to be returned.  The
33     /// Yield phase simply consists of yielding these elements as output.
34     /// </summary>
35     /// <typeparam name="TResult"></typeparam>
36     internal sealed class TakeOrSkipQueryOperator<TResult> : UnaryQueryOperator<TResult, TResult>
37     {
38
39         private readonly int m_count; // The number of elements to take or skip.
40         private readonly bool m_take; // Whether to take (true) or skip (false).
41         private bool m_prematureMerge = false; // Whether to prematurely merge the input of this operator.
42
43         //---------------------------------------------------------------------------------------
44         // Initializes a new take-while operator.
45         //
46         // Arguments:
47         //     child  - the child data source to enumerate
48         //     count  - the number of elements to take or skip
49         //     take   - whether this is a Take (true) or Skip (false)
50         //
51
52         internal TakeOrSkipQueryOperator(IEnumerable<TResult> child, int count, bool take)
53             :base(child)
54         {
55             Contract.Assert(child != null, "child data source cannot be null");
56
57             m_count = count;
58             m_take = take;
59
60             SetOrdinalIndexState(OutputOrdinalIndexState());
61         }
62
63         /// <summary>
64         /// Determines the order index state for the output operator
65         /// </summary>
66         private OrdinalIndexState OutputOrdinalIndexState()
67         {
68             OrdinalIndexState indexState = Child.OrdinalIndexState;
69
70             if (indexState == OrdinalIndexState.Indexible)
71             {
72                 return OrdinalIndexState.Indexible;
73             }
74
75             if (indexState.IsWorseThan(OrdinalIndexState.Increasing))
76             {
77                 m_prematureMerge = true;
78                 indexState = OrdinalIndexState.Correct;
79             }
80
81             // If the operator is skip and the index was correct, now it is only increasing.
82             if (!m_take && indexState == OrdinalIndexState.Correct)
83             {
84                 indexState = OrdinalIndexState.Increasing;
85             }
86
87             return indexState;
88         }
89
90         internal override void WrapPartitionedStream<TKey>(
91             PartitionedStream<TResult, TKey> inputStream, IPartitionedStreamRecipient<TResult> recipient, bool preferStriping, QuerySettings settings)
92         {
93             Contract.Assert(Child.OrdinalIndexState != OrdinalIndexState.Indexible, "Don't take this code path if the child is indexible.");
94
95             // If the index is not at least increasing, we need to reindex.
96             if (m_prematureMerge)
97             {
98                 ListQueryResults<TResult> results = ExecuteAndCollectResults(
99                     inputStream, inputStream.PartitionCount, Child.OutputOrdered, preferStriping, settings);
100                 PartitionedStream<TResult, int> inputIntStream = results.GetPartitionedStream();
101                 WrapHelper<int>(inputIntStream, recipient, settings);
102             }
103             else
104             {
105                 WrapHelper<TKey>(inputStream, recipient, settings);
106             }
107         }
108
109         private void WrapHelper<TKey>(PartitionedStream<TResult, TKey> inputStream, IPartitionedStreamRecipient<TResult> recipient, QuerySettings settings)
110         {
111             int partitionCount = inputStream.PartitionCount;
112             FixedMaxHeap<TKey> sharedIndices = new FixedMaxHeap<TKey>(m_count, inputStream.KeyComparer); // an array used to track the sequence of indices leading up to the Nth index
113             CountdownEvent sharredBarrier = new CountdownEvent(partitionCount); // a barrier to synchronize before yielding
114
115             PartitionedStream<TResult, TKey> outputStream =
116                 new PartitionedStream<TResult, TKey>(partitionCount, inputStream.KeyComparer, OrdinalIndexState);
117             for (int i = 0; i < partitionCount; i++)
118             {
119                 outputStream[i] = new TakeOrSkipQueryOperatorEnumerator<TKey>(
120                     inputStream[i], m_take, sharedIndices, sharredBarrier,
121                     settings.CancellationState.MergedCancellationToken, inputStream.KeyComparer);
122             }
123
124             recipient.Receive(outputStream);
125         }
126
127         //---------------------------------------------------------------------------------------
128         // Just opens the current operator, including opening the child and wrapping it with
129         // partitions as needed.
130         //
131
132         internal override QueryResults<TResult> Open(QuerySettings settings, bool preferStriping)
133         {
134             QueryResults<TResult> childQueryResults = Child.Open(settings, true);
135             return TakeOrSkipQueryOperatorResults.NewResults(childQueryResults, this, settings, preferStriping);
136         }
137
138         //---------------------------------------------------------------------------------------
139         // Whether this operator performs a premature merge that would not be performed in
140         // a similar sequential operation (i.e., in LINQ to Objects).
141         //
142
143         internal override bool LimitsParallelism
144         {
145             get { return false; }
146         }
147
148         //---------------------------------------------------------------------------------------
149         // The enumerator type responsible for executing the Take or Skip.
150         //
151
152         class TakeOrSkipQueryOperatorEnumerator<TKey> : QueryOperatorEnumerator<TResult, TKey>
153         {
154             private readonly QueryOperatorEnumerator<TResult, TKey> m_source; // The data source to enumerate.
155             private readonly int m_count; // The number of elements to take or skip.
156             private readonly bool m_take; // Whether to execute a Take (true) or Skip (false).
157             private readonly IComparer<TKey> m_keyComparer; // Comparer for the order keys.
158
159             // These fields are all shared among partitions.
160             private readonly FixedMaxHeap<TKey> m_sharedIndices; // The indices shared among partitions.
161             private readonly CountdownEvent m_sharedBarrier; // To separate the search/yield phases.
162             private readonly CancellationToken m_cancellationToken; // Indicates that cancellation has occurred.
163
164             private List<Pair<TResult, TKey>> m_buffer; // Our buffer.
165             private Shared<int> m_bufferIndex; // Our current index within the buffer. [allocate in moveNext to avoid false-sharing]
166
167             //---------------------------------------------------------------------------------------
168             // Instantiates a new select enumerator.
169             //
170
171             internal TakeOrSkipQueryOperatorEnumerator(
172                 QueryOperatorEnumerator<TResult, TKey> source, bool take,
173                 FixedMaxHeap<TKey> sharedIndices, CountdownEvent sharedBarrier, CancellationToken cancellationToken,
174                 IComparer<TKey> keyComparer)
175             {
176                 Contract.Assert(source != null);
177                 Contract.Assert(sharedIndices != null);
178                 Contract.Assert(sharedBarrier != null);
179                 Contract.Assert(keyComparer != null);
180
181                 m_source = source;
182                 m_count = sharedIndices.Size;
183                 m_take = take;
184                 m_sharedIndices = sharedIndices;
185                 m_sharedBarrier = sharedBarrier;
186                 m_cancellationToken = cancellationToken;
187                 m_keyComparer = keyComparer;
188             }
189
190             //---------------------------------------------------------------------------------------
191             // Straightforward IEnumerator<T> methods.
192             //
193
194             internal override bool MoveNext(ref TResult currentElement, ref TKey currentKey)
195             {
196                 Contract.Assert(m_sharedIndices != null);
197
198                 // If the buffer has not been created, we will populate it lazily on demand.
199                 if (m_buffer == null && m_count > 0)
200                 {
201                     
202
203                     // Create a buffer, but don't publish it yet (in case of exception).
204                     List<Pair<TResult, TKey>> buffer = new List<Pair<TResult, TKey>>();
205
206                     // Enter the search phase. In this phase, all partitions ---- to populate
207                     // the shared indices with their first 'count' contiguous elements.
208                     TResult current = default(TResult);
209                     TKey index = default(TKey);
210                     int i = 0; //counter to help with cancellation
211                     while (buffer.Count < m_count && m_source.MoveNext(ref current, ref index))
212                     {
213                         if ((i++ & CancellationState.POLL_INTERVAL) == 0)
214                             CancellationState.ThrowIfCanceled(m_cancellationToken);
215                         
216                         // Add the current element to our buffer.
217                         buffer.Add(new Pair<TResult, TKey>(current, index));
218
219                         // Now we will try to insert our index into the shared indices list, quitting if
220                         // our index is greater than all of the indices already inside it.
221                         lock (m_sharedIndices)
222                         {
223                             if (!m_sharedIndices.Insert(index))
224                             {
225                                 // We have read past the maximum index. We can move to the barrier now.
226                                 break;
227                             }
228                         }
229                     }
230
231                     // Before exiting the search phase, we will synchronize with others. This is a barrier.
232                     m_sharedBarrier.Signal();
233                     m_sharedBarrier.Wait(m_cancellationToken);
234
235                     // Publish the buffer and set the index to just before the 1st element.
236                     m_buffer = buffer;
237                     m_bufferIndex = new Shared<int>(-1);
238                 }
239
240                 // Now either enter (or continue) the yielding phase. As soon as we reach this, we know the
241                 // index of the 'count'-th input element.
242                 if (m_take)
243                 {
244                     // In the case of a Take, we will yield each element from our buffer for which
245                     // the element is lesser than the 'count'-th index found.
246                     if (m_count == 0 || m_bufferIndex.Value >= m_buffer.Count - 1)
247                     {
248                         return false;
249                     }
250
251                     // Increment the index, and remember the values.
252                     ++m_bufferIndex.Value;
253                     currentElement = m_buffer[m_bufferIndex.Value].First;
254                     currentKey = m_buffer[m_bufferIndex.Value].Second;
255
256                     // Only yield the element if its index is less than or equal to the max index.
257                     return m_sharedIndices.Count == 0 
258                         || m_keyComparer.Compare(m_buffer[m_bufferIndex.Value].Second, m_sharedIndices.MaxValue) <= 0;
259                 }
260                 else
261                 {
262                     TKey minKey = default(TKey);
263
264                     // If the count to skip was greater than 0, look at the buffer.
265                     if (m_count > 0)
266                     {
267                         // If there wasn't enough input to skip, return right away.
268                         if (m_sharedIndices.Count < m_count)
269                         {
270                             return false;
271                         }
272
273                         minKey = m_sharedIndices.MaxValue;
274
275                         // In the case of a skip, we must skip over elements whose index is lesser than the
276                         // 'count'-th index found. Once we've exhausted the buffer, we must go back and continue
277                         // enumerating the data source until it is empty.
278                         if (m_bufferIndex.Value < m_buffer.Count - 1)
279                         {
280                             for (m_bufferIndex.Value++; m_bufferIndex.Value < m_buffer.Count; m_bufferIndex.Value++)
281                             {
282                                 // If the current buffered element's index is greater than the 'count'-th index,
283                                 // we will yield it as a result.
284                                 if (m_keyComparer.Compare(m_buffer[m_bufferIndex.Value].Second, minKey) > 0)
285                                 {
286                                     currentElement = m_buffer[m_bufferIndex.Value].First;
287                                     currentKey = m_buffer[m_bufferIndex.Value].Second;
288                                     return true;
289                                 }
290                             }
291                         }                    
292                     }
293
294                     // Lastly, so long as our input still has elements, they will be yieldable.
295                     if (m_source.MoveNext(ref currentElement, ref currentKey))
296                     {
297                         Contract.Assert(m_count <= 0 || m_keyComparer.Compare(currentKey, minKey) > 0,
298                                         "expected remaining element indices to be greater than smallest");
299                         return true;
300                     }
301                 }
302
303                 return false;
304             }
305
306             protected override void Dispose(bool disposing)
307             {
308                 m_source.Dispose();
309             }
310         }
311
312         //---------------------------------------------------------------------------------------
313         // Returns an enumerable that represents the query executing sequentially.
314         //
315
316         internal override IEnumerable<TResult> AsSequentialQuery(CancellationToken token)
317         {
318             if (m_take)
319             {
320                 return Child.AsSequentialQuery(token).Take(m_count);
321             }
322
323             IEnumerable<TResult> wrappedChild = CancellableEnumerable.Wrap(Child.AsSequentialQuery(token), token);
324             return wrappedChild.Skip(m_count);
325         }
326
327         //-----------------------------------------------------------------------------------
328         // Query results for a Take or a Skip operator. The results are indexible if the child
329         // results were indexible.
330         //
331
332         class TakeOrSkipQueryOperatorResults : UnaryQueryOperatorResults
333         {
334             TakeOrSkipQueryOperator<TResult> m_takeOrSkipOp; // The operator that generated the results
335             int m_childCount; // The number of elements in child results
336
337             public static QueryResults<TResult> NewResults(
338                 QueryResults<TResult> childQueryResults, TakeOrSkipQueryOperator<TResult> op,
339                 QuerySettings settings, bool preferStriping)
340             {
341                 if (childQueryResults.IsIndexible)
342                 {
343                     return new TakeOrSkipQueryOperatorResults(
344                         childQueryResults, op, settings, preferStriping);
345                 }
346                 else
347                 {
348                     return new UnaryQueryOperatorResults(
349                         childQueryResults, op, settings, preferStriping);
350                 }
351             }
352
353             private TakeOrSkipQueryOperatorResults(
354                 QueryResults<TResult> childQueryResults, TakeOrSkipQueryOperator<TResult> takeOrSkipOp,
355                 QuerySettings settings, bool preferStriping)
356                 : base(childQueryResults, takeOrSkipOp, settings, preferStriping)
357             {
358                 m_takeOrSkipOp = takeOrSkipOp;
359                 Contract.Assert(m_childQueryResults.IsIndexible);
360
361                 m_childCount = m_childQueryResults.ElementsCount;
362             }
363
364             internal override bool IsIndexible
365             {
366                 get { return m_childCount >= 0; }
367             }
368
369             internal override int ElementsCount
370             {
371                 get
372                 {
373                     Contract.Assert(m_childCount >= 0);
374                     if (m_takeOrSkipOp.m_take)
375                     {
376                         return Math.Min(m_childCount, m_takeOrSkipOp.m_count);
377                     }
378                     else
379                     {
380                         return Math.Max(m_childCount - m_takeOrSkipOp.m_count, 0);                        
381                     }
382                 }
383             }
384
385             internal override TResult GetElement(int index)
386             {
387                 Contract.Assert(m_childCount >= 0);
388                 Contract.Assert(index >= 0);
389                 Contract.Assert(index < ElementsCount);
390
391                 if (m_takeOrSkipOp.m_take)
392                 {
393                     return m_childQueryResults.GetElement(index);
394                 }
395                 else
396                 {
397                     return m_childQueryResults.GetElement(m_takeOrSkipOp.m_count + index);
398                 }
399             }
400         }
401     }
402 }