3 // Copyright (c) Microsoft Corporation. All rights reserved.
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
8 // PartitionedDataSource.cs
10 // <OWNER>Microsoft</OWNER>
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
14 using System.Collections.Generic;
15 using System.Threading;
16 using System.Diagnostics.Contracts;
18 namespace System.Linq.Parallel
21 /// Contiguous range chunk partitioning attempts to improve data locality by keeping
22 /// data that is close together in the incoming data stream together in the outgoing partitions.
23 /// There are really three types of partitions that are used internally:
25 /// 1. If the data source is indexable--like an array or List_T--we can actually
26 /// just compute the range indexes and avoid doing any copying whatsoever. Each
27 /// "partition" is just an enumerator that will walk some subset of the data.
28 /// 2. If the data source has an index (different than being indexable!), we can
29 /// turn this into a range scan of the index. We can roughly estimate distribution
30 /// and ensure an evenly balanced set of partitions.
31 /// 3. If we can't use 1 or 2, we instead partition "on demand" by chunking the contents
32 /// of the source enumerator as they are requested. The unfortunate thing is that
33 /// this requires synchronization, since consumers may be running in parallel. We
34 /// amortize the cost of this by giving chunks of items when requested instead of
35 /// one element at a time. Note that this approach also works for infinite streams.
37 /// In all cases, the caller can request that enumerators walk elements in striped
38 /// contiguous chunks. If striping is requested, then each partition j will yield elements
39 /// in the data source for which ((i / s)%p) == j, where i is the element's index, s is
40 /// a chunk size calculated by the system with the intent of aligning on cache lines, and
41 /// p is the number of partitions. If striping is not requested, we use the same algorithm,
42 /// only, instead of aligning on cache lines, we use a chunk size of l / p, where l
43 /// is the length of the input and p is the number of partitions.
46 /// This is used as the default partitioning strategy by much of the PLINQ infrastructure.
48 /// <typeparam name="T"></typeparam>
49 internal class PartitionedDataSource<T> : PartitionedStream<T, int>
52 //---------------------------------------------------------------------------------------
53 // Just constructs a new partition stream.
// Builds the partitioned stream over 'source', splitting it into 'partitionCount'
// partitions. 'useStriping' selects striped vs. contiguous range assignment.
56 internal PartitionedDataSource(IEnumerable<T> source, int partitionCount, bool useStriping)
59 Util.GetDefaultComparer<int>(),
// Indexible when the source supports random access (IList<T>); otherwise the keys
// are merely Correct: ascending values taken from a shared running index (see
// ContiguousChunkLazyEnumerator below).
60 source is IList<T> ? OrdinalIndexState.Indexible : OrdinalIndexState.Correct)
62 InitializePartitions(source, partitionCount, useStriping);
65 //---------------------------------------------------------------------------------------
66 // This method just creates the individual partitions given a data source.
69 // We check whether the data source is an IList<T> and, if so, we can partition
70 // "in place" by calculating a set of indexes. Otherwise, we return an enumerator that
71 // performs partitioning lazily. Depending on which case it is, the enumerator may
72 // contain synchronization (i.e. the latter case), meaning callers may occasionally
73 // block when enumerating it.
76 private void InitializePartitions(IEnumerable<T> source, int partitionCount, bool useStriping)
78 Contract.Assert(source != null);
79 Contract.Assert(partitionCount > 0);
81 // If this is a wrapper, grab the internal wrapped data source so we can uncover its real type.
82 ParallelEnumerableWrapper<T> wrapper = source as ParallelEnumerableWrapper<T>;
85 source = wrapper.WrappedEnumerable;
86 Contract.Assert(source != null);
89 // Check whether we have an indexable data source.
90 IList<T> sourceAsList = source as IList<T>;
91 if (sourceAsList != null)
93 QueryOperatorEnumerator<T, int>[] partitions = new QueryOperatorEnumerator<T, int>[partitionCount];
94 int listCount = sourceAsList.Count;
96 // We use this below to specialize enumerators when possible.
97 T[] sourceAsArray = source as T[];
99 // If range partitioning is used, chunk size will be unlimited, i.e. -1.
100 int maxChunkSize = -1;
// (striping path) ask the scheduler for a chunk size, chosen with the intent of
// aligning chunks on cache lines.
104 maxChunkSize = Scheduling.GetDefaultChunkSize<T>();
106 // The minimum chunk size is 1.
107 if (maxChunkSize < 1)
113 // Calculate indexes and construct enumerators that walk a subset of the input.
114 for (int i = 0; i < partitionCount; i++)
116 if (sourceAsArray != null)
118 // If the source is an array, we can use a fast path below to index using
119 // 'ldelem' instructions rather than making interface method calls.
// Striped partitioning: each partition walks every partitionCount-th chunk.
122 partitions[i] = new ArrayIndexRangeEnumerator(sourceAsArray, partitionCount, i, maxChunkSize);
// Range partitioning: each partition walks one contiguous slice of the array.
126 partitions[i] = new ArrayContiguousIndexRangeEnumerator(sourceAsArray, partitionCount, i);
128 TraceHelpers.TraceInfo("ContigousRangePartitionExchangeStream::MakePartitions - (array) #{0} {1}", i, maxChunkSize);
132 // Create a general purpose list enumerator object.
135 partitions[i] = new ListIndexRangeEnumerator(sourceAsList, partitionCount, i, maxChunkSize);
139 partitions[i] = new ListContiguousIndexRangeEnumerator(sourceAsList, partitionCount, i);
141 TraceHelpers.TraceInfo("ContigousRangePartitionExchangeStream::MakePartitions - (list) #{0} {1})", i, maxChunkSize);
145 Contract.Assert(partitions.Length == partitionCount);
146 m_partitions = partitions;
150 // We couldn't use an in-place partition. Shucks. Defer to the other overload which
151 // accepts an enumerator as input instead.
152 m_partitions = MakePartitions(source.GetEnumerator(), partitionCount);
156 //---------------------------------------------------------------------------------------
157 // This method just creates the individual partitions given a data source. See the base
158 // class for more details on this method's contracts. This version takes an enumerator,
159 // and so it can't actually do an in-place partition. We'll instead create enumerators
160 // that coordinate with one another to lazily (on demand) grab chunks from the enumerator.
161 // This clearly is much less efficient than the fast path above since it requires
162 // synchronization. We try to amortize that cost by retrieving many elements at once
163 // instead of just one-at-a-time.
166 private static QueryOperatorEnumerator<T, int>[] MakePartitions(IEnumerator<T> source, int partitionCount)
168 Contract.Assert(source != null);
169 Contract.Assert(partitionCount > 0);
171 // At this point we were unable to efficiently partition the data source. Instead, we
172 // will return enumerators that lazily partition the data source.
173 QueryOperatorEnumerator<T, int>[] partitions = new QueryOperatorEnumerator<T, int>[partitionCount];
175 // The following is used for synchronization between threads.
176 object sharedSyncLock = new object();
// sharedCurrentIndex is the next ordinal key to hand out; sharedPartitionCount also
// serves as the count of live enumerators, decremented by each enumerator's Dispose.
177 Shared<int> sharedCurrentIndex = new Shared<int>(0);
178 Shared<int> sharedPartitionCount = new Shared<int>(partitionCount);
179 Shared<bool> sharedExeceptionTracker = new Shared<bool>(false);
181 // Create a new lazy chunking enumerator per partition, sharing the same lock.
182 for (int i = 0; i < partitionCount; i++)
184 partitions[i] = new ContiguousChunkLazyEnumerator(
185 source, sharedExeceptionTracker, sharedSyncLock, sharedCurrentIndex, sharedPartitionCount);
191 //---------------------------------------------------------------------------------------
192 // This enumerator walks a range within an indexable data type. It's abstract. We assume
193 // callers have validated that the ranges are legal given the data. IndexRangeEnumerator
194 // handles both striped and range partitioning.
196 // PLINQ creates one IndexRangeEnumerator per partition. Together, the enumerators will
197 // cover the entire list or array.
199 // In this context, the term "range" represents the entire array or list. The range is
200 // split up into one or more "sections". Each section is split up into as many "chunks" as
201 // we have partitions. i-th chunk in each section is assigned to partition i.
203 // All sections but the last one contain partitionCount * maxChunkSize elements, except
204 // for the last section which may contain fewer.
206 // For example, if the input is an array with 2,101 elements, maxChunkSize is 128
207 // and partitionCount is 4, all sections except the last one will contain 128*4 = 512
208 // elements. The last section will contain 2,101 - 4*512 = 53 elements.
210 // All sections but the last one will be evenly divided among partitions: the first 128
211 // elements will go into partition 0, the next 128 elements into partition 1, etc.
213 // The last section is divided as evenly as possible. In the above example, the split would
217 // A copy of the index enumerator specialized for array indexing. It uses 'ldelem'
218 // instructions for element retrieval, rather than using a method call.
219 internal sealed class ArrayIndexRangeEnumerator : QueryOperatorEnumerator<T, int>
221 private readonly T[] m_data; // The elements to iterate over.
222 private readonly int m_elementCount; // The number of elements to iterate over.
223 private readonly int m_partitionCount; // The number of partitions.
224 private readonly int m_partitionIndex; // The index of the current partition.
225 private readonly int m_maxChunkSize; // The maximum size of a chunk. -1 if unlimited.
226 private readonly int m_sectionCount; // Precomputed in ctor: the number of sections the range is split into.
227 private Mutables m_mutables; // Lazily allocated mutable variables.
233 // Place the enumerator just before the first element
234 m_currentSection = -1;
235 internal int m_currentSection; // 0-based index of the current section.
238 internal int m_currentChunkSize; // The number of elements in the current chunk.
239 internal int m_currentPositionInChunk; // 0-based position within the current chunk.
240 internal int m_currentChunkOffset; // The offset of the current chunk from the beginning of the range.
243 internal ArrayIndexRangeEnumerator(T[] data, int partitionCount, int partitionIndex, int maxChunkSize)
245 Contract.Assert(data != null, "data musn't be null");
246 Contract.Assert(partitionCount > 0, "partitionCount must be positive");
247 Contract.Assert(partitionIndex >= 0, "partitionIndex can't be negative");
248 Contract.Assert(partitionIndex < partitionCount, "partitionIndex must be less than partitionCount");
// NOTE(review): the message says "-1" is acceptable, but the condition rejects it;
// -1 (range partitioning) is routed to ArrayContiguousIndexRangeEnumerator instead,
// so this enumerator always gets a positive chunk size.
249 Contract.Assert(maxChunkSize > 0, "maxChunkSize must be positive or -1");
252 m_elementCount = data.Length;
253 m_partitionCount = partitionCount;
254 m_partitionIndex = partitionIndex;
255 m_maxChunkSize = maxChunkSize;
// A "section" holds one chunk for every partition.
257 int sectionSize = maxChunkSize * partitionCount;
258 Contract.Assert(sectionSize > 0);
260 // Section count is ceiling(elementCount / sectionSize)
261 m_sectionCount = m_elementCount / sectionSize +
262 ((m_elementCount % sectionSize) == 0 ? 0 : 1);
// Advances to the next element. On success, currentKey receives the element's
// ordinal index into the original array and currentElement the value at that index.
265 internal override bool MoveNext(ref T currentElement, ref int currentKey)
267 // Lazily allocate the mutable holder.
268 Mutables mutables = m_mutables;
269 if (mutables == null)
271 mutables = m_mutables = new Mutables();
274 // If we aren't within the chunk, we need to find another.
275 if (++mutables.m_currentPositionInChunk < mutables.m_currentChunkSize || MoveNextSlowPath())
277 currentKey = mutables.m_currentChunkOffset + mutables.m_currentPositionInChunk;
278 currentElement = m_data[currentKey];
// Advances to this partition's chunk in the next section; returns false when the
// range is exhausted.
285 private bool MoveNextSlowPath()
287 Mutables mutables = m_mutables;
288 Contract.Assert(mutables != null);
289 Contract.Assert(mutables.m_currentPositionInChunk >= mutables.m_currentChunkSize);
291 // Move on to the next section.
292 int currentSection = ++mutables.m_currentSection;
293 int sectionsRemaining = m_sectionCount - currentSection;
295 // If empty, return right away.
296 if (sectionsRemaining <= 0)
301 // Compute the offset of the current section from the beginning of the range
302 int currentSectionOffset = currentSection * m_partitionCount * m_maxChunkSize;
303 mutables.m_currentPositionInChunk = 0;
305 // Now do something different based on how many chunks remain.
306 if (sectionsRemaining > 1)
308 // We are not on the last section. The size of this chunk is simply m_maxChunkSize.
309 mutables.m_currentChunkSize = m_maxChunkSize;
310 mutables.m_currentChunkOffset = currentSectionOffset + m_partitionIndex * m_maxChunkSize;
314 // We are on the last section. Compute the size of the chunk to ensure even distribution
316 int lastSectionElementCount = m_elementCount - currentSectionOffset;
317 int smallerChunkSize = lastSectionElementCount / m_partitionCount;
318 int biggerChunkCount = lastSectionElementCount % m_partitionCount;
320 mutables.m_currentChunkSize = smallerChunkSize;
321 if (m_partitionIndex < biggerChunkCount)
323 mutables.m_currentChunkSize++;
// A zero-sized chunk means this partition receives no elements from the last section.
325 if (mutables.m_currentChunkSize == 0)
330 mutables.m_currentChunkOffset =
331 currentSectionOffset // The beginning of this section
332 + m_partitionIndex * smallerChunkSize // + the start of this chunk if all chunks were "smaller"
333 + (m_partitionIndex < biggerChunkCount ? m_partitionIndex : biggerChunkCount); // + the number of "bigger" chunks before this chunk
340 // A contiguous index enumerator specialized for array indexing. It uses 'ldelem'
341 // instructions for element retrieval, rather than using a method call.
342 internal sealed class ArrayContiguousIndexRangeEnumerator : QueryOperatorEnumerator<T, int>
344 private readonly T[] m_data; // The elements to iterate over.
345 private readonly int m_startIndex; // Where to begin iterating.
346 private readonly int m_maximumIndex; // The maximum index to iterate over.
347 private Shared<int> m_currentIndex; // The current index (lazily allocated).
349 internal ArrayContiguousIndexRangeEnumerator(T[] data, int partitionCount, int partitionIndex)
351 Contract.Assert(data != null, "data musn't be null");
352 Contract.Assert(partitionCount > 0, "partitionCount must be positive");
353 Contract.Assert(partitionIndex >= 0, "partitionIndex can't be negative");
354 Contract.Assert(partitionIndex < partitionCount, "partitionIndex must be less than partitionCount");
358 // Compute the size of the chunk to ensure even distribution of elements.
// The first (Length % partitionCount) partitions each take one extra element.
359 int smallerChunkSize = data.Length / partitionCount;
360 int biggerChunkCount = data.Length % partitionCount;
362 // Our start index is our index times the small chunk size, plus the number
363 // of "bigger" chunks before this one.
364 int startIndex = partitionIndex * smallerChunkSize +
365 (partitionIndex < biggerChunkCount ? partitionIndex : biggerChunkCount);
367 m_startIndex = startIndex - 1; // Subtract one for the first call.
368 m_maximumIndex = startIndex + smallerChunkSize + // And add one if we're responsible for part of the
369 (partitionIndex < biggerChunkCount ? 1 : 0); // leftover chunks.
371 Contract.Assert(m_currentIndex == null, "Expected deferred allocation to ensure it happens on correct thread");
// Advances to the next element in this partition's contiguous slice; currentKey
// is the element's ordinal index into the original array.
374 internal override bool MoveNext(ref T currentElement, ref int currentKey)
376 // Lazily allocate the current index if needed.
// Deferred so the allocation happens on the consuming thread (see ctor assert).
377 if (m_currentIndex == null)
379 m_currentIndex = new Shared<int>(m_startIndex);
382 // Now increment the current index, check bounds, and so on.
383 int current = ++m_currentIndex.Value;
384 if (current < m_maximumIndex)
386 currentKey = current;
387 currentElement = m_data[current];
395 // A copy of the index enumerator specialized for IList<T> indexing. It calls through
396 // the IList<T> interface for element retrieval.
397 internal sealed class ListIndexRangeEnumerator : QueryOperatorEnumerator<T, int>
399 private readonly IList<T> m_data; // The elements to iterate over.
400 private readonly int m_elementCount; // The number of elements to iterate over.
401 private readonly int m_partitionCount; // The number of partitions.
402 private readonly int m_partitionIndex; // The index of the current partition.
403 private readonly int m_maxChunkSize; // The maximum size of a chunk. -1 if unlimited.
404 private readonly int m_sectionCount; // Precomputed in ctor: the number of sections the range is split into.
405 private Mutables m_mutables; // Lazily allocated mutable variables.
411 // Place the enumerator just before the first element
412 m_currentSection = -1;
415 internal int m_currentSection; // 0-based index of the current section.
416 internal int m_currentChunkSize; // The number of elements in the current chunk.
417 internal int m_currentPositionInChunk; // 0-based position within the current chunk.
418 internal int m_currentChunkOffset; // The offset of the current chunk from the beginning of the range.
421 internal ListIndexRangeEnumerator(IList<T> data, int partitionCount, int partitionIndex, int maxChunkSize)
423 Contract.Assert(data != null, "data musn't be null");
424 Contract.Assert(partitionCount > 0, "partitionCount must be positive");
425 Contract.Assert(partitionIndex >= 0, "partitionIndex can't be negative");
426 Contract.Assert(partitionIndex < partitionCount, "partitionIndex must be less than partitionCount");
// NOTE(review): as in the array variant, -1 is rejected here despite the message;
// range partitioning (-1) uses ListContiguousIndexRangeEnumerator instead.
427 Contract.Assert(maxChunkSize > 0, "maxChunkSize must be positive or -1");
430 m_elementCount = data.Count;
431 m_partitionCount = partitionCount;
432 m_partitionIndex = partitionIndex;
433 m_maxChunkSize = maxChunkSize;
// A "section" holds one chunk for every partition.
435 int sectionSize = maxChunkSize * partitionCount;
436 Contract.Assert(sectionSize > 0);
438 // Section count is ceiling(elementCount / sectionSize)
439 m_sectionCount = m_elementCount / sectionSize +
440 ((m_elementCount % sectionSize) == 0 ? 0 : 1);
// Advances to the next element. On success, currentKey receives the element's
// ordinal index into the original list and currentElement the value at that index.
443 internal override bool MoveNext(ref T currentElement, ref int currentKey)
445 // Lazily allocate the mutable holder.
446 Mutables mutables = m_mutables;
447 if (mutables == null)
449 mutables = m_mutables = new Mutables();
452 // If we aren't within the chunk, we need to find another.
453 if (++mutables.m_currentPositionInChunk < mutables.m_currentChunkSize || MoveNextSlowPath())
455 currentKey = mutables.m_currentChunkOffset + mutables.m_currentPositionInChunk;
456 currentElement = m_data[currentKey];
// Advances to this partition's chunk in the next section; returns false when the
// range is exhausted.
463 private bool MoveNextSlowPath()
465 Mutables mutables = m_mutables;
466 Contract.Assert(mutables != null);
467 Contract.Assert(mutables.m_currentPositionInChunk >= mutables.m_currentChunkSize);
469 // Move on to the next section.
470 int currentSection = ++mutables.m_currentSection;
471 int sectionsRemaining = m_sectionCount - currentSection;
473 // If empty, return right away.
474 if (sectionsRemaining <= 0)
479 // Compute the offset of the current section from the beginning of the range
480 int currentSectionOffset = currentSection * m_partitionCount * m_maxChunkSize;
481 mutables.m_currentPositionInChunk = 0;
483 // Now do something different based on how many chunks remain.
484 if (sectionsRemaining > 1)
486 // We are not on the last section. The size of this chunk is simply m_maxChunkSize.
487 mutables.m_currentChunkSize = m_maxChunkSize;
488 mutables.m_currentChunkOffset = currentSectionOffset + m_partitionIndex * m_maxChunkSize;
492 // We are on the last section. Compute the size of the chunk to ensure even distribution
494 int lastSectionElementCount = m_elementCount - currentSectionOffset;
495 int smallerChunkSize = lastSectionElementCount / m_partitionCount;
496 int biggerChunkCount = lastSectionElementCount % m_partitionCount;
498 mutables.m_currentChunkSize = smallerChunkSize;
499 if (m_partitionIndex < biggerChunkCount)
501 mutables.m_currentChunkSize++;
// A zero-sized chunk means this partition receives no elements from the last section.
503 if (mutables.m_currentChunkSize == 0)
508 mutables.m_currentChunkOffset =
509 currentSectionOffset // The beginning of this section
510 + m_partitionIndex * smallerChunkSize // + the start of this chunk if all chunks were "smaller"
511 + (m_partitionIndex < biggerChunkCount ? m_partitionIndex : biggerChunkCount); // + the number of "bigger" chunks before this chunk
518 // A contiguous index enumerator specialized for IList<T> indexing. It calls through
519 // the IList<T> interface for element retrieval.
520 internal sealed class ListContiguousIndexRangeEnumerator : QueryOperatorEnumerator<T, int>
522 private readonly IList<T> m_data; // The elements to iterate over.
523 private readonly int m_startIndex; // Where to begin iterating.
524 private readonly int m_maximumIndex; // The maximum index to iterate over.
525 private Shared<int> m_currentIndex; // The current index (lazily allocated).
527 internal ListContiguousIndexRangeEnumerator(IList<T> data, int partitionCount, int partitionIndex)
529 Contract.Assert(data != null, "data musn't be null");
530 Contract.Assert(partitionCount > 0, "partitionCount must be positive");
531 Contract.Assert(partitionIndex >= 0, "partitionIndex can't be negative");
532 Contract.Assert(partitionIndex < partitionCount, "partitionIndex must be less than partitionCount");
536 // Compute the size of the chunk to ensure even distribution of elements.
// The first (Count % partitionCount) partitions each take one extra element.
537 int smallerChunkSize = data.Count / partitionCount;
538 int biggerChunkCount = data.Count % partitionCount;
540 // Our start index is our index times the small chunk size, plus the number
541 // of "bigger" chunks before this one.
542 int startIndex = partitionIndex * smallerChunkSize +
543 (partitionIndex < biggerChunkCount ? partitionIndex : biggerChunkCount);
545 m_startIndex = startIndex - 1; // Subtract one for the first call.
546 m_maximumIndex = startIndex + smallerChunkSize + // And add one if we're responsible for part of the
547 (partitionIndex < biggerChunkCount ? 1 : 0); // leftover chunks.
549 Contract.Assert(m_currentIndex == null, "Expected deferred allocation to ensure it happens on correct thread");
// Advances to the next element in this partition's contiguous slice; currentKey
// is the element's ordinal index into the original list.
552 internal override bool MoveNext(ref T currentElement, ref int currentKey)
554 // Lazily allocate the current index if needed.
// Deferred so the allocation happens on the consuming thread (see ctor assert).
555 if (m_currentIndex == null)
557 m_currentIndex = new Shared<int>(m_startIndex);
560 // Now increment the current index, check bounds, and so on.
561 int current = ++m_currentIndex.Value;
562 if (current < m_maximumIndex)
564 currentKey = current;
565 currentElement = m_data[current];
573 //---------------------------------------------------------------------------------------
574 // This enumerator lazily grabs chunks of data from the underlying data source. It is
575 // safe for this data source to be enumerated by multiple such enumerators, since it has
576 // been written to perform proper synchronization.
579 private class ContiguousChunkLazyEnumerator : QueryOperatorEnumerator<T, int>
581 private const int chunksPerChunkSize = 7; // the rate at which to double the chunksize (double chunksize every 'r' chunks). MUST BE == (2^n)-1 for some n.
582 private readonly IEnumerator<T> m_source; // Data source to enumerate.
583 private readonly object m_sourceSyncLock; // Lock to use for all synchronization.
584 private readonly Shared<int> m_currentIndex; // The index shared by all.
585 private readonly Shared<int> m_activeEnumeratorsCount; // How many enumerators over the same source have not been disposed yet?
586 private readonly Shared<bool> m_exceptionTracker; // Set when reading the shared source fails, so sibling enumerators stop instead of touching it again.
587 private Mutables m_mutables; // Any mutable fields on this enumerator. These mutables are local and persistent
593 m_nextChunkMaxSize = 1; // We start the chunk size at 1 and grow it later.
594 m_chunkBuffer = new T[Scheduling.GetDefaultChunkSize<T>()]; // Pre-allocate the array at the maximum size.
595 m_currentChunkSize = 0; // The chunk begins life empty.
596 m_currentChunkIndex = -1;
597 m_chunkBaseIndex = 0;
601 internal readonly T[] m_chunkBuffer; // Buffer array for the current chunk being enumerated.
602 internal int m_nextChunkMaxSize; // The max. chunk size to use for the next chunk.
603 internal int m_currentChunkSize; // The element count for our current chunk.
604 internal int m_currentChunkIndex; // Our current index within the temporary chunk.
605 internal int m_chunkBaseIndex; // The start index from which the current chunk was taken.
606 internal int m_chunkCounter; // Counts chunks fetched so far; drives the periodic chunk-size doubling below.
609 //---------------------------------------------------------------------------------------
610 // Constructs a new enumerator that lazily retrieves chunks from the provided source.
613 internal ContiguousChunkLazyEnumerator(
614 IEnumerator<T> source, Shared<bool> exceptionTracker, object sourceSyncLock, Shared<int> currentIndex, Shared<int> degreeOfParallelism)
616 Contract.Assert(source != null);
617 Contract.Assert(sourceSyncLock != null);
618 Contract.Assert(currentIndex != null);
621 m_sourceSyncLock = sourceSyncLock;
622 m_currentIndex = currentIndex;
623 m_activeEnumeratorsCount = degreeOfParallelism;
624 m_exceptionTracker = exceptionTracker;
627 //---------------------------------------------------------------------------------------
628 // Just retrieves the current element from our current chunk.
631 internal override bool MoveNext(ref T currentElement, ref int currentKey)
633 Mutables mutables = m_mutables;
634 if (mutables == null)
636 mutables = m_mutables = new Mutables();
639 Contract.Assert(mutables.m_chunkBuffer != null);
641 // Loop until we've exhausted our data source.
644 // If we have elements remaining in the current chunk, return right away.
645 T[] chunkBuffer = mutables.m_chunkBuffer;
646 int currentChunkIndex = ++mutables.m_currentChunkIndex;
647 if (currentChunkIndex < mutables.m_currentChunkSize)
649 Contract.Assert(m_source != null);
650 Contract.Assert(chunkBuffer != null);
651 Contract.Assert(mutables.m_currentChunkSize > 0);
652 Contract.Assert(0 <= currentChunkIndex && currentChunkIndex < chunkBuffer.Length);
653 currentElement = chunkBuffer[currentChunkIndex];
// The key is the element's position in the overall source: the chunk's base index
// (taken from the shared counter when the chunk was fetched) plus our offset in it.
654 currentKey = mutables.m_chunkBaseIndex + currentChunkIndex;
659 // Else, it could be the first time enumerating this object, or we may have
660 // just reached the end of the current chunk and need to grab another one? In either
661 // case, we will look for more data from the underlying enumerator. Because we
662 // share the same enumerator object, we have to do this under a lock.
664 lock (m_sourceSyncLock)
666 Contract.Assert(0 <= mutables.m_nextChunkMaxSize && mutables.m_nextChunkMaxSize <= chunkBuffer.Length);
668 // Accumulate a chunk of elements from the input.
// If another partition already failed while reading the source, don't touch the
// source again.
671 if (m_exceptionTracker.Value)
678 for (; i < mutables.m_nextChunkMaxSize && m_source.MoveNext(); i++)
680 // Read the current entry into our buffer.
681 chunkBuffer[i] = m_source.Current;
// Record the failure so sibling enumerators sharing this source bail out too.
686 m_exceptionTracker.Value = true;
690 // Store the number of elements we fetched from the data source.
691 mutables.m_currentChunkSize = i;
693 // If we've emptied the enumerator, return immediately.
699 // Increment the shared index for all to see. Throw an exception on overflow.
700 mutables.m_chunkBaseIndex = m_currentIndex.Value;
703 m_currentIndex.Value += i;
707 // Each time we access the data source, we grow the chunk size for the next go-round.
708 // We grow the chunksize once per 'chunksPerChunkSize'.
709 if (mutables.m_nextChunkMaxSize < chunkBuffer.Length)
// Because chunksPerChunkSize is (2^n)-1, this mask test is true exactly once
// every chunksPerChunkSize+1 chunks.
711 if ((mutables.m_chunkCounter++ & chunksPerChunkSize) == chunksPerChunkSize)
713 mutables.m_nextChunkMaxSize = mutables.m_nextChunkMaxSize * 2;
714 if (mutables.m_nextChunkMaxSize > chunkBuffer.Length)
716 mutables.m_nextChunkMaxSize = chunkBuffer.Length;
721 // Finally, reset our index to the beginning; loop around and we'll return the right values.
722 mutables.m_currentChunkIndex = -1;
726 protected override void Dispose(bool disposing)
// Only the last enumerator over the shared source to be disposed (the one that
// drops the active count to zero) takes this branch.
728 if (Interlocked.Decrement(ref m_activeEnumeratorsCount.Value) == 0)