3 // Copyright (c) Microsoft Corporation. All rights reserved.
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
8 // ContainsSearchOperator.cs
10 // <OWNER>Microsoft</OWNER>
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
14 using System.Collections.Generic;
15 using System.Diagnostics.Contracts;
16 using System.Threading;
18 namespace System.Linq.Parallel
/// <summary>
/// Contains is quite similar to the any/all operator. Each partition searches a
/// subset of elements for a match, and the first one to find a match signals the rest
/// of the partitions to stop searching.
/// </summary>
/// <typeparam name="TInput">The type of the elements being searched.</typeparam>
26 internal sealed class ContainsSearchOperator<TInput> : UnaryQueryOperator<TInput, bool>
// The value for which we are searching.
private readonly TInput m_searchValue;

// The comparer to use for equality tests.
private readonly IEqualityComparer<TInput> m_comparer;
32 //---------------------------------------------------------------------------------------
33 // Constructs a new instance of the contains search operator.
36 // child - the child tree to enumerate.
37 // searchValue - value we are searching for.
38 // comparer - a comparison routine used to test equality.
// Creates the operator and records the value to look for and the comparer used to
// test candidate elements against it.
//
// Arguments:
//     child       - the child tree to enumerate; asserted to be non-null.
//     searchValue - value we are searching for.
//     comparer    - equality comparer; when null, EqualityComparer<TInput>.Default is used.
internal ContainsSearchOperator(IEnumerable<TInput> child, TInput searchValue, IEqualityComparer<TInput> comparer)
    : base(child) // NOTE(review): base-constructor call is absent from the text as received; confirm against UnaryQueryOperator.
{
    Contract.Assert(child != null, "child data source cannot be null");

    m_searchValue = searchValue;

    // Never store a null comparer: the per-partition enumerators assert that the
    // comparer they are handed is non-null, so fall back to the default here.
    m_comparer = comparer ?? EqualityComparer<TInput>.Default;
}
58 //---------------------------------------------------------------------------------------
59 // Executes the entire query tree, and aggregates the individual partition results to
60 // form an overall answer to the search operation.
// Runs the query and folds the per-partition results into a single answer.
//
// Return Value:
//     true as soon as any enumerated result is true; false if the enumerator is
//     exhausted without yielding a true value.
internal bool Aggregate()
{
    // Because the final reduction is typically much cheaper than the intermediate
    // reductions over the individual partitions, and because each parallel partition
    // could do a lot of work to produce a single output element, we prefer to turn off
    // pipelining, and process the final reductions serially.
    using (IEnumerator<bool> enumerator = GetEnumerator(ParallelMergeOptions.FullyBuffered, true))
    {
        // Any value of true means the element was found. We needn't consult all partitions.
        while (enumerator.MoveNext())
        {
            if (enumerator.Current)
            {
                // NOTE(review): this return is absent from the text as received; restored
                // because the method must produce a bool and the comment above says a
                // single true is decisive.
                return true;
            }
        }
    }

    // No partition reported a match.
    return false;
}
84 //---------------------------------------------------------------------------------------
85 // Just opens the current operator, including opening the child and wrapping it with
86 // partitions as needed.
// Opens the child operator with the same settings and striping preference, then wraps
// the child's results in a UnaryQueryOperatorResults bound to this operator.
internal override QueryResults<bool> Open(QuerySettings settings, bool preferStriping)
{
    return new UnaryQueryOperatorResults(Child.Open(settings, preferStriping), this, settings, preferStriping);
}
// Builds an output stream with one search enumerator per input partition and hands it
// to the recipient. Each enumerator receives its partition index as its key, the search
// value and comparer of this operator, and the merged cancellation token from settings.
internal override void WrapPartitionedStream<TKey>(
    PartitionedStream<TInput, TKey> inputStream, IPartitionedStreamRecipient<bool> recipient, bool preferStriping, QuerySettings settings)
{
    int count = inputStream.PartitionCount;
    PartitionedStream<bool, int> searchStream =
        new PartitionedStream<bool, int>(count, Util.GetDefaultComparer<int>(), OrdinalIndexState.Correct);

    // A single flag instance is shared by every partition's enumerator so that the first
    // one to find a match can tell the others to stop.
    Shared<bool> foundSignal = new Shared<bool>(false);

    for (int index = 0; index < count; index++)
    {
        searchStream[index] = new ContainsSearchOperatorEnumerator<TKey>(
            inputStream[index],
            m_searchValue,
            m_comparer,
            index,
            foundSignal,
            settings.CancellationState.MergedCancellationToken);
    }

    recipient.Receive(searchStream);
}
113 //---------------------------------------------------------------------------------------
114 // Returns an enumerable that represents the query executing sequentially.
// Sequential execution is not supported for this operator: the method asserts
// unconditionally and then throws NotSupportedException.
internal override IEnumerable<bool> AsSequentialQuery(CancellationToken token)
{
    // Fail loudly in builds where contract assertions are active...
    Contract.Assert(false, "This method should never be called as it is an ending operator with LimitsParallelism=false.");

    // ...and still refuse the call when they are not.
    throw new NotSupportedException();
}
123 //---------------------------------------------------------------------------------------
124 // Whether this operator performs a premature merge that would not be performed in
125 // a similar sequential operation (i.e., in LINQ to Objects).
// Always false for this operator.
internal override bool LimitsParallelism
{
    get
    {
        return false;
    }
}
133 //---------------------------------------------------------------------------------------
// This enumerator performs the search over its input data source. It also cancels peer
// enumerators when an answer was found, and polls this cancelation flag to stop when
// a peer has already found an answer.
139 class ContainsSearchOperatorEnumerator<TKey> : QueryOperatorEnumerator<bool, int>
// The source data.
private readonly QueryOperatorEnumerator<TInput, TKey> m_source;

