// Copyright (c) Microsoft Corporation. All rights reserved.
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// OrderedHashRepartitionEnumerator.cs
//
// <OWNER>[....]</OWNER>
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System.Collections.Generic;
using System.Threading;
using System.Diagnostics.Contracts;

namespace System.Linq.Parallel
{
    /// <summary>
    /// This enumerator handles the actual coordination among partitions required to
    /// accomplish the repartitioning operation, as explained above. In addition to that,
    /// it tracks order keys so that order preservation can flow through the enumerator.
    /// </summary>
    /// <typeparam name="TInputOutput">The kind of elements.</typeparam>
    /// <typeparam name="THashKey">The key used to distribute elements.</typeparam>
    /// <typeparam name="TOrderKey">The kind of keys found in the source.</typeparam>
28 internal class OrderedHashRepartitionEnumerator<TInputOutput, THashKey, TOrderKey> : QueryOperatorEnumerator<Pair<TInputOutput, THashKey>, TOrderKey>
30 private const int ENUMERATION_NOT_STARTED = -1; // Sentinel to note we haven't begun enumerating yet.
32 private readonly int m_partitionCount; // The number of partitions.
33 private readonly int m_partitionIndex; // Our unique partition index.
34 private readonly Func<TInputOutput, THashKey> m_keySelector; // A key-selector function.
35 private readonly HashRepartitionStream<TInputOutput, THashKey, TOrderKey> m_repartitionStream; // A repartitioning stream.
36 private readonly ListChunk<Pair<TInputOutput, THashKey>>[,] m_valueExchangeMatrix; // Matrix to do inter-task communication of values.
37 private readonly ListChunk<TOrderKey>[,] m_keyExchangeMatrix; // Matrix to do inter-task communication of order keys.
38 private readonly QueryOperatorEnumerator<TInputOutput, TOrderKey> m_source; // The immediate source of data.
39 private CountdownEvent m_barrier; // Used to signal and wait for repartitions to complete.
40 private readonly CancellationToken m_cancellationToken; // A token for canceling the process.
41 private Mutables m_mutables; // Mutable fields for this enumerator.
45 internal int m_currentBufferIndex; // Current buffer index.
46 internal ListChunk<Pair<TInputOutput, THashKey>> m_currentBuffer; // The buffer we're currently enumerating.
47 internal ListChunk<TOrderKey> m_currentKeyBuffer; // The buffer we're currently enumerating.
48 internal int m_currentIndex; // Current index into the buffer.
52 m_currentBufferIndex = ENUMERATION_NOT_STARTED;
56 //---------------------------------------------------------------------------------------
57 // Creates a new repartitioning enumerator.
60 // source - the data stream from which to pull elements
61 // useOrdinalOrderPreservation - whether order preservation is required
62 // partitionCount - total number of partitions
63 // partitionIndex - this operator's unique partition index
64 // repartitionStream - the stream object to use for partition selection
65 // barrier - a latch used to signal task completion
66 // buffers - a set of buffers for inter-task communication
69 internal OrderedHashRepartitionEnumerator(
70 QueryOperatorEnumerator<TInputOutput, TOrderKey> source, int partitionCount, int partitionIndex,
71 Func<TInputOutput, THashKey> keySelector, OrderedHashRepartitionStream<TInputOutput, THashKey, TOrderKey> repartitionStream, CountdownEvent barrier,
72 ListChunk<Pair<TInputOutput, THashKey>>[,] valueExchangeMatrix, ListChunk<TOrderKey>[,] keyExchangeMatrix, CancellationToken cancellationToken)
74 Contract.Assert(source != null);
75 Contract.Assert(keySelector != null || typeof(THashKey) == typeof(NoKeyMemoizationRequired));
76 Contract.Assert(repartitionStream != null);
77 Contract.Assert(barrier != null);
78 Contract.Assert(valueExchangeMatrix != null);
79 Contract.Assert(valueExchangeMatrix.GetLength(0) == partitionCount, "expected square matrix of buffers (NxN)");
80 Contract.Assert(valueExchangeMatrix.GetLength(1) == partitionCount, "expected square matrix of buffers (NxN)");
81 Contract.Assert(0 <= partitionIndex && partitionIndex < partitionCount);
84 m_partitionCount = partitionCount;
85 m_partitionIndex = partitionIndex;
86 m_keySelector = keySelector;
87 m_repartitionStream = repartitionStream;
89 m_valueExchangeMatrix = valueExchangeMatrix;
90 m_keyExchangeMatrix = keyExchangeMatrix;
91 m_cancellationToken = cancellationToken;
94 //---------------------------------------------------------------------------------------
95 // Retrieves the next element from this partition. All repartitioning operators across
96 // all partitions cooperate in a barrier-style algorithm. The first time an element is
97 // requested, the repartitioning operator will enter the 1st phase: during this phase, it
98 // scans its entire input and compute the destination partition for each element. During
99 // the 2nd phase, each partition scans the elements found by all other partitions for
100 // it, and yield this to callers. The only synchronization required is the barrier itself
101 // -- all other parts of this algorithm are synchronization-free.
103 // Notes: One rather large penalty that this algorithm incurs is higher memory usage and a
104 // larger time-to-first-element latency, at least compared with our old implementation; this
105 // happens because all input elements must be fetched before we can produce a single output
106 // element. In many cases this isn't too terrible: e.g. a GroupBy requires this to occur
107 // anyway, so having the repartitioning operator do so isn't complicating matters much at all.
110 internal override bool MoveNext(ref Pair<TInputOutput, THashKey> currentElement, ref TOrderKey currentKey)
112 if (m_partitionCount == 1)
114 TInputOutput current = default(TInputOutput);
116 // If there's only one partition, no need to do any sort of exchanges.
117 if (m_source.MoveNext(ref current, ref currentKey))
119 currentElement = new Pair<TInputOutput, THashKey>(
120 current, m_keySelector == null ? default(THashKey) : m_keySelector(current));
127 Mutables mutables = m_mutables;
128 if (mutables == null)
129 mutables = m_mutables = new Mutables();
131 // If we haven't enumerated the source yet, do that now. This is the first phase
132 // of a two-phase barrier style operation.
133 if (mutables.m_currentBufferIndex == ENUMERATION_NOT_STARTED)
135 EnumerateAndRedistributeElements();
136 Contract.Assert(mutables.m_currentBufferIndex != ENUMERATION_NOT_STARTED);
139 // Once we've enumerated our contents, we can then go back and walk the buffers that belong
140 // to the current partition. This is phase two. Note that we slyly move on to the first step
141 // of phase two before actually waiting for other partitions. That's because we can enumerate
142 // the buffer we wrote to above, as already noted.
143 while (mutables.m_currentBufferIndex < m_partitionCount)
145 // If the queue is non-null and still has elements, yield them.
146 if (mutables.m_currentBuffer != null)
148 Contract.Assert(mutables.m_currentKeyBuffer != null);
150 if (++mutables.m_currentIndex < mutables.m_currentBuffer.Count)
152 // Return the current element.
153 currentElement = mutables.m_currentBuffer.m_chunk[mutables.m_currentIndex];
154 Contract.Assert(mutables.m_currentKeyBuffer != null, "expected same # of buffers/key-buffers");
155 currentKey = mutables.m_currentKeyBuffer.m_chunk[mutables.m_currentIndex];
160 // If the chunk is empty, advance to the next one (if any).
161 mutables.m_currentIndex = ENUMERATION_NOT_STARTED;
162 mutables.m_currentBuffer = mutables.m_currentBuffer.Next;
163 mutables.m_currentKeyBuffer = mutables.m_currentKeyBuffer.Next;
164 Contract.Assert(mutables.m_currentBuffer == null || mutables.m_currentBuffer.Count > 0);
165 Contract.Assert((mutables.m_currentBuffer == null) == (mutables.m_currentKeyBuffer == null));
166 Contract.Assert(mutables.m_currentBuffer == null || mutables.m_currentBuffer.Count == mutables.m_currentKeyBuffer.Count);
167 continue; // Go back around and invoke this same logic.
171 // We're done with the current partition. Slightly different logic depending on whether
172 // we're on our own buffer or one that somebody else found for us.
173 if (mutables.m_currentBufferIndex == m_partitionIndex)
175 // We now need to wait at the barrier, in case some other threads aren't done.
176 // Once we wake up, we reset our index and will increment it immediately after.
177 m_barrier.Wait(m_cancellationToken);
178 mutables.m_currentBufferIndex = ENUMERATION_NOT_STARTED;
181 // Advance to the next buffer.
182 mutables.m_currentBufferIndex++;
183 mutables.m_currentIndex = ENUMERATION_NOT_STARTED;
185 if (mutables.m_currentBufferIndex == m_partitionIndex)
187 // Skip our current buffer (since we already enumerated it).
188 mutables.m_currentBufferIndex++;
191 // Assuming we're within bounds, retrieve the next buffer object.
192 if (mutables.m_currentBufferIndex < m_partitionCount)
194 mutables.m_currentBuffer = m_valueExchangeMatrix[mutables.m_currentBufferIndex, m_partitionIndex];
195 mutables.m_currentKeyBuffer = m_keyExchangeMatrix[mutables.m_currentBufferIndex, m_partitionIndex];
199 // We're done. No more buffers to enumerate.
203 //---------------------------------------------------------------------------------------
204 // Called when this enumerator is first enumerated; it must walk through the source
205 // and redistribute elements to their slot in the exchange matrix.
208 private void EnumerateAndRedistributeElements()
210 Mutables mutables = m_mutables;
211 Contract.Assert(mutables != null);
213 ListChunk<Pair<TInputOutput, THashKey>>[] privateBuffers = new ListChunk<Pair<TInputOutput, THashKey>>[m_partitionCount];
214 ListChunk<TOrderKey>[] privateKeyBuffers = new ListChunk<TOrderKey>[m_partitionCount];
216 TInputOutput element = default(TInputOutput);
217 TOrderKey key = default(TOrderKey);
219 while (m_source.MoveNext(ref element, ref key))
221 if ((loopCount++ & CancellationState.POLL_INTERVAL) == 0)
222 CancellationState.ThrowIfCanceled(m_cancellationToken);
224 // Calculate the element's destination partition index, placing it into the
225 // appropriate buffer from which partitions will later enumerate.
226 int destinationIndex;
227 THashKey elementHashKey = default(THashKey);
228 if (m_keySelector != null)
230 elementHashKey = m_keySelector(element);
231 destinationIndex = m_repartitionStream.GetHashCode(elementHashKey) % m_partitionCount;
235 Contract.Assert(typeof(THashKey) == typeof(NoKeyMemoizationRequired));
236 destinationIndex = m_repartitionStream.GetHashCode(element) % m_partitionCount;
239 Contract.Assert(0 <= destinationIndex && destinationIndex < m_partitionCount,
240 "destination partition outside of the legal range of partitions");
242 // Get the buffer for the destnation partition, lazily allocating if needed. We maintain
243 // this list in our own private cache so that we avoid accessing shared memory locations
244 // too much. In the original implementation, we'd access the buffer in the matrix ([N,M],
245 // where N is the current partition and M is the destination), but some rudimentary
246 // performance profiling indicates copying at the end performs better.
247 ListChunk<Pair<TInputOutput, THashKey>> buffer = privateBuffers[destinationIndex];
248 ListChunk<TOrderKey> keyBuffer = privateKeyBuffers[destinationIndex];
251 const int INITIAL_PRIVATE_BUFFER_SIZE = 128;
252 Contract.Assert(keyBuffer == null);
253 privateBuffers[destinationIndex] = buffer = new ListChunk<Pair<TInputOutput, THashKey>>(INITIAL_PRIVATE_BUFFER_SIZE);
254 privateKeyBuffers[destinationIndex] = keyBuffer = new ListChunk<TOrderKey>(INITIAL_PRIVATE_BUFFER_SIZE);
257 buffer.Add(new Pair<TInputOutput, THashKey>(element, elementHashKey));
262 // Copy the local buffers to the shared space and then signal to other threads that
263 // we are done. We can then immediately move on to enumerating the elements we found
264 // for the current partition before waiting at the barrier. If we found a lot, we will
265 // hopefully never have to physically wait.
266 for (int i = 0; i < m_partitionCount; i++)
268 m_valueExchangeMatrix[m_partitionIndex, i] = privateBuffers[i];
269 m_keyExchangeMatrix[m_partitionIndex, i] = privateKeyBuffers[i];
274 // Begin at our own buffer.
275 mutables.m_currentBufferIndex = m_partitionIndex;
276 mutables.m_currentBuffer = privateBuffers[m_partitionIndex];
277 mutables.m_currentKeyBuffer = privateKeyBuffers[m_partitionIndex];
278 mutables.m_currentIndex = ENUMERATION_NOT_STARTED;
281 protected override void Dispose(bool disposing)
283 if (m_barrier != null)
285 // Since this enumerator is being disposed, we will decrement the barrier,
286 // in case other enumerators will wait on the barrier.
287 if (m_mutables == null || (m_mutables.m_currentBufferIndex == ENUMERATION_NOT_STARTED))