// Copyright (c) Microsoft Corporation. All rights reserved.
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// HashPartitionedStream.cs
// <OWNER>[....]</OWNER>
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Threading;
namespace System.Linq.Parallel
{
    /// <summary>
    /// A repartitioning stream must take input data that has already been partitioned and
    /// redistribute its contents based on a new partitioning algorithm. This is accomplished
    /// by making each partition p responsible for redistributing its input data to the
    /// correct destination partition. Some input elements may remain in p, but many will now
    /// belong to a different partition and will need to move. This requires a great deal of
    /// synchronization, but allows threads to repartition data incrementally and in parallel.
    /// Each partition will "pull" data on-demand instead of partitions "pushing" data, which
    /// allows us to reduce some amount of synchronization overhead.
    ///
    /// We currently only offer one form of repartitioning via hashing. This used to be an
    /// abstract base class, but we have eliminated that to get rid of some virtual calls on
    /// hot code paths. Uses a key selection algorithm with mod'ding to determine destination.
    /// </summary>
    /// <typeparam name="TInputOutput">
    /// The element type; the first component of the <c>Pair</c> carried by this stream.
    /// </typeparam>
    /// <typeparam name="THashKey">
    /// The hash key type; the second component of the <c>Pair</c> carried by this stream.
    /// Expected to be <c>NoKeyMemoizationRequired</c> when an element comparer is supplied.
    /// </typeparam>
    /// <typeparam name="TOrderKey">The order key type, passed through to the base stream.</typeparam>
    internal abstract class HashRepartitionStream<TInputOutput, THashKey, TOrderKey> : PartitionedStream<Pair<TInputOutput, THashKey>, TOrderKey>
    {
        private readonly IEqualityComparer<THashKey> m_keyComparer; // The optional key comparison routine.
        private readonly IEqualityComparer<TInputOutput> m_elementComparer; // The optional element comparison routine.
        private readonly int m_distributionMod; // The distribution value we'll use to scramble input.

        //---------------------------------------------------------------------------------------
        // Creates a new partition exchange operator.
        //
        // Arguments:
        //     partitionsCount  - the number of partitions in this stream
        //     orderKeyComparer - comparer for order keys, forwarded to the base stream
        //     hashKeyComparer  - optional comparer used to hash keys (may be null)
        //     elementComparer  - optional comparer used to hash elements (may be null)
        //
        // At most one of hashKeyComparer and elementComparer is expected to be non-null.
        //

        internal HashRepartitionStream(
            int partitionsCount, IComparer<TOrderKey> orderKeyComparer, IEqualityComparer<THashKey> hashKeyComparer,
            IEqualityComparer<TInputOutput> elementComparer)
            : base(partitionsCount, orderKeyComparer, OrdinalIndexState.Shuffled)
        {
            // elementComparer is used by operators that use elements themselves as the hash keys.
            // When elements are used as keys, THashKey should be NoKeyMemoizationRequired.
            m_keyComparer = hashKeyComparer;
            m_elementComparer = elementComparer;

            Contract.Assert(m_keyComparer == null || m_elementComparer == null);
            Contract.Assert(m_elementComparer == null || typeof(THashKey) == typeof(NoKeyMemoizationRequired));

            // We use the following constant when distributing hash-codes into partition streams.
            // It's an (arbitrary) prime number to account for poor hashing functions, e.g. those
            // that all the primitive types use (e.g. Int32 returns itself). The goal is to add some
            // degree of randomization to avoid predictable distributions for certain data sequences,
            // for the same reason prime numbers are frequently used in hashtable implementations.
            // For instance, if all hash-codes end up as even, we would starve half of the partitions
            // by just using the raw hash-code. This isn't perfect, of course, since a stream
            // of integers with the same value end up in the same partition, but helps.
            const int DEFAULT_HASH_MOD_DISTRIBUTION = 503;

            // We need to guarantee our distribution mod is at least the number of partitions.
            m_distributionMod = DEFAULT_HASH_MOD_DISTRIBUTION;
            while (m_distributionMod < partitionsCount)
            {
                // We use checked arithmetic here. We'll only overflow for huge numbers of partitions
                // (quite unlikely), so the remote possibility of an exception is fine.
                checked
                {
                    m_distributionMod *= 2;
                }
            }
        }

        //---------------------------------------------------------------------------------------
        // Manufactures a hash code for a given value or key.
        //
        // Both overloads clear the sign bit of the raw hash code before taking the remainder,
        // so the result is always in the range [0, m_distributionMod).
        //

        // The hash-code used for null elements.
        private const int NULL_ELEMENT_HASH_CODE = 0;

        /// <summary>
        /// Computes the distribution hash for an element. Uses the element comparer when one
        /// was supplied; otherwise falls back to the element's own hash code, with null
        /// elements hashing to <c>NULL_ELEMENT_HASH_CODE</c>.
        /// </summary>
        /// <param name="element">The element to hash; may be null.</param>
        /// <returns>A non-negative value strictly less than the distribution mod.</returns>
        internal int GetHashCode(TInputOutput element)
        {
            return
                (0x7fffffff &
                    (m_elementComparer == null ?
                        (element == null ? NULL_ELEMENT_HASH_CODE : element.GetHashCode()) :
                        m_elementComparer.GetHashCode(element)))
                            % m_distributionMod;
        }

        /// <summary>
        /// Computes the distribution hash for a key. Uses the key comparer when one was
        /// supplied; otherwise falls back to the key's own hash code, with null keys
        /// hashing to <c>NULL_ELEMENT_HASH_CODE</c>.
        /// </summary>
        /// <param name="key">The key to hash; may be null.</param>
        /// <returns>A non-negative value strictly less than the distribution mod.</returns>
        internal int GetHashCode(THashKey key)
        {
            return
                (0x7fffffff &
                    (m_keyComparer == null ?
                        (key == null ? NULL_ELEMENT_HASH_CODE : key.GetHashCode()) :
                        m_keyComparer.GetHashCode(key))) % m_distributionMod;
        }
    }
}