3 // Copyright (c) Microsoft Corporation. All rights reserved.
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
8 // ExchangeUtilities.cs
10 // <OWNER>Microsoft</OWNER>
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
14 using System.Collections.Generic;
15 using System.Diagnostics.Contracts;
16 using System.Threading;
18 using System.Core; // for System.Core.SR
21 namespace System.Linq.Parallel
/// <summary>
/// ExchangeUtilities is a static class that contains helper functions for building
/// partitioned streams over data sources, hash-repartitioning existing partitioned
/// streams, and comparing <see cref="OrdinalIndexState"/> values.
/// </summary>
internal static class ExchangeUtilities
{
    //-----------------------------------------------------------------------------------
    // A factory method to construct a partitioned stream over a data source.
    //
    // Arguments:
    //    source         - the data source to be partitioned
    //    partitionCount - the number of partitions desired
    //    useStriping    - whether striped partitioning should be used instead of range
    //                     partitioning. It is only forwarded to PartitionedDataSource<T>,
    //                     so it has no effect when the source supplies its own partitions.
    //
    // Return Value:
    //    A partitioned stream with exactly partitionCount partitions. If the source
    //    implements IParallelPartitionable<T>, the stream wraps the enumerators the source
    //    hands back; otherwise a PartitionedDataSource<T> is created.
    //
    // Exceptions:
    //    InvalidOperationException - the source implements IParallelPartitionable<T> but
    //    GetPartitions returned null, an array of the wrong length, or a null element.
    //

    internal static PartitionedStream<T, int> PartitionDataSource<T>(IEnumerable<T> source, int partitionCount, bool useStriping)
    {
        // The partitioned stream to return.
        PartitionedStream<T, int> returnValue;

        IParallelPartitionable<T> sourceAsPartitionable = source as IParallelPartitionable<T>;
        if (sourceAsPartitionable != null)
        {
            // The type overrides the partitioning algorithm, so we will use it instead of the default.
            // The returned array must be the same size that we requested, otherwise we throw.
            QueryOperatorEnumerator<T, int>[] enumerators = sourceAsPartitionable.GetPartitions(partitionCount);
            if (enumerators == null)
            {
                throw new InvalidOperationException(SR.GetString(SR.ParallelPartitionable_NullReturn));
            }

            if (enumerators.Length != partitionCount)
            {
                // The resource identifier is misspelled ("Incorret") where it is declared; it must match.
                throw new InvalidOperationException(SR.GetString(SR.ParallelPartitionable_IncorretElementCount));
            }

            // Now just copy the enumerators into the stream, validating that each one is non-null.
            PartitionedStream<T, int> stream =
                new PartitionedStream<T, int>(partitionCount, Util.GetDefaultComparer<int>(), OrdinalIndexState.Correct);
            for (int i = 0; i < partitionCount; i++)
            {
                QueryOperatorEnumerator<T, int> currentEnumerator = enumerators[i];
                if (currentEnumerator == null)
                {
                    throw new InvalidOperationException(SR.GetString(SR.ParallelPartitionable_NullElement));
                }

                stream[i] = currentEnumerator;
            }

            // Hand back the custom partitions; without this assignment the validated stream
            // would be discarded.
            returnValue = stream;
        }
        else
        {
            // No custom partitioning available: fall back to the default partitioned data source.
            returnValue = new PartitionedDataSource<T>(source, partitionCount, useStriping);
        }

        Contract.Assert(returnValue.PartitionCount == partitionCount);

        return returnValue;
    }

