3 // Copyright (c) Microsoft Corporation. All rights reserved.
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
10 // <OWNER>[....]</OWNER>
12 // Support for sorting.
14 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
16 using System.Collections.Generic;
17 using System.Threading;
18 using System.Diagnostics.Contracts;
20 namespace System.Linq.Parallel
23 //---------------------------------------------------------------------------------------
24 // The sort helper abstraction hides the implementation of our parallel merge sort. See
25 // comments below for more details. In summary, there will be one sort helper per
26 // partition. Each will, in parallel read the whole key/value set from its input,
27 // perform a local sort on this data, and then cooperatively merge with other concurrent
28 // tasks to generate a single sorted output. The local sort step is done using a simple
29 // quick-sort algorithm. Then we use a log(p) reduction to perform merges in parallel;
30 // during each round of merges, half of the threads will stop doing work and may return.
31 // At the end, one thread will remain and it holds the final sorted output.
34 internal abstract class SortHelper<TInputOutput>
36 internal abstract TInputOutput[] Sort();
39 internal class SortHelper<TInputOutput, TKey> : SortHelper<TInputOutput>, IDisposable
42 private QueryOperatorEnumerator<TInputOutput, TKey> m_source; // The data source from which to pull data.
43 private int m_partitionCount; // The partition count.
44 private int m_partitionIndex; // This helper's index.
46 // This data is shared among all partitions.
47 private QueryTaskGroupState m_groupState; // To communicate status, e.g. cancellation.
48 private int[][] m_sharedIndices; // Shared set of indices used during sorting.
49 private GrowingArray<TKey>[] m_sharedKeys; // Shared keys with which to compare elements.
50 private TInputOutput[][] m_sharedValues; // The actual values used for comparisons.
51 private Barrier[,] m_sharedBarriers; // A matrix of barriers used for synchronizing during merges.
52 private OrdinalIndexState m_indexState; // State of the order index
53 private IComparer<TKey> m_keyComparer; // Comparer for the order keys
55 //---------------------------------------------------------------------------------------
56 // Creates a single sort helper object. This is marked private to ensure the only
57 // snippet of code that creates one is the factory, since creating many implies some
58 // implementation detail in terms of dependencies which other places in the codebase
59 // shouldn't need to worry about.
62 private SortHelper(QueryOperatorEnumerator<TInputOutput, TKey> source, int partitionCount, int partitionIndex,
63 QueryTaskGroupState groupState, int[][] sharedIndices,
64 OrdinalIndexState indexState, IComparer<TKey> keyComparer,
65 GrowingArray<TKey>[] sharedkeys, TInputOutput[][] sharedValues, Barrier[,] sharedBarriers)
67 Contract.Assert(source != null);
68 Contract.Assert(groupState != null);
69 Contract.Assert(sharedIndices != null);
70 Contract.Assert(sharedkeys != null);
71 Contract.Assert(sharedValues != null);
72 Contract.Assert(sharedBarriers != null);
74 Contract.Assert(groupState.CancellationState.MergedCancellationToken != null);
76 Contract.Assert(sharedIndices.Length <= sharedkeys.Length);
77 Contract.Assert(sharedIndices.Length == sharedValues.Length);
78 Contract.Assert(sharedIndices.Length == sharedBarriers.GetLength(1));
80 Contract.Assert(groupState.CancellationState.MergedCancellationToken != null);
84 m_partitionCount = partitionCount;
85 m_partitionIndex = partitionIndex;
86 m_groupState = groupState;
87 m_sharedIndices = sharedIndices;
88 m_indexState = indexState;
89 m_keyComparer = keyComparer;
90 m_sharedKeys = sharedkeys;
91 m_sharedValues = sharedValues;
92 m_sharedBarriers = sharedBarriers;
94 Contract.Assert(m_sharedKeys.Length >= m_sharedValues.Length);
97 //---------------------------------------------------------------------------------------
98 // Factory method to create a bunch of sort helpers that are all related. Once created,
99 // these helpers must all run concurrently with one another.
102 // partitions - the input data partitions to be sorted
103 // groupState - common state used for communication (e.g. cancellation)
106 // An array of helpers, one for each partition.
109 internal static SortHelper<TInputOutput, TKey>[] GenerateSortHelpers(
110 PartitionedStream<TInputOutput, TKey> partitions, QueryTaskGroupState groupState)
112 int degreeOfParallelism = partitions.PartitionCount;
113 SortHelper<TInputOutput, TKey>[] helpers = new SortHelper<TInputOutput, TKey>[degreeOfParallelism];
115 // Calculate the next highest power of two greater than or equal to the DOP.
116 // Also, calculate phaseCount = log2(degreeOfParallelismPow2)
117 int degreeOfParallelismPow2 = 1, phaseCount = 0;
118 while (degreeOfParallelismPow2 < degreeOfParallelism)
121 degreeOfParallelismPow2 <<= 1;
124 // Initialize shared objects used during sorting.
125 int[][] sharedIndices = new int[degreeOfParallelism][];
126 GrowingArray<TKey>[] sharedKeys = new GrowingArray<TKey>[degreeOfParallelism];
127 TInputOutput[][] sharedValues = new TInputOutput[degreeOfParallelism][];
128 Barrier[,] sharedBarriers = new Barrier[phaseCount, degreeOfParallelism];
130 if (degreeOfParallelism > 1)
132 // Initialize the barriers we need. Due to the logarithmic reduction, we don't
133 // need to populate the whole matrix.
135 for (int i = 0; i < sharedBarriers.GetLength(0); i++)
137 for (int j = 0; j < sharedBarriers.GetLength(1); j++)
139 // As the phases increase, the barriers required become more and more sparse.
140 if ((j % offset) == 0)
142 sharedBarriers[i, j] = new Barrier(2);
149 // Lastly populate the array of sort helpers.
150 for (int i = 0; i < degreeOfParallelism; i++)
152 helpers[i] = new SortHelper<TInputOutput, TKey>(
153 partitions[i], degreeOfParallelism, i,
154 groupState, sharedIndices,
155 partitions.OrdinalIndexState, partitions.KeyComparer,
156 sharedKeys, sharedValues, sharedBarriers);
162 //---------------------------------------------------------------------------------------
163 // Disposes of this sort helper's expensive state.
166 public void Dispose()
168 // We only dispose of the barriers when the 1st partition finishes. That's because
169 // all others depend on the shared barriers, so we can't get rid of them eagerly.
170 if (m_partitionIndex == 0)
172 for (int i = 0; i < m_sharedBarriers.GetLength(0); i++)
174 for (int j = 0; j < m_sharedBarriers.GetLength(1); j++)
176 Barrier b = m_sharedBarriers[i, j];
186 //---------------------------------------------------------------------------------------
187 // Sorts the data, possibly returning a result.
190 // This method makes some pretty fundamental assumptions about what concurrency
191 // exists in the system. Namely, it assumes all SortHelpers are running in
192 // parallel. If they aren't Sort will end up waiting for certain events that
193 // will never happen -- i.e. we will deadlock.
196 internal override TInputOutput[] Sort()
198 // Step 1. Accumulate this partitions' worth of input.
199 GrowingArray<TKey> sourceKeys = null;
200 List<TInputOutput> sourceValues = null;
202 BuildKeysFromSource(ref sourceKeys, ref sourceValues);
204 Contract.Assert(sourceValues != null, "values weren't populated");
205 Contract.Assert(sourceKeys != null, "keys weren't populated");
207 // Step 2. Locally sort this partition's key indices in-place.
208 QuickSortIndicesInPlace(sourceKeys, sourceValues, m_indexState);
210 // Step 3. Enter into the merging phases, each separated by several barriers.
211 if (m_partitionCount > 1)
213 // We only need to merge if there is more than 1 partition.
214 MergeSortCooperatively();
217 return m_sharedValues[m_partitionIndex];
220 //-----------------------------------------------------------------------------------
221 // Generates a list of values and keys from the data source. After calling this,
222 // the keys and values lists will be populated; each key at index i corresponds to
223 // the value at index i in the other list.
226 // Should only be called once per sort helper.
229 private void BuildKeysFromSource(ref GrowingArray<TKey> keys, ref List<TInputOutput> values)
231 values = new List<TInputOutput>();
233 // Enumerate the whole input set, generating a key set in the process.
234 CancellationToken cancelToken = m_groupState.CancellationState.MergedCancellationToken;
237 TInputOutput current = default(TInputOutput);
238 TKey currentKey = default(TKey);
239 bool hadNext = m_source.MoveNext(ref current, ref currentKey);
243 keys = new GrowingArray<TKey>();
251 if ((i++ & CancellationState.POLL_INTERVAL) == 0)
252 CancellationState.ThrowIfCanceled(cancelToken);
254 // Accumulate the keys and values so that we can sort them in a moment.
255 keys.Add(currentKey);
258 while (m_source.MoveNext(ref current, ref currentKey));
267 //-----------------------------------------------------------------------------------
268 // Produces a list of indices and sorts them in place using a local sort.
271 // Each element in the indices array is an index which refers to an element in
272 // the key/value array. After calling this routine, the indices will be ordered
273 // such that the keys they refere to are in ascending or descending order,
274 // according to the sort criteria used.
277 private void QuickSortIndicesInPlace(GrowingArray<TKey> keys, List<TInputOutput> values, OrdinalIndexState ordinalIndexState)
279 Contract.Assert(keys != null);
280 Contract.Assert(values != null);
281 Contract.Assert(keys.Count == values.Count);
283 // Generate a list of keys in forward order. We will sort them in a moment.
284 int[] indices = new int[values.Count];
285 for (int i = 0; i < indices.Length; i++)
290 // Now sort the indices in place.
291 if (indices.Length > 1
292 && ordinalIndexState.IsWorseThan(OrdinalIndexState.Increasing))
294 QuickSort(0, indices.Length - 1, keys.InternalArray, indices, m_groupState.CancellationState.MergedCancellationToken);
297 if (m_partitionCount == 1)
299 // If there is only one partition, we will produce the final value set now,
300 // since there will be no merge afterward (which is where we usually do this).
301 TInputOutput[] sortedValues = new TInputOutput[values.Count];
302 for (int i = 0; i < indices.Length; i++)
304 sortedValues[i] = values[indices[i]];
306 m_sharedValues[m_partitionIndex] = sortedValues;
310 // Otherwise, a merge will happen. Generate the shared data structures.
311 m_sharedIndices[m_partitionIndex] = indices;
312 m_sharedKeys[m_partitionIndex] = keys;
313 m_sharedValues[m_partitionIndex] = new TInputOutput[values.Count];
315 // Copy local structures to shared space.
316 values.CopyTo(m_sharedValues[m_partitionIndex]);
320 //-----------------------------------------------------------------------------------
321 // Works cooperatively with other concurrent sort helpers to produce a final sorted
322 // output list of data. Here is an overview of the algorithm used.
324 // During each phase, we must communicate with a partner task. As a simple
325 // illustration, imagine we have 8 partitions (P=8), numbered 0-7. There will be
326 // Log2(O)+2 phases (separated by barriers), where O is the next power of two greater
327 // than or equal to P, in the sort operation:
330 // phase=L: [0][1] [2][3] [4][5] [6][7]
331 // phase=0: [0,1] [2,3] [4,5] [6,7]
332 // phase=1: [0,2] [4,6]
336 // During phase L, each partition locally sorts its data. Then, at each subsequent
337 // phase in the logarithmic reduction, two partitions are paired together and cooperate
338 // to accomplish a portion of the merge. The left one then goes on to choose another
339 // partner, in the next phase, and the right one exits. And so on, until phase M, when
340 // there is just one partition left (the 0th), which is when it may publish the final
341 // output from the sort operation.
343 // Notice we mentioned rounding up to the next power of two when determining the number
344 // of phases. Values of P which aren't powers of 2 are slightly problematic, because
345 // they create a load imbalance in one of the partitions and heighten the depth of the
346 // logarithmic tree. As an illustration, imagine this case:
349 // phase=L: [0][1] [2][3] [4]
350 // phase=0: [0,1] [2,3] [4,X] [X,X]
351 // phase=1: [0,2] [4,X]
355 // Partition #4 in this example performs its local sort during phase L, but then has nothing
356 // to do during phases 0 and 2. (I.e. it has nobody to merge with.) Only during phase 2
357 // does it then resume work and help phase 2 perform its merge. This is modeled a bit like
358 // there were actually 8 partitions, which is the next power of two greater than or equal to
359 // 5. This example was chosen as an extreme case of imbalance. We stall a processor (the 5th)
360 // for two complete phases. If P = 6 or 7, the problem would not be nearly so bad, but if
361 // P = 9, the last partition would stall for yet another phase (and so on for every power of
362 // two boundary). We handle these, cases, but note that an overabundance of them will probably
363 // negatively impact speedups.
366 private void MergeSortCooperatively()
368 CancellationToken cancelToken = m_groupState.CancellationState.MergedCancellationToken;
370 int phaseCount = m_sharedBarriers.GetLength(0);
371 for (int phase = 0; phase < phaseCount; phase++)
373 bool isLastPhase = (phase == (phaseCount - 1));
375 // Calculate our partner for this phase and the next.
376 int partnerIndex = ComputePartnerIndex(phase);
378 // If we have a partner (see above for non power of 2 cases and why the index returned might
379 // be out of bounds), we will coordinate with the partner to produce the merged output.
380 if (partnerIndex < m_partitionCount)
382 // Cache references to our local data.
383 int[] myIndices = m_sharedIndices[m_partitionIndex];
384 GrowingArray<TKey> myKeys = m_sharedKeys[m_partitionIndex];
385 TKey[] myKeysArr = myKeys.InternalArray;
387 TInputOutput[] myValues = m_sharedValues[m_partitionIndex];
390 // First we must rendezvous with our merge partner so we know the previous sort
391 // and merge phase has been completed. By convention, we always use the left-most
392 // partner's barrier for this; all that matters is that both uses the same.
393 m_sharedBarriers[phase, Math.Min(m_partitionIndex, partnerIndex)].SignalAndWait(cancelToken);
395 // Grab the two sorted inputs and then merge them cooperatively into one list. One
396 // worker merges from left-to-right until it's placed elements up to the half-way
397 // point, and the other worker does the same, but only from right-to-left.
398 if (m_partitionIndex < partnerIndex)
400 // Before moving on to the actual merge, the left-most partition will allocate data
401 // to hold the merged indices and key/value pairs.
403 // First, remember a copy of all of the partner's lists.
404 int[] rightIndices = m_sharedIndices[partnerIndex];
405 TKey[] rightKeys = m_sharedKeys[partnerIndex].InternalArray;
406 TInputOutput[] rightValues = m_sharedValues[partnerIndex];
408 // We copy the our own items into the right's (overwriting its values) so that it can
409 // retrieve them after the barrier. This is an exchange operation.
410 m_sharedIndices[partnerIndex] = myIndices;
411 m_sharedKeys[partnerIndex] = myKeys;
412 m_sharedValues[partnerIndex] = myValues;
414 int leftCount = myValues.Length;
415 int rightCount = rightValues.Length;
416 int totalCount = leftCount + rightCount;
418 // Now allocate the lists into which the merged data will go. Share this
419 // with the other thread so that it can place data into it as well.
420 int[] mergedIndices = null;
421 TInputOutput[] mergedValues = new TInputOutput[totalCount];
423 // Only on the last phase do we need to remember indices and keys.
426 mergedIndices = new int[totalCount];
429 // Publish our newly allocated merged data structures.
430 m_sharedIndices[m_partitionIndex] = mergedIndices;
431 m_sharedKeys[m_partitionIndex] = myKeys;
432 m_sharedValues[m_partitionIndex] = mergedValues;
434 Contract.Assert(myKeysArr != null);
436 m_sharedBarriers[phase, m_partitionIndex].SignalAndWait(cancelToken);
438 // Merge the left half into the shared merged space. This is a normal merge sort with
439 // the caveat that we stop merging once we reach the half-way point (since our partner
440 // is doing the same for the right half). Note that during the last phase we only
441 // copy the values and not the indices or keys.
442 int m = (totalCount + 1)/2;
443 int i = 0, j0 = 0, j1 = 0;
446 if ((i & CancellationState.POLL_INTERVAL) == 0)
447 CancellationState.ThrowIfCanceled(cancelToken);
449 if (j0 < leftCount && (j1 >= rightCount ||
450 m_keyComparer.Compare(myKeysArr[myIndices[j0]],
451 rightKeys[rightIndices[j1]]) <= 0))
455 mergedValues[i] = myValues[myIndices[j0]];
459 mergedIndices[i] = myIndices[j0];
467 mergedValues[i] = rightValues[rightIndices[j1]];
471 mergedIndices[i] = leftCount + rightIndices[j1];
478 // If it's not the last phase, we just bulk propagate the keys and values.
479 if (!isLastPhase && leftCount > 0)
481 Array.Copy(myValues, 0, mergedValues, 0, leftCount);
484 // And now just wait for the second half. We never reuse the same barrier across multiple
485 // phases, so we can always dispose of it when we wake up.
486 m_sharedBarriers[phase, m_partitionIndex].SignalAndWait(cancelToken);
490 // Wait for the other partition to allocate the shared data.
491 m_sharedBarriers[phase, partnerIndex].SignalAndWait(cancelToken);
493 // After the barrier, the other partition will have made two things available to us:
494 // (1) its own indices, keys, and values, stored in the cell that used to hold our data,
495 // and (2) the arrays into which merged data will go, stored in its shared array cells.
496 // We will snag references to all of these things.
497 int[] leftIndices = m_sharedIndices[m_partitionIndex];
498 TKey[] leftKeys = m_sharedKeys[m_partitionIndex].InternalArray;
499 TInputOutput[] leftValues = m_sharedValues[m_partitionIndex];
500 int[] mergedIndices = m_sharedIndices[partnerIndex];
501 GrowingArray<TKey> mergedKeys = m_sharedKeys[partnerIndex];
502 TInputOutput[] mergedValues = m_sharedValues[partnerIndex];
504 Contract.Assert(leftValues != null);
505 Contract.Assert(leftKeys != null);
507 int leftCount = leftValues.Length;
508 int rightCount = myValues.Length;
509 int totalCount = leftCount + rightCount;
511 // Merge the right half into the shared merged space. This is a normal merge sort with
512 // the caveat that we stop merging once we reach the half-way point (since our partner
513 // is doing the same for the left half). Note that during the last phase we only
514 // copy the values and not the indices or keys.
515 int m = (totalCount + 1)/2;
516 int i = totalCount - 1, j0 = leftCount - 1, j1 = rightCount - 1;
519 if ((i & CancellationState.POLL_INTERVAL) == 0)
520 CancellationState.ThrowIfCanceled(cancelToken);
522 if (j0 >= 0 && (j1 < 0 ||
523 m_keyComparer.Compare(leftKeys[leftIndices[j0]],
524 myKeysArr[myIndices[j1]]) > 0))
528 mergedValues[i] = leftValues[leftIndices[j0]];
532 mergedIndices[i] = leftIndices[j0];
540 mergedValues[i] = myValues[myIndices[j1]];
544 mergedIndices[i] = leftCount + myIndices[j1];
551 // If it's not the last phase, we just bulk propagate the keys and values.
552 if (!isLastPhase && myValues.Length > 0)
554 mergedKeys.CopyFrom(myKeysArr, myValues.Length);
555 Array.Copy(myValues, 0, mergedValues, leftCount, myValues.Length);
558 // Wait for our partner to finish copying too.
559 m_sharedBarriers[phase, partnerIndex].SignalAndWait(cancelToken);
561 // Now the greater of the two partners can leave, it's done.
568 //---------------------------------------------------------------------------------------
569 // Computes our partner index given the logarithmic reduction algorithm specified above.
572 private int ComputePartnerIndex(int phase)
574 int offset = 1 << phase;
575 return m_partitionIndex + ((m_partitionIndex % (offset * 2)) == 0 ? offset : -offset);
578 //---------------------------------------------------------------------------------------
579 // Sort algorithm used to sort key/value lists. After this has been called, the indices
580 // will have been placed in sorted order based on the keys provided.
583 private void QuickSort(int left, int right, TKey[] keys, int[] indices, CancellationToken cancelToken)
585 Contract.Assert(keys != null, "need a non-null keyset");
586 Contract.Assert(keys.Length >= indices.Length);
587 Contract.Assert(left <= right);
588 Contract.Assert(0 <= left && left < keys.Length);
589 Contract.Assert(0 <= right && right < keys.Length);
591 // cancellation check.
592 // only test for intervals that are wider than so many items, else this test is
593 // relatively expensive compared to the work being performend.
594 if (right - left > CancellationState.POLL_INTERVAL)
595 CancellationState.ThrowIfCanceled(cancelToken);
601 int pivot = indices[i + ((j - i) >> 1)];
602 TKey pivotKey = keys[pivot];
606 while (m_keyComparer.Compare(keys[indices[i]], pivotKey) < 0) i++;
607 while (m_keyComparer.Compare(keys[indices[j]], pivotKey) > 0) j--;
609 Contract.Assert(i >= left && j <= right, "(i>=left && j<=right) sort failed - bogus IComparer?");
619 int tmp = indices[i];
620 indices[i] = indices[j];
629 if (j - left <= right - i)
633 QuickSort(left, j, keys, indices, cancelToken);
641 QuickSort(i, right, keys, indices, cancelToken);
646 while (left < right);