Updates referencesource to .NET 4.7
[mono.git] / mcs / class / referencesource / System.Core / System / Linq / Parallel / Utils / ExchangeUtilities.cs
1 // ==++==
2 //
3 //   Copyright (c) Microsoft Corporation.  All rights reserved.
4 // 
5 // ==--==
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
7 //
8 // ExchangeUtilities.cs
9 //
10 // <OWNER>Microsoft</OWNER>
11 //
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
13
14 using System.Collections.Generic;
15 using System.Diagnostics.Contracts;
16 using System.Threading;
17 #if SILVERLIGHT
18 using System.Core; // for System.Core.SR
19 #endif
20
namespace System.Linq.Parallel
{
    /// <summary>
    /// Static helpers that build partitioned streams over a data source, hash-repartition
    /// existing partitioned streams, and compare <see cref="OrdinalIndexState"/> values.
    /// </summary>
    internal static class ExchangeUtilities
    {
        //-----------------------------------------------------------------------------------
        // Builds a partitioned stream over a data source.
        //
        // Arguments:
        //    source         - the data source to be partitioned
        //    partitionCount - the number of partitions desired
        //    useStriping    - whether striped partitioning should be used instead of range
        //                     partitioning; it is only forwarded to PartitionedDataSource, so it
        //                     is not consulted when the source supplies its own partitions
        //
        // Returns:
        //    A partitioned stream; it is asserted to have exactly partitionCount partitions.
        //
        // Throws:
        //    InvalidOperationException when a source implementing IParallelPartitionable<T> hands
        //    back a null array, an array of the wrong length, or an array with a null element.
        //

        internal static PartitionedStream<T, int> PartitionDataSource<T>(IEnumerable<T> source, int partitionCount, bool useStriping)
        {
            PartitionedStream<T, int> partitioned;

            IParallelPartitionable<T> customPartitioner = source as IParallelPartitionable<T>;
            if (customPartitioner == null)
            {
                // No custom partitioning available: fall back to the default partitioned data source.
                partitioned = new PartitionedDataSource<T>(source, partitionCount, useStriping);
            }
            else
            {
                // The source type overrides the partitioning algorithm, so its partitions win.
                partitioned = WrapCustomPartitions(customPartitioner, partitionCount);
            }

            Contract.Assert(partitioned.PartitionCount == partitionCount);

            return partitioned;
        }

        //-----------------------------------------------------------------------------------
        // Asks a self-partitioning source for its partitions, validates them, and copies them
        // into a new int-keyed stream marked OrdinalIndexState.Correct.
        //
        // Arguments:
        //    partitionable  - the source that provides its own partitions
        //    partitionCount - the number of partitions that must be returned
        //

        private static PartitionedStream<T, int> WrapCustomPartitions<T>(IParallelPartitionable<T> partitionable, int partitionCount)
        {
            QueryOperatorEnumerator<T, int>[] partitions = partitionable.GetPartitions(partitionCount);

            // The returned array must exist and be exactly the size we requested.
            if (partitions == null)
            {
                throw new InvalidOperationException(SR.GetString(SR.ParallelPartitionable_NullReturn));
            }

            if (partitions.Length != partitionCount)
            {
                throw new InvalidOperationException(SR.GetString(SR.ParallelPartitionable_IncorretElementCount));
            }

            PartitionedStream<T, int> wrapped =
                new PartitionedStream<T, int>(partitionCount, Util.GetDefaultComparer<int>(), OrdinalIndexState.Correct);

            // Copy each enumerator into its slot, rejecting null entries as we go.
            int slot = 0;
            foreach (QueryOperatorEnumerator<T, int> partition in partitions)
            {
                if (partition == null)
                {
                    throw new InvalidOperationException(SR.GetString(SR.ParallelPartitionable_NullElement));
                }

                wrapped[slot] = partition;
                slot++;
            }

            return wrapped;
        }

        //-----------------------------------------------------------------------------------
        // Converts a partitioned stream into a hash-partitioned stream. In the resulting
        // partitioning, all elements with the same hash code are guaranteed to be in the same
        // partition. The source's own key type (TIgnoreKey) does not appear in the result,
        // which is keyed by int.
        //
        // Arguments:
        //    source            - the partitioned stream to be hash-partitioned
        //    keySelector       - function to obtain the key given an element
        //    keyComparer       - equality comparer for the keys
        //    elementComparer   - equality comparer for the elements
        //    cancellationToken - cancellation token handed to the repartitioned stream
        //

        internal static PartitionedStream<Pair<TElement, THashKey>, int> HashRepartition<TElement, THashKey, TIgnoreKey>(
            PartitionedStream<TElement, TIgnoreKey> source, Func<TElement, THashKey> keySelector, IEqualityComparer<THashKey> keyComparer,
            IEqualityComparer<TElement> elementComparer, CancellationToken cancellationToken)
        {
            TraceHelpers.TraceInfo("PartitionStream<..>.HashRepartitionStream(..):: creating **RE**partitioned stream for nested operator");

            UnorderedHashRepartitionStream<TElement, THashKey, TIgnoreKey> repartitioned =
                new UnorderedHashRepartitionStream<TElement, THashKey, TIgnoreKey>(
                    source, keySelector, keyComparer, elementComparer, cancellationToken);
            return repartitioned;
        }

        //-----------------------------------------------------------------------------------
        // Same as HashRepartition, except that the resulting stream keeps the source's order
        // key type (TOrderKey) as its key type.
        //
        // Arguments:
        //    source            - the partitioned stream to be hash-partitioned
        //    keySelector       - function to obtain the key given an element
        //    keyComparer       - equality comparer for the keys
        //    elementComparer   - equality comparer for the elements
        //    cancellationToken - cancellation token handed to the repartitioned stream
        //

        internal static PartitionedStream<Pair<TElement, THashKey>, TOrderKey> HashRepartitionOrdered<TElement, THashKey, TOrderKey>(
            PartitionedStream<TElement, TOrderKey> source, Func<TElement, THashKey> keySelector, IEqualityComparer<THashKey> keyComparer,
            IEqualityComparer<TElement> elementComparer, CancellationToken cancellationToken)
        {
            TraceHelpers.TraceInfo("PartitionStream<..>.HashRepartitionStream(..):: creating **RE**partitioned stream for nested operator");

            OrderedHashRepartitionStream<TElement, THashKey, TOrderKey> repartitioned =
                new OrderedHashRepartitionStream<TElement, THashKey, TOrderKey>(
                    source, keySelector, keyComparer, elementComparer, cancellationToken);
            return repartitioned;
        }

        //---------------------------------------------------------------------------------------
        // Given two OrdinalIndexState values, returns the "worse" one, i.e. the one that compares
        // greater. For example, combining a Correct state with an Increasing state is expected to
        // yield OrdinalIndexState.Increasing (the enum is declared elsewhere).
        //

        internal static OrdinalIndexState Worse(this OrdinalIndexState state1, OrdinalIndexState state2)
        {
            if (state1 > state2)
            {
                return state1;
            }

            return state2;
        }

        //---------------------------------------------------------------------------------------
        // Returns true when state1 is strictly worse (compares greater) than state2.
        //

        internal static bool IsWorseThan(this OrdinalIndexState state1, OrdinalIndexState state2)
        {
            return state2 < state1;
        }
    }

    /// <summary>
    /// Placeholder key type used during hash partitioning when the memoized keys are not
    /// used for anything.
    /// </summary>
    internal struct NoKeyMemoizationRequired
    {
    }
}