Updates referencesource to .NET 4.7
[mono.git] / mcs / class / referencesource / System.Core / System / Linq / Parallel / QueryOperators / Unary / LastQueryOperator.cs
1 // ==++==
2 //
3 //   Copyright (c) Microsoft Corporation.  All rights reserved.
4 // 
5 // ==--==
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
7 //
8 // LastQueryOperator.cs
9 //
10 // <OWNER>Microsoft</OWNER>
11 //
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
13
14 using System.Collections.Generic;
15 using System.Threading;
16 using System.Diagnostics.Contracts;
17
18 namespace System.Linq.Parallel
19 {
20     /// <summary>
21     /// Last tries to discover the last element in the source, optionally matching a
22     /// predicate.  All partitions search in parallel, publish the greatest index for a
23     /// candidate match, and reach a barrier.  Only the partition that "wins" the ----,
24     /// i.e. who found the candidate with the largest index, will yield an element.
25     ///
26     /// </summary>
27     /// <typeparam name="TSource"></typeparam>
28     internal sealed class LastQueryOperator<TSource> : UnaryQueryOperator<TSource, TSource>
29     {
30
31         private readonly Func<TSource, bool> m_predicate; // The optional predicate used during the search.
32         private readonly bool m_prematureMergeNeeded; // Whether to prematurely merge the input of this operator.
33
34         //---------------------------------------------------------------------------------------
35         // Initializes a new last operator.
36         //
37         // Arguments:
38         //     child                - the child whose data we will reverse
39         //
40
41         internal LastQueryOperator(IEnumerable<TSource> child, Func<TSource, bool> predicate)
42             :base(child)
43         {
44             Contract.Assert(child != null, "child data source cannot be null");
45             m_predicate = predicate;
46             m_prematureMergeNeeded = Child.OrdinalIndexState.IsWorseThan(OrdinalIndexState.Increasing);
47         }
48
49         //---------------------------------------------------------------------------------------
50         // Just opens the current operator, including opening the child and wrapping it with
51         // partitions as needed.
52         //
53
54         internal override QueryResults<TSource> Open(QuerySettings settings, bool preferStriping)
55         {
56             QueryResults<TSource> childQueryResults = Child.Open(settings, false);
57             return new UnaryQueryOperatorResults(childQueryResults, this, settings, preferStriping);
58         }
59
60         internal override void  WrapPartitionedStream<TKey>(
61             PartitionedStream<TSource, TKey> inputStream, IPartitionedStreamRecipient<TSource> recipient, bool preferStriping, QuerySettings settings)
62         {
63             // If the index is not at least increasing, we need to reindex.
64             if (m_prematureMergeNeeded)
65             {
66                 PartitionedStream<TSource, int> intKeyStream =
67                     ExecuteAndCollectResults(inputStream, inputStream.PartitionCount, Child.OutputOrdered, preferStriping, settings).GetPartitionedStream();
68                 WrapHelper<int>(intKeyStream, recipient, settings);
69             }
70             else
71             {
72                 WrapHelper<TKey>(inputStream, recipient, settings);
73             }
74         }
75
76         private void WrapHelper<TKey>(PartitionedStream<TSource, TKey> inputStream, IPartitionedStreamRecipient<TSource> recipient, QuerySettings settings)
77         {
78             int partitionCount = inputStream.PartitionCount;
79
80             // Generate the shared data.
81             LastQueryOperatorState<TKey> operatorState = new LastQueryOperatorState<TKey>();
82             CountdownEvent sharedBarrier = new CountdownEvent(partitionCount);
83
84             PartitionedStream<TSource, int> outputStream =
85                 new PartitionedStream<TSource, int>(partitionCount, Util.GetDefaultComparer<int>(), OrdinalIndexState.Shuffled);
86             for (int i = 0; i < partitionCount; i++)
87             {
88                 outputStream[i] = new LastQueryOperatorEnumerator<TKey>(
89                     inputStream[i], m_predicate, operatorState, sharedBarrier, settings.CancellationState.MergedCancellationToken,
90                     inputStream.KeyComparer, i);
91             }
92             recipient.Receive(outputStream);
93         }
94
95         //---------------------------------------------------------------------------------------
96         // Returns an enumerable that represents the query executing sequentially.
97         //
98         internal override IEnumerable<TSource> AsSequentialQuery(CancellationToken token)
99         {
100             Contract.Assert(false, "This method should never be called as fallback to sequential is handled in ParallelEnumerable.First().");
101             throw new NotSupportedException();
102         }
103
104         //---------------------------------------------------------------------------------------
105         // Whether this operator performs a premature merge that would not be performed in
106         // a similar sequential operation (i.e., in LINQ to Objects).
107         //
108
109         internal override bool LimitsParallelism
110         {
111             get { return false; }
112         }
113
114         //---------------------------------------------------------------------------------------
115         // The enumerator type responsible for executing the last operation.
116         //
117
118         class LastQueryOperatorEnumerator<TKey> : QueryOperatorEnumerator<TSource, int>
119         {
120
121             private QueryOperatorEnumerator<TSource, TKey> m_source; // The data source to enumerate.
122             private Func<TSource, bool> m_predicate; // The optional predicate used during the search.
123             private bool m_alreadySearched; // Set once the enumerator has performed the search.
124             private int m_partitionId; // ID of this partition
125
126             // Data shared among partitions.
127             private LastQueryOperatorState<TKey> m_operatorState; // The current last candidate and its partition id.
128             private CountdownEvent m_sharedBarrier; // Shared barrier, signaled when partitions find their 1st element.
129             private CancellationToken m_cancellationToken; // Token used to cancel this operator.
130             private IComparer<TKey> m_keyComparer; // Comparer for the order keys
131
132             //---------------------------------------------------------------------------------------
133             // Instantiates a new enumerator.
134             //
135
136             internal LastQueryOperatorEnumerator(
137                 QueryOperatorEnumerator<TSource, TKey> source, Func<TSource, bool> predicate,
138                 LastQueryOperatorState<TKey> operatorState, CountdownEvent sharedBarrier, CancellationToken cancelToken,
139                 IComparer<TKey> keyComparer, int partitionId)
140             {
141                 Contract.Assert(source != null);
142                 Contract.Assert(operatorState != null);
143                 Contract.Assert(sharedBarrier != null);
144                 Contract.Assert(keyComparer != null);
145
146                 m_source = source;
147                 m_predicate = predicate;
148                 m_operatorState = operatorState;
149                 m_sharedBarrier = sharedBarrier;
150                 m_cancellationToken = cancelToken;
151                 m_keyComparer = keyComparer;
152                 m_partitionId = partitionId;
153             }
154
155             //---------------------------------------------------------------------------------------
156             // Straightforward IEnumerator<T> methods.
157             //
158
159             internal override bool MoveNext(ref TSource currentElement, ref int currentKey)
160             {
161                 Contract.Assert(m_source != null);
162
163                 if (m_alreadySearched)
164                 {
165                     return false;
166                 }
167
168                 // Look for the greatest element.
169                 TSource candidate = default(TSource);
170                 TKey candidateKey = default(TKey);
171                 bool candidateFound = false;
172                 try
173                 {
174                     int loopCount = 0; //counter to help with cancellation
175                     TSource value = default(TSource);
176                     TKey key = default(TKey);
177                     while (m_source.MoveNext(ref value, ref key))
178                     {
179                         if ((loopCount & CancellationState.POLL_INTERVAL) == 0)
180                             CancellationState.ThrowIfCanceled(m_cancellationToken);
181
182                         // If the predicate is null or the current element satisfies it, we will remember
183                         // it as the current partition's candidate for the last element, and move on.
184                         if (m_predicate == null || m_predicate(value))
185                         {
186                             candidate = value;
187                             candidateKey = key;
188                             candidateFound = true;
189                         }
190
191                         loopCount++;
192                     }
193
194                     // If we found a candidate element, try to publish it, so long as it's greater.
195                     if (candidateFound)
196                     {
197                         lock (m_operatorState)
198                         {
199                             if (m_operatorState.m_partitionId == -1 || m_keyComparer.Compare(candidateKey, m_operatorState.m_key) > 0)
200                             {
201                                 m_operatorState.m_partitionId = m_partitionId;
202                                 m_operatorState.m_key = candidateKey;
203                             }
204                         }
205                     }
206                 }
207                 finally
208                 {
209                     // No matter whether we exit due to an exception or normal completion, we must ensure
210                     // that we signal other partitions that we have completed.  Otherwise, we can cause deadlocks.
211                     m_sharedBarrier.Signal();
212                 }
213
214                 m_alreadySearched = true;
215
216                 // Only if we have a candidate do we wait.
217                 if (m_partitionId == m_operatorState.m_partitionId)
218                 {
219                     m_sharedBarrier.Wait(m_cancellationToken);
220
221                     // Now re-read the shared index. If it's the same as ours, we won and return true.
222                     if (m_operatorState.m_partitionId == m_partitionId)
223                     {
224                         currentElement = candidate;
225                         currentKey = 0; // 1st (and only) element, so we hardcode the output index to 0.
226                         return true;
227                     }
228                 }
229
230                 // If we got here, we didn't win. Return false.
231                 return false;
232             }
233
234             protected override void Dispose(bool disposing)
235             {
236                 m_source.Dispose();
237             }
238         }
239
240
241         class LastQueryOperatorState<TKey>
242         {
243             internal TKey m_key;
244             internal int m_partitionId = -1;
245         }
246     }
247 }