3 // Copyright (c) Microsoft Corporation. All rights reserved.
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
8 // SingleQueryOperator.cs
10 // <OWNER>Microsoft</OWNER>
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
14 using System.Collections.Generic;
15 using System.Threading;
16 using System.Diagnostics.Contracts;
18 namespace System.Linq.Parallel
21 /// Single searches the input to find the sole element that satisfies the (optional)
22 /// predicate. If multiple such elements are found, the caller is responsible for
23 /// producing an error. There is some degree of cross-partition synchronization to
24 /// proactively halt the search if we ever determine there are multiple elements
25 /// satisfying the search in the input.
27 /// <typeparam name="TSource"></typeparam>
28 internal sealed class SingleQueryOperator<TSource> : UnaryQueryOperator<TSource, TSource>
31 private readonly Func<TSource, bool> m_predicate; // The optional predicate used during the search.
33 //---------------------------------------------------------------------------------------
34 // Initializes a new Single operator.
37 // child - the child whose data we will search
// Constructs the operator over the given child data source with an optional predicate.
// NOTE(review): the base-constructor call is not visible in this excerpt — confirm that
// 'child' is forwarded to the base class.
40 internal SingleQueryOperator(IEnumerable<TSource> child, Func<TSource, bool> predicate)
// The child is required; only the predicate is optional.
43 Contract.Assert(child != null, "child data source cannot be null");
// The predicate may be null: the enumerator's MoveNext treats a null predicate as
// "every element matches".
44 m_predicate = predicate;
47 //---------------------------------------------------------------------------------------
48 // Just opens the current operator, including opening the child and wrapping it with
49 // partitions as needed.
// Opens the child operator and wraps its results in a UnaryQueryOperatorResults bound
// to this operator.
52 internal override QueryResults<TSource> Open(
53 QuerySettings settings, bool preferStriping)
// The child is always opened with preferStriping = false, regardless of the caller's value.
55 QueryResults<TSource> childQueryResults = Child.Open(settings, false);
// The caller's preferStriping is passed on to the results wrapper rather than to the child.
56 return new UnaryQueryOperatorResults(childQueryResults, this, settings, preferStriping);
// Wraps each input partition in a SingleQueryOperatorEnumerator and hands the resulting
// int-keyed partitioned stream to the recipient. In the lines visible here, neither
// preferStriping nor settings is read.
59 internal override void WrapPartitionedStream<TKey>(
60 PartitionedStream<TSource, TKey> inputStream, IPartitionedStreamRecipient<TSource> recipient, bool preferStriping, QuerySettings settings)
62 int partitionCount = inputStream.PartitionCount;
// The output has one partition per input partition, uses int keys with the default
// comparer, and is marked OrdinalIndexState.Shuffled.
63 PartitionedStream<TSource, int> outputStream = new PartitionedStream<TSource, int>(
64 partitionCount, Util.GetDefaultComparer<int>(), OrdinalIndexState.Shuffled);
// A single counter instance is created here and passed to every partition's enumerator,
// so all partitions update and observe the same match count.
66 Shared<int> totalElementCount = new Shared<int>(0);
67 for (int i = 0; i < partitionCount; i++)
69 outputStream[i] = new SingleQueryOperatorEnumerator<TKey>(inputStream[i], m_predicate, totalElementCount);
72 recipient.Receive(outputStream);
75 //---------------------------------------------------------------------------------------
76 // Returns an enumerable that represents the query executing sequentially.
// Not supported for this operator: the body unconditionally fails an assertion and then
// throws NotSupportedException. The token parameter is not used.
79 internal override IEnumerable<TSource> AsSequentialQuery(CancellationToken token)
81 Contract.Assert(false, "This method should never be called as it is an ending operator with LimitsParallelism=false.");
// The throw follows the assert so that a caller still gets an exception if the assert
// does not stop execution.
82 throw new NotSupportedException();
85 //---------------------------------------------------------------------------------------
86 // Whether this operator performs a premature merge that would not be performed in
87 // a similar sequential operation (i.e., in LINQ to Objects).
90 internal override bool LimitsParallelism
95 //---------------------------------------------------------------------------------------
96 // The enumerator type responsible for executing the Single operation.
99 class SingleQueryOperatorEnumerator<TKey> : QueryOperatorEnumerator<TSource, int>
102 private QueryOperatorEnumerator<TSource, TKey> m_source; // The data source to enumerate.
103 private Func<TSource, bool> m_predicate; // The optional predicate used during the search.
104 private bool m_alreadySearched; // Whether we have searched our input already.
105 private bool m_yieldExtra; // Whether we found more than one element.
107 // Data shared among partitions.
108 private Shared<int> m_totalElementCount; // The total count of elements found.
110 //---------------------------------------------------------------------------------------
111 // Instantiates a new enumerator.
// Creates the per-partition enumerator.
//   source            - the partition's input enumerator; asserted non-null.
//   predicate         - optional filter; null is allowed (MoveNext treats null as match-all).
//   totalElementCount - counter shared with the other partitions; asserted non-null.
114 internal SingleQueryOperatorEnumerator(QueryOperatorEnumerator<TSource, TKey> source,
115 Func<TSource, bool> predicate, Shared<int> totalElementCount)
117 Contract.Assert(source != null);
118 Contract.Assert(totalElementCount != null);
// NOTE(review): the assignment of m_source is not visible in this excerpt — presumably it
// happens on an omitted line, since MoveNext asserts m_source != null. Confirm.
121 m_predicate = predicate;
122 m_totalElementCount = totalElementCount;
125 //---------------------------------------------------------------------------------------
126 // Straightforward IEnumerator<T> methods.
// Performs this partition's search the first time it is called, and afterwards only
// reports state remembered from that search.
// NOTE(review): several short statements of this method (return/break statements and some
// conditions) are not visible in this excerpt; the notes below describe only visible lines.
129 internal override bool MoveNext(ref TSource currentElement, ref int currentKey)
131 Contract.Assert(m_source != null);
// Calls after the first one take this branch, because m_alreadySearched is set at the end
// of the scan below.
133 if (m_alreadySearched)
135 // If we've already searched, we will "fake out" the caller by returning an extra
136 // element at the end in the case that we've found more than one element.
// The flag is cleared before the extra element is produced — presumably inside a check of
// m_yieldExtra on an omitted line, so the extra element is yielded at most once.
139 m_yieldExtra = false;
// The extra element carries default(TSource); presumably only its presence matters to the
// consumer, not its value — confirm against the caller.
140 currentElement = default(TSource);
148 // Scan our input, looking for a match.
150 TSource current = default(TSource);
151 TKey keyUnused = default(TKey);
// The source key is read into keyUnused and is not used in the visible lines.
153 while (m_source.MoveNext(ref current, ref keyUnused))
155 // If the predicate is null or the current element satisfies it, we will remember
156 // it so that we can yield it later. We then proceed with scanning the input
157 // in case there are multiple such elements.
158 if (m_predicate == null || m_predicate(current))
160 // Notify other partitions.
// Atomic increment of the counter shared across all partitions.
161 Interlocked.Increment(ref m_totalElementCount.Value);
// The most recent match is written to the caller's ref parameter.
163 currentElement = current;
168 // Already found an element previously, we can exit.
178 // If we've already determined there is more than one matching element in the
179 // data source, we can exit right away.
// This reads the shared count, so matches found by other partitions also trigger the exit.
180 if (Volatile.Read(ref m_totalElementCount.Value) > 1)
// Record that the scan has run so later calls take the branch at the top.
185 m_alreadySearched = true;
190 protected override void Dispose(bool disposing)