Updates referencesource to .NET 4.7
[mono.git] / mcs / class / referencesource / System.Core / System / Linq / Parallel / QueryOperators / Unary / SingleQueryOperator.cs
1 // ==++==
2 //
3 //   Copyright (c) Microsoft Corporation.  All rights reserved.
4 // 
5 // ==--==
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
7 //
8 // SingleQueryOperator.cs
9 //
10 // <OWNER>Microsoft</OWNER>
11 //
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
13
14 using System.Collections.Generic;
15 using System.Threading;
16 using System.Diagnostics.Contracts;
17
namespace System.Linq.Parallel
{
    /// <summary>
    /// Single searches the input to find the sole element that satisfies the (optional)
    /// predicate.  If multiple such elements are found, the caller is responsible for
    /// producing an error.  There is some degree of cross-partition synchronization to
    /// proactively halt the search if we ever determine there are multiple elements
    /// satisfying the search in the input.
    /// </summary>
    /// <typeparam name="TSource">The type of the elements being searched.</typeparam>
    internal sealed class SingleQueryOperator<TSource> : UnaryQueryOperator<TSource, TSource>
    {

        private readonly Func<TSource, bool> m_predicate; // The optional predicate used during the search.

        //---------------------------------------------------------------------------------------
        // Initializes a new Single operator.
        //
        // Arguments:
        //     child                - the child whose data we will search
        //     predicate            - the optional predicate an element must satisfy to count
        //                            as a match; when null, every element is a match
        //

        internal SingleQueryOperator(IEnumerable<TSource> child, Func<TSource, bool> predicate)
            :base(child)
        {
            Contract.Assert(child != null, "child data source cannot be null");
            m_predicate = predicate;
        }

        //---------------------------------------------------------------------------------------
        // Just opens the current operator, including opening the child and wrapping it with
        // partitions as needed.  Note that the child is always opened with preferStriping
        // set to false; the caller's preference is only forwarded to the results object.
        //

        internal override QueryResults<TSource> Open(
            QuerySettings settings, bool preferStriping)
        {
            QueryResults<TSource> childQueryResults = Child.Open(settings, false);
            return new UnaryQueryOperatorResults(childQueryResults, this, settings, preferStriping);
        }

        //---------------------------------------------------------------------------------------
        // Wraps each input partition with a Single enumerator and hands the resulting stream
        // to the recipient.  The output stream has the same number of partitions as the input,
        // is keyed by int and is created with OrdinalIndexState.Shuffled.  All enumerators
        // share one counter so that each partition can observe matches found by the others.
        //

        internal override void WrapPartitionedStream<TKey>(
            PartitionedStream<TSource, TKey> inputStream, IPartitionedStreamRecipient<TSource> recipient, bool preferStriping, QuerySettings settings)
        {
            int partitionCount = inputStream.PartitionCount;
            PartitionedStream<TSource, int> outputStream = new PartitionedStream<TSource, int>(
                partitionCount, Util.GetDefaultComparer<int>(), OrdinalIndexState.Shuffled);

            // A single counter instance is handed to every partition's enumerator.
            Shared<int> totalElementCount = new Shared<int>(0);
            for (int i = 0; i < partitionCount; i++)
            {
                outputStream[i] = new SingleQueryOperatorEnumerator<TKey>(inputStream[i], m_predicate, totalElementCount);
            }

            recipient.Receive(outputStream);
        }

        //---------------------------------------------------------------------------------------
        // Returns an enumerable that represents the query executing sequentially.
        //
        // This operator does not support sequential execution: the method asserts and then
        // always throws NotSupportedException.
        //

        internal override IEnumerable<TSource> AsSequentialQuery(CancellationToken token)
        {
            Contract.Assert(false, "This method should never be called as it is an ending operator with LimitsParallelism=false.");
            throw new NotSupportedException();
        }

        //---------------------------------------------------------------------------------------
        // Whether this operator performs a premature merge that would not be performed in
        // a similar sequential operation (i.e., in LINQ to Objects).  Always false here.
        //

        internal override bool LimitsParallelism
        {
            get { return false; }
        }

        //---------------------------------------------------------------------------------------
        // The enumerator type responsible for executing the Single operation.
        //
        // Each instance scans one input partition.  It yields at most two elements: the
        // match it found (if any), followed by one extra placeholder element when it found
        // a second match in its own partition.  Every yielded element uses the key 0.
        //

        private sealed class SingleQueryOperatorEnumerator<TKey> : QueryOperatorEnumerator<TSource, int>
        {

            private readonly QueryOperatorEnumerator<TSource, TKey> m_source; // The data source to enumerate.
            private readonly Func<TSource, bool> m_predicate; // The optional predicate used during the search.
            private bool m_alreadySearched; // Whether we have searched our input already.
            private bool m_yieldExtra; // Whether we found more than one element.

            // Data shared among partitions.
            private readonly Shared<int> m_totalElementCount; // The total count of elements found.

            //---------------------------------------------------------------------------------------
            // Instantiates a new enumerator.
            //
            // Arguments:
            //     source               - the partition to scan
            //     predicate            - the optional match predicate (null matches everything)
            //     totalElementCount    - the match counter shared by all partitions
            //

            internal SingleQueryOperatorEnumerator(QueryOperatorEnumerator<TSource, TKey> source,
                                                   Func<TSource, bool> predicate, Shared<int> totalElementCount)
            {
                Contract.Assert(source != null);
                Contract.Assert(totalElementCount != null);

                m_source = source;
                m_predicate = predicate;
                m_totalElementCount = totalElementCount;
            }

            //---------------------------------------------------------------------------------------
            // Straightforward IEnumerator<T> methods.
            //
            // The first call scans the input and returns true if this partition found a match,
            // storing it in currentElement.  When this partition itself found a second match,
            // the next call returns true once more with default(TSource).  Every call after
            // that returns false.
            //

            internal override bool MoveNext(ref TSource currentElement, ref int currentKey)
            {
                Contract.Assert(m_source != null);

                if (m_alreadySearched)
                {
                    // If we've already searched, we will "fake out" the caller by returning an extra
                    // element at the end in the case that we've found more than one element.
                    if (m_yieldExtra)
                    {
                        m_yieldExtra = false;
                        currentElement = default(TSource);
                        currentKey = 0;
                        return true;
                    }

                    return false;
                }

                // Scan our input, looking for a match.
                bool found = false;
                TSource current = default(TSource);
                TKey keyUnused = default(TKey);

                while (m_source.MoveNext(ref current, ref keyUnused))
                {
                    // If the predicate is null or the current element satisfies it, we will remember
                    // it so that we can yield it later.  We then proceed with scanning the input
                    // in case there are multiple such elements.
                    if (m_predicate == null || m_predicate(current))
                    {
                        // Notify other partitions.
                        Interlocked.Increment(ref m_totalElementCount.Value);

                        currentElement = current;
                        currentKey = 0;

                        if (found)
                        {
                            // Already found an element previously, we can exit.
                            m_yieldExtra = true;
                            break;
                        }
                        else
                        {
                            found = true;
                        }
                    }

                    // If we've already determined there is more than one matching element in the
                    // data source, we can exit right away.  The count includes matches reported
                    // by other partitions, so this may stop the scan before our input is exhausted.
                    if (Volatile.Read(ref m_totalElementCount.Value) > 1)
                    {
                        break;
                    }
                }
                m_alreadySearched = true;

                return found;
            }

            //---------------------------------------------------------------------------------------
            // Disposes the wrapped source enumerator.  The disposing flag is not consulted.
            //

            protected override void Dispose(bool disposing)
            {
                m_source.Dispose();
            }
        }
    }
}