// The value for which we are searching.
private readonly TInput m_searchValue;

// The comparer to use for equality tests.
private readonly IEqualityComparer<TInput> m_comparer;

// This partition's unique index.
private readonly int m_partitionIndex;

// Whether to cancel the operation.
private readonly Shared<bool> m_resultFoundFlag;

// Token checked periodically while searching.
private CancellationToken m_cancellationToken;
148 //---------------------------------------------------------------------------------------
// Instantiates a new contains search operator enumerator.
// Captures everything one partition needs to run its share of the search.
//
// Arguments:
//     source            - this partition's input enumerator; asserted non-null.
//     searchValue       - value we are searching for.
//     comparer          - equality comparer; asserted non-null.
//     partitionIndex    - index reported as the key of this partition's result.
//     resultFoundFlag   - flag shared with peer enumerators; asserted non-null.
//     cancellationToken - token checked periodically during the search.
internal ContainsSearchOperatorEnumerator(QueryOperatorEnumerator<TInput, TKey> source, TInput searchValue,
    IEqualityComparer<TInput> comparer, int partitionIndex, Shared<bool> resultFoundFlag,
    CancellationToken cancellationToken)
{
    Contract.Assert(source != null);
    Contract.Assert(comparer != null);
    Contract.Assert(resultFoundFlag != null);

    // NOTE(review): this assignment is absent from the text as received. Without it
    // m_source is never initialized, yet MoveNext dereferences it.
    m_source = source;
    m_searchValue = searchValue;
    m_comparer = comparer;
    m_partitionIndex = partitionIndex;
    m_resultFoundFlag = resultFoundFlag;
    m_cancellationToken = cancellationToken;
}
168 //---------------------------------------------------------------------------------------
// This enumerates the entire input source to perform the search. If another peer
// partition finds an answer before us, we will voluntarily return (leaving the peer
// to report the match).
// Searches this partition's source for m_searchValue and produces at most one result.
//
// Arguments:
//     currentElement - set to true when this partition found a match, false otherwise.
//     currentKey     - set to this partition's index.
//
// Return Value:
//     true when this partition produced a result (match found, or source exhausted
//     without a match); false when the shared flag was already set, when the source
//     is empty, or when a peer set the flag while we were searching.
//
// NOTE(review): the `int i = 0;` / `do` loop head, the `break`, and every `return`
// are absent from the text as received. They are restored from the visible
// `while (...)` loop tail and the surrounding comments; confirm against upstream.
internal override bool MoveNext(ref bool currentElement, ref int currentKey)
{
    Contract.Assert(m_comparer != null);

    // Avoid enumerating if we've already found an answer.
    if (m_resultFoundFlag.Value)
    {
        return false;
    }

    // We just scroll through the enumerator and accumulate the result.
    TInput element = default(TInput);
    TKey keyUnused = default(TKey);
    if (!m_source.MoveNext(ref element, ref keyUnused))
    {
        // Nothing in this partition, so there is nothing to report.
        return false;
    }

    currentElement = false;
    currentKey = m_partitionIndex;

    // Continue walking the data so long as we haven't found an item that satisfies
    // the condition we are searching for.
    int i = 0;
    do
    {
        // Check the token only on iterations selected by POLL_INTERVAL
        // (presumably a bitmask; verify against CancellationState).
        if ((i++ & CancellationState.POLL_INTERVAL) == 0)
        {
            CancellationState.ThrowIfCanceled(m_cancellationToken);
        }

        if (m_resultFoundFlag.Value)
        {
            // If cancelation occurred, it's because a successful answer was found.
            return false;
        }

        if (m_comparer.Equals(element, m_searchValue))
        {
            // We have found an item that satisfies the search. Cancel other
            // workers that are concurrently searching, and return.
            m_resultFoundFlag.Value = true;
            currentElement = true;
            break;
        }
    }
    while (m_source.MoveNext(ref element, ref keyUnused));

    return true;
}
221 protected override void Dispose(bool disposing)
223 Contract.Assert(m_source != null);