35865feb93635dff2ec3f7f8690040e3132c4aeb
[mono.git] / mcs / class / referencesource / System.Core / System / Linq / Parallel / Partitioning / OrderedHashRepartitionEnumerator.cs
1 // ==++==
2 //
3 //   Copyright (c) Microsoft Corporation.  All rights reserved.
4 // 
5 // ==--==
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
7 //
8 // OrderedHashRepartitionEnumerator.cs
9 //
10 // <OWNER>Microsoft</OWNER>
11 //
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
13
14 using System.Collections.Generic;
15 using System.Threading;
16 using System.Diagnostics.Contracts;
17
18 namespace System.Linq.Parallel
19 {
20     /// <summary>
21     /// This enumerator handles the actual coordination among partitions required to
22     /// accomplish the repartitioning operation, as explained above.  In addition to that,
23     /// it tracks order keys so that order preservation can flow through the enumerator.
24     /// </summary>
25     /// <typeparam name="TInputOutput">The kind of elements.</typeparam>
26     /// <typeparam name="THashKey">The key used to distribute elements.</typeparam>
27     /// <typeparam name="TOrderKey">The kind of keys found in the source.</typeparam>
28     internal class OrderedHashRepartitionEnumerator<TInputOutput, THashKey, TOrderKey> : QueryOperatorEnumerator<Pair<TInputOutput, THashKey>, TOrderKey>
29     {
30         private const int ENUMERATION_NOT_STARTED = -1; // Sentinel to note we haven't begun enumerating yet.
31
32         private readonly int m_partitionCount; // The number of partitions.
33         private readonly int m_partitionIndex; // Our unique partition index.
34         private readonly Func<TInputOutput, THashKey> m_keySelector; // A key-selector function.
35         private readonly HashRepartitionStream<TInputOutput, THashKey, TOrderKey> m_repartitionStream; // A repartitioning stream.
36         private readonly ListChunk<Pair<TInputOutput, THashKey>>[,] m_valueExchangeMatrix; // Matrix to do inter-task communication of values.
37         private readonly ListChunk<TOrderKey>[,] m_keyExchangeMatrix; // Matrix to do inter-task communication of order keys.
38         private readonly QueryOperatorEnumerator<TInputOutput, TOrderKey> m_source; // The immediate source of data.
39         private CountdownEvent m_barrier; // Used to signal and wait for repartitions to complete.
40         private readonly CancellationToken m_cancellationToken; // A token for canceling the process.
41         private Mutables m_mutables; // Mutable fields for this enumerator.
42
43         class Mutables
44         {
45             internal int m_currentBufferIndex; // Current buffer index.
46             internal ListChunk<Pair<TInputOutput, THashKey>> m_currentBuffer; // The buffer we're currently enumerating.
47             internal ListChunk<TOrderKey> m_currentKeyBuffer; // The buffer we're currently enumerating.
48             internal int m_currentIndex; // Current index into the buffer.
49
50             internal Mutables()
51             {
52                 m_currentBufferIndex = ENUMERATION_NOT_STARTED;
53             }
54         }
55
56         //---------------------------------------------------------------------------------------
57         // Creates a new repartitioning enumerator.
58         //
59         // Arguments:
60         //     source            - the data stream from which to pull elements
61         //     useOrdinalOrderPreservation - whether order preservation is required
62         //     partitionCount    - total number of partitions
63         //     partitionIndex    - this operator's unique partition index
64         //     repartitionStream - the stream object to use for partition selection
65         //     barrier           - a latch used to signal task completion
66         //     buffers           - a set of buffers for inter-task communication
67         //
68
69         internal OrderedHashRepartitionEnumerator(
70             QueryOperatorEnumerator<TInputOutput, TOrderKey> source, int partitionCount, int partitionIndex,
71             Func<TInputOutput, THashKey> keySelector, OrderedHashRepartitionStream<TInputOutput, THashKey, TOrderKey> repartitionStream, CountdownEvent barrier,
72             ListChunk<Pair<TInputOutput, THashKey>>[,] valueExchangeMatrix, ListChunk<TOrderKey>[,] keyExchangeMatrix, CancellationToken cancellationToken)
73         {
74             Contract.Assert(source != null);
75             Contract.Assert(keySelector != null || typeof(THashKey) == typeof(NoKeyMemoizationRequired));
76             Contract.Assert(repartitionStream != null);
77             Contract.Assert(barrier != null);
78             Contract.Assert(valueExchangeMatrix != null);
79             Contract.Assert(valueExchangeMatrix.GetLength(0) == partitionCount, "expected square matrix of buffers (NxN)");
80             Contract.Assert(valueExchangeMatrix.GetLength(1) == partitionCount, "expected square matrix of buffers (NxN)");
81             Contract.Assert(0 <= partitionIndex && partitionIndex < partitionCount);
82
83             m_source = source;
84             m_partitionCount = partitionCount;
85             m_partitionIndex = partitionIndex;
86             m_keySelector = keySelector;
87             m_repartitionStream = repartitionStream;
88             m_barrier = barrier;
89             m_valueExchangeMatrix = valueExchangeMatrix;
90             m_keyExchangeMatrix = keyExchangeMatrix;
91             m_cancellationToken = cancellationToken;
92         }
93
94         //---------------------------------------------------------------------------------------
95         // Retrieves the next element from this partition.  All repartitioning operators across
96         // all partitions cooperate in a barrier-style algorithm.  The first time an element is
97         // requested, the repartitioning operator will enter the 1st phase: during this phase, it
98         // scans its entire input and compute the destination partition for each element.  During
99         // the 2nd phase, each partition scans the elements found by all other partitions for
100         // it, and yield this to callers.  The only synchronization required is the barrier itself
101         // -- all other parts of this algorithm are synchronization-free.
102         //
103         // Notes: One rather large penalty that this algorithm incurs is higher memory usage and a
104         // larger time-to-first-element latency, at least compared with our old implementation; this
105         // happens because all input elements must be fetched before we can produce a single output
106         // element.  In many cases this isn't too terrible: e.g. a GroupBy requires this to occur
107         // anyway, so having the repartitioning operator do so isn't complicating matters much at all.
108         //
109
110         internal override bool MoveNext(ref Pair<TInputOutput, THashKey> currentElement, ref TOrderKey currentKey)
111         {
112             if (m_partitionCount == 1)
113             {
114                 TInputOutput current = default(TInputOutput);
115
116                 // If there's only one partition, no need to do any sort of exchanges.
117                 if (m_source.MoveNext(ref current, ref currentKey))
118                 {
119                     currentElement = new Pair<TInputOutput, THashKey>(
120                         current, m_keySelector == null ? default(THashKey) : m_keySelector(current));
121                     return true;
122                 }
123
124                 return false;
125             }
126
127             Mutables mutables = m_mutables;
128             if (mutables == null)
129                 mutables = m_mutables = new Mutables();
130
131             // If we haven't enumerated the source yet, do that now.  This is the first phase
132             // of a two-phase barrier style operation.
133             if (mutables.m_currentBufferIndex == ENUMERATION_NOT_STARTED)
134             {
135                 EnumerateAndRedistributeElements();
136                 Contract.Assert(mutables.m_currentBufferIndex != ENUMERATION_NOT_STARTED);
137             }
138
139             // Once we've enumerated our contents, we can then go back and walk the buffers that belong
140             // to the current partition.  This is phase two.  Note that we slyly move on to the first step
141             // of phase two before actually waiting for other partitions.  That's because we can enumerate
142             // the buffer we wrote to above, as already noted.
143             while (mutables.m_currentBufferIndex < m_partitionCount)
144             {
145                 // If the queue is non-null and still has elements, yield them.
146                 if (mutables.m_currentBuffer != null)
147                 {
148                     Contract.Assert(mutables.m_currentKeyBuffer != null);
149
150                     if (++mutables.m_currentIndex < mutables.m_currentBuffer.Count)
151                     {
152                         // Return the current element.
153                         currentElement = mutables.m_currentBuffer.m_chunk[mutables.m_currentIndex];
154                         Contract.Assert(mutables.m_currentKeyBuffer != null, "expected same # of buffers/key-buffers");
155                         currentKey = mutables.m_currentKeyBuffer.m_chunk[mutables.m_currentIndex];
156                         return true;
157                     }
158                     else
159                     {
160                         // If the chunk is empty, advance to the next one (if any).
161                         mutables.m_currentIndex = ENUMERATION_NOT_STARTED;
162                         mutables.m_currentBuffer = mutables.m_currentBuffer.Next;
163                         mutables.m_currentKeyBuffer = mutables.m_currentKeyBuffer.Next;
164                         Contract.Assert(mutables.m_currentBuffer == null || mutables.m_currentBuffer.Count > 0);
165                         Contract.Assert((mutables.m_currentBuffer == null) == (mutables.m_currentKeyBuffer == null));
166                         Contract.Assert(mutables.m_currentBuffer == null || mutables.m_currentBuffer.Count == mutables.m_currentKeyBuffer.Count);
167                         continue; // Go back around and invoke this same logic.
168                     }
169                 }
170
171                 // We're done with the current partition.  Slightly different logic depending on whether
172                 // we're on our own buffer or one that somebody else found for us.
173                 if (mutables.m_currentBufferIndex == m_partitionIndex)
174                 {
175                     // We now need to wait at the barrier, in case some other threads aren't done.
176                     // Once we wake up, we reset our index and will increment it immediately after.
177                     m_barrier.Wait(m_cancellationToken);
178                     mutables.m_currentBufferIndex = ENUMERATION_NOT_STARTED;
179                 }
180
181                 // Advance to the next buffer.
182                 mutables.m_currentBufferIndex++;
183                 mutables.m_currentIndex = ENUMERATION_NOT_STARTED;
184
185                 if (mutables.m_currentBufferIndex == m_partitionIndex)
186                 {
187                     // Skip our current buffer (since we already enumerated it).
188                     mutables.m_currentBufferIndex++;
189                 }
190
191                 // Assuming we're within bounds, retrieve the next buffer object.
192                 if (mutables.m_currentBufferIndex < m_partitionCount)
193                 {
194                     mutables.m_currentBuffer = m_valueExchangeMatrix[mutables.m_currentBufferIndex, m_partitionIndex];
195                     mutables.m_currentKeyBuffer = m_keyExchangeMatrix[mutables.m_currentBufferIndex, m_partitionIndex];
196                 }
197             }
198
199             // We're done. No more buffers to enumerate.
200             return false;
201         }
202
203         //---------------------------------------------------------------------------------------
204         // Called when this enumerator is first enumerated; it must walk through the source
205         // and redistribute elements to their slot in the exchange matrix.
206         //
207
208         private void EnumerateAndRedistributeElements()
209         {
210             Mutables mutables = m_mutables;
211             Contract.Assert(mutables != null);
212
213             ListChunk<Pair<TInputOutput, THashKey>>[] privateBuffers = new ListChunk<Pair<TInputOutput, THashKey>>[m_partitionCount];
214             ListChunk<TOrderKey>[] privateKeyBuffers = new ListChunk<TOrderKey>[m_partitionCount];
215
216             TInputOutput element = default(TInputOutput);
217             TOrderKey key = default(TOrderKey);
218             int loopCount = 0;
219             while (m_source.MoveNext(ref element, ref key))
220             {
221                 if ((loopCount++ & CancellationState.POLL_INTERVAL) == 0)
222                     CancellationState.ThrowIfCanceled(m_cancellationToken);
223
224                 // Calculate the element's destination partition index, placing it into the
225                 // appropriate buffer from which partitions will later enumerate.
226                 int destinationIndex;
227                 THashKey elementHashKey = default(THashKey);
228                 if (m_keySelector != null)
229                 {
230                     elementHashKey = m_keySelector(element);
231                     destinationIndex = m_repartitionStream.GetHashCode(elementHashKey) % m_partitionCount;
232                 }
233                 else
234                 {
235                     Contract.Assert(typeof(THashKey) == typeof(NoKeyMemoizationRequired));
236                     destinationIndex = m_repartitionStream.GetHashCode(element) % m_partitionCount;
237                 }
238
239                 Contract.Assert(0 <= destinationIndex && destinationIndex < m_partitionCount,
240                                 "destination partition outside of the legal range of partitions");
241
242                 // Get the buffer for the destnation partition, lazily allocating if needed.  We maintain
243                 // this list in our own private cache so that we avoid accessing shared memory locations
244                 // too much.  In the original implementation, we'd access the buffer in the matrix ([N,M],
245                 // where N is the current partition and M is the destination), but some rudimentary
246                 // performance profiling indicates copying at the end performs better.
247                 ListChunk<Pair<TInputOutput, THashKey>> buffer = privateBuffers[destinationIndex];
248                 ListChunk<TOrderKey> keyBuffer = privateKeyBuffers[destinationIndex];
249                 if (buffer == null)
250                 {
251                     const int INITIAL_PRIVATE_BUFFER_SIZE = 128;
252                     Contract.Assert(keyBuffer == null);
253                     privateBuffers[destinationIndex] = buffer = new ListChunk<Pair<TInputOutput, THashKey>>(INITIAL_PRIVATE_BUFFER_SIZE);
254                     privateKeyBuffers[destinationIndex] = keyBuffer = new ListChunk<TOrderKey>(INITIAL_PRIVATE_BUFFER_SIZE);
255                 }
256
257                 buffer.Add(new Pair<TInputOutput, THashKey>(element, elementHashKey));
258                 keyBuffer.Add(key);
259
260             }
261
262             // Copy the local buffers to the shared space and then signal to other threads that
263             // we are done.  We can then immediately move on to enumerating the elements we found
264             // for the current partition before waiting at the barrier.  If we found a lot, we will
265             // hopefully never have to physically wait.
266             for (int i = 0; i < m_partitionCount; i++)
267             {
268                 m_valueExchangeMatrix[m_partitionIndex, i] = privateBuffers[i];
269                 m_keyExchangeMatrix[m_partitionIndex, i] = privateKeyBuffers[i];
270             }
271
272             m_barrier.Signal();
273
274             // Begin at our own buffer.
275             mutables.m_currentBufferIndex = m_partitionIndex;
276             mutables.m_currentBuffer = privateBuffers[m_partitionIndex];
277             mutables.m_currentKeyBuffer = privateKeyBuffers[m_partitionIndex];
278             mutables.m_currentIndex = ENUMERATION_NOT_STARTED;
279         }
280
281         protected override void Dispose(bool disposing)
282         {
283             if (m_barrier != null)
284             {
285                 // Since this enumerator is being disposed, we will decrement the barrier,
286                 // in case other enumerators will wait on the barrier.
287                 if (m_mutables == null || (m_mutables.m_currentBufferIndex == ENUMERATION_NOT_STARTED))
288                 {
289                     m_barrier.Signal();
290                     m_barrier = null;
291                 }
292
293                 m_source.Dispose();
294             }
295         }
296     }
297 }