Build eglib under none desktop Windows API family.
[mono.git] / mcs / class / referencesource / System.Core / System / Linq / Parallel / QueryOperators / Unary / ConcatQueryOperator.cs
1 // ==++==
2 //
3 //   Copyright (c) Microsoft Corporation.  All rights reserved.
4 // 
5 // ==--==
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
7 //
8 // ConcatQueryOperator.cs
9 //
10 // <OWNER>[....]</OWNER>
11 //
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
13
14 using System.Collections.Generic;
15 using System.Diagnostics.Contracts;
16 using System.Threading;
17
18 namespace System.Linq.Parallel
19 {
    /// <summary>
    /// Concatenates one data source with another.  Order preservation is used to ensure
    /// the output is actually a concatenation -- i.e. one after the other.  Every output
    /// element is tagged with a ConcatKey that records whether it came from the first or
    /// the second data source, together with its original ordering key.  The ConcatKey
    /// comparer sorts every element of the first source ahead of every element of the
    /// second, which makes it appear to the order preservation infrastructure as though
    /// all elements in the second came after all elements in the first -- precisely what
    /// we want.
    /// </summary>
    /// <typeparam name="TSource">The type of the elements in both data sources and in the output.</typeparam>
30     internal sealed class ConcatQueryOperator<TSource> : BinaryQueryOperator<TSource, TSource, TSource>
31     {
32
33         private readonly bool m_prematureMergeLeft = false; // Whether to prematurely merge the left data source
34         private readonly bool m_prematureMergeRight = false; // Whether to prematurely merge the right data source
35
36         //---------------------------------------------------------------------------------------
37         // Initializes a new concatenation operator.
38         //
39         // Arguments:
40         //     child                - the child whose data we will reverse
41         //
42
43         internal ConcatQueryOperator(ParallelQuery<TSource> firstChild, ParallelQuery<TSource> secondChild)
44             : base(firstChild, secondChild)
45         {
46             Contract.Assert(firstChild != null, "first child data source cannot be null");
47             Contract.Assert(secondChild != null, "second child data source cannot be null");
48             m_outputOrdered = LeftChild.OutputOrdered || RightChild.OutputOrdered;
49
50             m_prematureMergeLeft = LeftChild.OrdinalIndexState.IsWorseThan(OrdinalIndexState.Increasing);
51             m_prematureMergeRight = RightChild.OrdinalIndexState.IsWorseThan(OrdinalIndexState.Increasing);
52
53             if ((LeftChild.OrdinalIndexState == OrdinalIndexState.Indexible)
54                 && (RightChild.OrdinalIndexState == OrdinalIndexState.Indexible))
55             {
56                 SetOrdinalIndex(OrdinalIndexState.Indexible);
57             }
58             else
59             {
60                 SetOrdinalIndex(
61                     ExchangeUtilities.Worse(OrdinalIndexState.Increasing, 
62                         ExchangeUtilities.Worse(LeftChild.OrdinalIndexState, RightChild.OrdinalIndexState)));
63             }
64         }
65
66         //---------------------------------------------------------------------------------------
67         // Just opens the current operator, including opening the child and wrapping it with
68         // partitions as needed.
69         //
70
71         internal override QueryResults<TSource> Open(QuerySettings settings, bool preferStriping)
72         {
73             // We just open the children operators.
74             QueryResults<TSource> leftChildResults = LeftChild.Open(settings, preferStriping);
75             QueryResults<TSource> rightChildResults = RightChild.Open(settings, preferStriping);
76
77             return ConcatQueryOperatorResults.NewResults(leftChildResults, rightChildResults, this, settings, preferStriping);
78         }
79
80         public override void WrapPartitionedStream<TLeftKey, TRightKey>(
81             PartitionedStream<TSource, TLeftKey> leftStream, PartitionedStream<TSource, TRightKey> rightStream,
82             IPartitionedStreamRecipient<TSource> outputRecipient, bool preferStriping, QuerySettings settings)
83         {
84             // Prematurely merge the left results, if necessary
85             if (m_prematureMergeLeft)
86             {
87                 ListQueryResults<TSource> leftStreamResults = 
88                     ExecuteAndCollectResults(leftStream, leftStream.PartitionCount, LeftChild.OutputOrdered, preferStriping, settings);
89                 PartitionedStream<TSource, int> leftStreamInc =  leftStreamResults.GetPartitionedStream();
90                 WrapHelper<int, TRightKey>(leftStreamInc, rightStream, outputRecipient, settings, preferStriping);
91             }
92             else
93             {
94                 Contract.Assert(!ExchangeUtilities.IsWorseThan(leftStream.OrdinalIndexState, OrdinalIndexState.Increasing));
95                 WrapHelper<TLeftKey, TRightKey>(leftStream, rightStream, outputRecipient, settings, preferStriping);
96             }
97         }
98
99         private void WrapHelper<TLeftKey,TRightKey>(
100             PartitionedStream<TSource, TLeftKey> leftStreamInc, PartitionedStream<TSource, TRightKey> rightStream, 
101             IPartitionedStreamRecipient<TSource> outputRecipient, QuerySettings settings, bool preferStriping)
102         {
103             // Prematurely merge the right results, if necessary
104             if (m_prematureMergeRight)
105             {
106                 ListQueryResults<TSource> rightStreamResults =
107                     ExecuteAndCollectResults(rightStream, leftStreamInc.PartitionCount, LeftChild.OutputOrdered, preferStriping, settings);
108                 PartitionedStream<TSource, int> rightStreamInc = rightStreamResults.GetPartitionedStream();
109                 WrapHelper2<TLeftKey, int>(leftStreamInc, rightStreamInc, outputRecipient);
110             }
111             else
112             {
113                 Contract.Assert(!ExchangeUtilities.IsWorseThan(rightStream.OrdinalIndexState, OrdinalIndexState.Increasing));
114                 WrapHelper2<TLeftKey, TRightKey>(leftStreamInc, rightStream, outputRecipient);
115             }
116         }
117
118         private void WrapHelper2<TLeftKey, TRightKey>(
119             PartitionedStream<TSource, TLeftKey> leftStreamInc, PartitionedStream<TSource, TRightKey> rightStreamInc,
120             IPartitionedStreamRecipient<TSource> outputRecipient)
121         {
122             int partitionCount = leftStreamInc.PartitionCount;
123
124             // Generate the shared data.
125             IComparer<ConcatKey<TLeftKey, TRightKey>> comparer = ConcatKey<TLeftKey, TRightKey>.MakeComparer(
126                 leftStreamInc.KeyComparer, rightStreamInc.KeyComparer);
127             var outputStream = new PartitionedStream<TSource, ConcatKey<TLeftKey, TRightKey>>(partitionCount, comparer, OrdinalIndexState);
128
129             for (int i = 0; i < partitionCount; i++)
130             {
131                 outputStream[i] = new ConcatQueryOperatorEnumerator<TLeftKey, TRightKey>(leftStreamInc[i], rightStreamInc[i]);
132             }
133
134             outputRecipient.Receive(outputStream);
135         }
136
137
138         //---------------------------------------------------------------------------------------
139         // Returns an enumerable that represents the query executing sequentially.
140         //
141
142         internal override IEnumerable<TSource> AsSequentialQuery(CancellationToken token)
143         {
144             return LeftChild.AsSequentialQuery(token).Concat(RightChild.AsSequentialQuery(token));
145         }
146
147
148         //---------------------------------------------------------------------------------------
149         // Whether this operator performs a premature merge that would not be performed in
150         // a similar sequential operation (i.e., in LINQ to Objects).
151         //
152
153         internal override bool LimitsParallelism
154         {
155             get { return false; }
156         }
157
158         //---------------------------------------------------------------------------------------
159         // The enumerator type responsible for concatenating two data sources.
160         //
161
162         class ConcatQueryOperatorEnumerator<TLeftKey, TRightKey> : QueryOperatorEnumerator<TSource, ConcatKey<TLeftKey, TRightKey>>
163         {
164
165             private QueryOperatorEnumerator<TSource, TLeftKey> m_firstSource; // The first data source to enumerate.
166             private QueryOperatorEnumerator<TSource, TRightKey> m_secondSource; // The second data source to enumerate.
167             private bool m_begunSecond; // Whether this partition has begun enumerating the second source yet.
168
169             //---------------------------------------------------------------------------------------
170             // Instantiates a new select enumerator.
171             //
172
173             internal ConcatQueryOperatorEnumerator(
174                 QueryOperatorEnumerator<TSource, TLeftKey> firstSource,
175                 QueryOperatorEnumerator<TSource, TRightKey> secondSource)
176             {
177                 Contract.Assert(firstSource != null);
178                 Contract.Assert(secondSource != null);
179
180                 m_firstSource = firstSource;
181                 m_secondSource = secondSource;
182             }
183
184             //---------------------------------------------------------------------------------------
185             // MoveNext advances to the next element in the output.  While the first data source has
186             // elements, this consists of just advancing through it.  After this, all partitions must
187             // synchronize at a barrier and publish the maximum index N.  Finally, all partitions can
188             // move on to the second data source, adding N+1 to indices in order to get the correct
189             // index offset.
190             //
191
192             internal override bool MoveNext(ref TSource currentElement, ref ConcatKey<TLeftKey, TRightKey> currentKey)
193             {
194                 Contract.Assert(m_firstSource != null);
195                 Contract.Assert(m_secondSource != null);
196
197                 // If we are still enumerating the first source, fetch the next item.
198                 if (!m_begunSecond)
199                 {
200                     // If elements remain, just return true and continue enumerating the left.
201                     TLeftKey leftKey = default(TLeftKey);
202                     if (m_firstSource.MoveNext(ref currentElement, ref leftKey))
203                     {
204                         currentKey = ConcatKey<TLeftKey, TRightKey>.MakeLeft(leftKey);
205                         return true;
206                     }
207                     m_begunSecond = true;
208                 }
209
210                 // Now either move on to, or continue, enumerating the right data source.
211                 TRightKey rightKey = default(TRightKey);
212                 if (m_secondSource.MoveNext(ref currentElement, ref rightKey))
213                 {
214                     currentKey = ConcatKey<TLeftKey, TRightKey>.MakeRight(rightKey);
215                     return true;
216                 }
217
218                 return false;
219             }
220
221             protected override void Dispose(bool disposing)
222             {
223                 m_firstSource.Dispose();
224                 m_secondSource.Dispose();
225             }
226         }
227
228
229         //-----------------------------------------------------------------------------------
230         // Query results for a Concat operator. The results are indexible if the child
231         // results were indexible.
232         //
233
234         class ConcatQueryOperatorResults : BinaryQueryOperatorResults
235         {
236             ConcatQueryOperator<TSource> m_concatOp; // Operator that generated the results
237             int m_leftChildCount; // The number of elements in the left child result set
238             int m_rightChildCount; // The number of elements in the right child result set
239
240             public static QueryResults<TSource> NewResults(
241                 QueryResults<TSource> leftChildQueryResults, QueryResults<TSource> rightChildQueryResults,
242                 ConcatQueryOperator<TSource> op, QuerySettings settings,
243                 bool preferStriping)
244             {
245                 if (leftChildQueryResults.IsIndexible && rightChildQueryResults.IsIndexible)
246                 {
247                     return new ConcatQueryOperatorResults(
248                         leftChildQueryResults, rightChildQueryResults, op, settings, preferStriping);
249                 }
250                 else
251                 {
252                     return new BinaryQueryOperatorResults(
253                         leftChildQueryResults, rightChildQueryResults, op, settings, preferStriping);
254                 }
255             }
256
257             private ConcatQueryOperatorResults(
258                 QueryResults<TSource> leftChildQueryResults, QueryResults<TSource> rightChildQueryResults,
259                 ConcatQueryOperator<TSource> concatOp, QuerySettings settings,
260                 bool preferStriping)
261                 : base(leftChildQueryResults, rightChildQueryResults, concatOp, settings, preferStriping)
262             {
263                 m_concatOp = concatOp;
264                 Contract.Assert(leftChildQueryResults.IsIndexible && rightChildQueryResults.IsIndexible);
265
266                 m_leftChildCount = leftChildQueryResults.ElementsCount;
267                 m_rightChildCount = rightChildQueryResults.ElementsCount;
268             }
269
270             internal override bool IsIndexible
271             {
272                 get { return true; }
273             }
274
275             internal override int ElementsCount
276             {
277                 get
278                 {
279                     Contract.Assert(m_leftChildCount >= 0 && m_rightChildCount >= 0);
280                     return m_leftChildCount + m_rightChildCount;
281                 }
282             }
283
284             internal override TSource GetElement(int index)
285             {
286                 if (index < m_leftChildCount)
287                 {
288                     return m_leftChildQueryResults.GetElement(index);
289                 }
290                 else
291                 {
292                     return m_rightChildQueryResults.GetElement(index - m_leftChildCount);
293                 }
294             }
295         }
296
297     }
298
299     //---------------------------------------------------------------------------------------
300     // ConcatKey represents an ordering key for the Concat operator. It knows whether the
301     // element it is associated with is from the left source or the right source, and also
302     // the elements ordering key.
303     //
304
305     internal struct ConcatKey<TLeftKey, TRightKey>
306     {
307         private readonly TLeftKey m_leftKey;
308         private readonly TRightKey m_rightKey;
309         private readonly bool m_isLeft;
310
311         private ConcatKey(TLeftKey leftKey, TRightKey rightKey, bool isLeft)
312         {
313             m_leftKey = leftKey;
314             m_rightKey = rightKey;
315             m_isLeft = isLeft;
316         }
317
318         internal static ConcatKey<TLeftKey, TRightKey> MakeLeft(TLeftKey leftKey)
319         {
320             return new ConcatKey<TLeftKey, TRightKey>(leftKey, default(TRightKey), true);
321         }
322
323         internal static ConcatKey<TLeftKey, TRightKey> MakeRight(TRightKey rightKey)
324         {
325             return new ConcatKey<TLeftKey, TRightKey>(default(TLeftKey), rightKey, false);
326         }
327
328         internal static IComparer<ConcatKey<TLeftKey, TRightKey>> MakeComparer(
329             IComparer<TLeftKey> leftComparer, IComparer<TRightKey> rightComparer)
330         {
331             return new ConcatKeyComparer(leftComparer, rightComparer);
332         }
333
334         //---------------------------------------------------------------------------------------
335         // ConcatKeyComparer compares ConcatKeys, so that elements from the left source come
336         // before elements from the right source, and elements within each source are ordered
337         // according to the corresponding order key.
338         //
339
340         private class ConcatKeyComparer : IComparer<ConcatKey<TLeftKey, TRightKey>>
341         {
342             private IComparer<TLeftKey> m_leftComparer;
343             private IComparer<TRightKey> m_rightComparer;
344
345             internal ConcatKeyComparer(IComparer<TLeftKey> leftComparer, IComparer<TRightKey> rightComparer)
346             {
347                 m_leftComparer = leftComparer;
348                 m_rightComparer = rightComparer;
349             }
350
351             public int Compare(ConcatKey<TLeftKey, TRightKey> x, ConcatKey<TLeftKey, TRightKey> y)
352             {
353                 // If one element is from the left source and the other not, the element from the left source
354                 // comes earlier.
355                 if (x.m_isLeft != y.m_isLeft)
356                 {
357                     return x.m_isLeft ? -1 : 1;
358                 }
359
360                 // Elements are from the same source (left or right). Compare the corresponding keys.
361                 if (x.m_isLeft)
362                 {
363                     return m_leftComparer.Compare(x.m_leftKey, y.m_leftKey);
364                 }
365                 return m_rightComparer.Compare(x.m_rightKey, y.m_rightKey);
366             }
367         }
368     }
369 }