    //-----------------------------------------------------------------------------------
    // Converts a partitioned stream into a hash-partitioned stream. In the resulting
    // partitioning, all elements with the same hash code are guaranteed to be in the same
    // partition. The order keys of the incoming stream (TIgnoreKey) are not carried over;
    // the resulting stream is keyed by int.
    //
    // Arguments:
    //    source            - the partitioned stream to be hash-repartitioned
    //    keySelector       - function to obtain the key given an element
    //    keyComparer       - equality comparer for the keys
    //    elementComparer   - equality comparer for the elements; passed straight through to
    //                        the repartitioning stream (how it is used is not visible here)
    //    cancellationToken - cancellation token passed straight through to the
    //                        repartitioning stream
    //
    // Return Value:
    //    An UnorderedHashRepartitionStream over the source, exposed as a partitioned stream
    //    of (element, hash key) pairs.
    //

    internal static PartitionedStream<Pair<TElement, THashKey>, int> HashRepartition<TElement, THashKey, TIgnoreKey>(
        PartitionedStream<TElement, TIgnoreKey> source, Func<TElement, THashKey> keySelector, IEqualityComparer<THashKey> keyComparer,
        IEqualityComparer<TElement> elementComparer, CancellationToken cancellationToken)
    {
        TraceHelpers.TraceInfo("PartitionStream<..>.HashRepartitionStream(..):: creating **RE**partitioned stream for nested operator");

        PartitionedStream<Pair<TElement, THashKey>, int> repartitioned =
            new UnorderedHashRepartitionStream<TElement, THashKey, TIgnoreKey>(source, keySelector, keyComparer, elementComparer, cancellationToken);
        return repartitioned;
    }

    //-----------------------------------------------------------------------------------
    // Same as HashRepartition, except that the order keys of the incoming stream
    // (TOrderKey) are kept as the keys of the resulting stream.
    //
    // Arguments:
    //    source            - the partitioned stream to be hash-repartitioned
    //    keySelector       - function to obtain the key given an element
    //    keyComparer       - equality comparer for the keys
    //    elementComparer   - equality comparer for the elements; passed straight through to
    //                        the repartitioning stream (how it is used is not visible here)
    //    cancellationToken - cancellation token passed straight through to the
    //                        repartitioning stream
    //
    // Return Value:
    //    An OrderedHashRepartitionStream over the source, exposed as a partitioned stream
    //    of (element, hash key) pairs.
    //

    internal static PartitionedStream<Pair<TElement, THashKey>, TOrderKey> HashRepartitionOrdered<TElement, THashKey, TOrderKey>(
        PartitionedStream<TElement, TOrderKey> source, Func<TElement, THashKey> keySelector, IEqualityComparer<THashKey> keyComparer,
        IEqualityComparer<TElement> elementComparer, CancellationToken cancellationToken)
    {
        TraceHelpers.TraceInfo("PartitionStream<..>.HashRepartitionStream(..):: creating **RE**partitioned stream for nested operator");

        PartitionedStream<Pair<TElement, THashKey>, TOrderKey> repartitioned =
            new OrderedHashRepartitionStream<TElement, THashKey, TOrderKey>(source, keySelector, keyComparer, elementComparer, cancellationToken);
        return repartitioned;
    }

    //---------------------------------------------------------------------------------------
    // A helper method that, given two OrdinalIndexState values, returns the "worse" one. For
    // example, if state1 is valid and state2 is increasing, we will return
    // OrdinalIndexState.Increasing.
    //
    // The comparison is numeric: the larger enum value is treated as the worse state, and
    // state2 is returned when the two are equal.
    //

    internal static OrdinalIndexState Worse(this OrdinalIndexState state1, OrdinalIndexState state2)
    {
        if (state1 > state2)
        {
            return state1;
        }

        return state2;
    }

    //---------------------------------------------------------------------------------------
    // Returns true when state1 is strictly "worse" than state2, i.e. when its enum value is
    // numerically larger. Equal states are not considered worse than each other.
    //

    internal static bool IsWorseThan(this OrdinalIndexState state1, OrdinalIndexState state2)
    {
        return state2 < state1;
    }
}
/// <summary>
/// An empty struct used during hash partitioning, when the keys being memoized are not
/// used for anything.
/// </summary>
internal struct NoKeyMemoizationRequired
{
}