Updates referencesource to .NET 4.7
[mono.git] / mcs / class / referencesource / System.Core / System / Linq / Parallel / Partitioning / PartitionedDataSource.cs
1 // ==++==
2 //
3 //   Copyright (c) Microsoft Corporation.  All rights reserved.
4 // 
5 // ==--==
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
7 //
8 // PartitionedDataSource.cs
9 //
10 // <OWNER>Microsoft</OWNER>
11 //
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
13
14 using System.Collections.Generic;
15 using System.Threading;
16 using System.Diagnostics.Contracts;
17
18 namespace System.Linq.Parallel
19 {
20     /// <summary>
21     /// Contiguous range chunk partitioning attempts to improve data locality by keeping
22     /// data close together in the incoming data stream together in the outgoing partitions.
23     /// There are really three types of partitions that are used internally:
24     ///
25     ///     1. If the data source is indexable--like an array or List_T--we can actually
26     ///        just compute the range indexes and avoid doing any copying whatsoever. Each
27     ///        "partition" is just an enumerator that will walk some subset of the data.
28     ///     2. If the data source has an index (different than being indexable!), we can
29     ///        turn this into a range scan of the index. We can roughly estimate distribution
30     ///        and ensure an evenly balanced set of partitions.
31     ///     3. If we can't use 1 or 2, we instead partition "on demand" by chunking the contents
32     ///        of the source enumerator as they are requested. The unfortunate thing is that
33     ///        this requires synchronization, since consumers may be running in parallel. We
34     ///        amortize the cost of this by giving chunks of items when requested instead of
35     ///        one element at a time. Note that this approach also works for infinite streams.
36     ///
37     /// In all cases, the caller can request that enumerators walk elements in striped
38     /// contiguous chunks. If striping is requested, then each partition j will yield elements
39     /// in the data source for which ((i / s)%p) == j, where i is the element's index, s is
40     /// a chunk size calculated by the system with the intent of aligning on cache lines, and
41     /// p is the number of partitions. If striping is not requested, we use the same algorith,
42     /// only, instead of aligning on cache lines, we use a chunk size of l / p, where l
43     /// is the length of the input and p is the number of partitions.
44     ///
45     /// Notes:
46     ///     This is used as the default partitioning strategy by much of the PLINQ infrastructure.
47     /// </summary>
48     /// <typeparam name="T"></typeparam>
49     internal class PartitionedDataSource<T> : PartitionedStream<T, int>
50     {
51
52         //---------------------------------------------------------------------------------------
53         // Just constructs a new partition stream.
54         //
55
56         internal PartitionedDataSource(IEnumerable<T> source, int partitionCount, bool useStriping)
57             : base(
58                 partitionCount,
59                 Util.GetDefaultComparer<int>(), 
60                 source is IList<T> ? OrdinalIndexState.Indexible : OrdinalIndexState.Correct)
61         {
62             InitializePartitions(source, partitionCount, useStriping);
63         }
64
65         //---------------------------------------------------------------------------------------
66         // This method just creates the individual partitions given a data source.
67         // 
68         // Notes:
69         //     We check whether the data source is an IList<T> and, if so, we can partition
70         //     "in place" by calculating a set of indexes. Otherwise, we return an enumerator that
71         //     performs partitioning lazily. Depending on which case it is, the enumerator may
72         //     contain synchronization (i.e. the latter case), meaning callers may occ----ionally
73         //     block when enumerating it.
74         //
75
76         private void InitializePartitions(IEnumerable<T> source, int partitionCount, bool useStriping)
77         {
78             Contract.Assert(source != null);
79             Contract.Assert(partitionCount > 0);
80
81             // If this is a wrapper, grab the internal wrapped data source so we can uncover its real type.
82             ParallelEnumerableWrapper<T> wrapper = source as ParallelEnumerableWrapper<T>;
83             if (wrapper != null)
84             {
85                 source = wrapper.WrappedEnumerable;
86                 Contract.Assert(source != null);
87             }
88
89             // Check whether we have an indexable data source.
90             IList<T> sourceAsList = source as IList<T>;
91             if (sourceAsList != null)
92             {
93                 QueryOperatorEnumerator<T, int>[] partitions = new QueryOperatorEnumerator<T, int>[partitionCount];
94                 int listCount = sourceAsList.Count;
95
96                 // We use this below to specialize enumerators when possible.
97                 T[] sourceAsArray = source as T[];
98
99                 // If range partitioning is used, chunk size will be unlimited, i.e. -1.
100                 int maxChunkSize = -1;
101
102                 if (useStriping)
103                 {
104                     maxChunkSize = Scheduling.GetDefaultChunkSize<T>();
105
106                     // The minimum chunk size is 1.
107                     if (maxChunkSize < 1)
108                     {
109                         maxChunkSize = 1;
110                     }
111                 }
112
113                 // Calculate indexes and construct enumerators that walk a subset of the input.
114                 for (int i = 0; i < partitionCount; i++)
115                 {
116                     if (sourceAsArray != null)
117                     {
118                         // If the source is an array, we can use a fast path below to index using
119                         // 'ldelem' instructions rather than making interface method calls.
120                         if (useStriping)
121                         {
122                             partitions[i] = new ArrayIndexRangeEnumerator(sourceAsArray, partitionCount, i, maxChunkSize);
123                         }
124                         else
125                         {
126                             partitions[i] = new ArrayContiguousIndexRangeEnumerator(sourceAsArray, partitionCount, i);
127                         }
128                         TraceHelpers.TraceInfo("ContigousRangePartitionExchangeStream::MakePartitions - (array) #{0} {1}", i, maxChunkSize);
129                     }
130                     else
131                     {
132                         // Create a general purpose list enumerator object.
133                         if (useStriping)
134                         {
135                             partitions[i] = new ListIndexRangeEnumerator(sourceAsList, partitionCount, i, maxChunkSize);
136                         }
137                         else
138                         {
139                             partitions[i] = new ListContiguousIndexRangeEnumerator(sourceAsList, partitionCount, i);
140                         }
141                         TraceHelpers.TraceInfo("ContigousRangePartitionExchangeStream::MakePartitions - (list) #{0} {1})", i, maxChunkSize);
142                     }
143                 }
144
145                 Contract.Assert(partitions.Length == partitionCount);
146                 m_partitions = partitions;
147             }
148             else
149             {
150                 // We couldn't use an in-place partition. Shucks. Defer to the other overload which
151                 // accepts an enumerator as input instead.
152                 m_partitions = MakePartitions(source.GetEnumerator(), partitionCount);
153             }
154         }
155
156         //---------------------------------------------------------------------------------------
157         // This method just creates the individual partitions given a data source. See the base
158         // class for more details on this method's contracts. This version takes an enumerator,
159         // and so it can't actually do an in-place partition. We'll instead create enumerators
160         // that coordinate with one another to lazily (on demand) grab chunks from the enumerator.
161         // This clearly is much less efficient than the fast path above since it requires
162         // synchronization. We try to amortize that cost by retrieving many elements at once
163         // instead of just one-at-a-time.
164         //
165
166         private static QueryOperatorEnumerator<T, int>[] MakePartitions(IEnumerator<T> source, int partitionCount)
167         {
168             Contract.Assert(source != null);
169             Contract.Assert(partitionCount > 0);
170
171             // At this point we were unable to efficiently partition the data source. Instead, we
172             // will return enumerators that lazily partition the data source.
173             QueryOperatorEnumerator<T, int>[] partitions = new QueryOperatorEnumerator<T, int>[partitionCount];
174
175             // The following is used for synchronization between threads.
176             object sharedSyncLock = new object();
177             Shared<int> sharedCurrentIndex = new Shared<int>(0);
178             Shared<int> sharedPartitionCount = new Shared<int>(partitionCount);
179             Shared<bool> sharedExeceptionTracker = new Shared<bool>(false);
180
181             // Create a new lazy chunking enumerator per partition, sharing the same lock.
182             for (int i = 0; i < partitionCount; i++)
183             {
184                 partitions[i] = new ContiguousChunkLazyEnumerator(
185                     source, sharedExeceptionTracker, sharedSyncLock, sharedCurrentIndex, sharedPartitionCount);
186             }
187
188             return partitions;
189         }
190
191         //---------------------------------------------------------------------------------------
192         // This enumerator walks a range within an indexable data type. It's abstract. We assume
193         // callers have validated that the ranges are legal given the data. IndexRangeEnumerator
194         // handles both striped and range partitioning.
195         //
196         // PLINQ creates one IndexRangeEnumerator per partition. Together, the enumerators will
197         // cover the entire list or array.
198         //
199         // In this context, the term "range" represents the entire array or list. The range is
200         // split up into one or more "sections". Each section is split up into as many "chunks" as
201         // we have partitions. i-th chunk in each section is assigned to partition i.
202         //
203         // All sections but the last one contain partitionCount * maxChunkSize elements, except
204         // for the last section which may contain fewer.
205         //
206         // For example, if the input is an array with 2,101 elements, maxChunkSize is 128
207         // and partitionCount is 4, all sections except the last one will contain 128*4 = 512
208         // elements. The last section will contain 2,101 - 4*512 = 53 elements.
209         //
210         // All sections but the last one will be evenly divided among partitions: the first 128
211         // elements will go into partition 0, the next 128 elements into partition 1, etc.
212         //
213         // The last section is divided as evenly as possible. In the above example, the split would
214         // be 14-13-13-13.
215         //
216
217         // A copy of the index enumerator specialized for array indexing. It uses 'ldelem'
218         // instructions for element retrieval, rather than using a method call.
219         internal sealed class ArrayIndexRangeEnumerator : QueryOperatorEnumerator<T, int>
220         {
221             private readonly T[] m_data; // The elements to iterate over.
222             private readonly int m_elementCount; // The number of elements to iterate over.
223             private readonly int m_partitionCount; // The number of partitions.
224             private readonly int m_partitionIndex; // The index of the current partition.
225             private readonly int m_maxChunkSize; // The maximum size of a chunk. -1 if unlimited.
226             private readonly int m_sectionCount; // Precomputed in ctor: the number of sections the range is split into.
227             private Mutables m_mutables; // Lazily allocated mutable variables.
228
229             class Mutables
230             {
231                 internal Mutables()
232                 {
233                     // Place the enumerator just before the first element
234                     m_currentSection = -1;
235                 }
236
237                 internal int m_currentSection; // 0-based index of the current section.
238                 internal int m_currentChunkSize; // The number of elements in the current chunk.
239                 internal int m_currentPositionInChunk; // 0-based position within the current chunk.
240                 internal int m_currentChunkOffset; // The offset of the current chunk from the beginning of the range.
241             }
242
243             internal ArrayIndexRangeEnumerator(T[] data, int partitionCount, int partitionIndex, int maxChunkSize)
244             {
245                 Contract.Assert(data != null, "data musn't be null");
246                 Contract.Assert(partitionCount > 0, "partitionCount must be positive");
247                 Contract.Assert(partitionIndex >= 0, "partitionIndex can't be negative");
248                 Contract.Assert(partitionIndex < partitionCount, "partitionIndex must be less than partitionCount");
249                 Contract.Assert(maxChunkSize > 0, "maxChunkSize must be positive or -1");
250
251                 m_data = data;
252                 m_elementCount = data.Length;
253                 m_partitionCount = partitionCount;
254                 m_partitionIndex = partitionIndex;
255                 m_maxChunkSize = maxChunkSize;
256
257                 int sectionSize = maxChunkSize * partitionCount;
258                 Contract.Assert(sectionSize > 0);
259
260                 // Section count is ceiling(elementCount / sectionSize)
261                 m_sectionCount = m_elementCount / sectionSize +
262                     ((m_elementCount % sectionSize) == 0 ? 0 : 1);
263             }
264
265             internal override bool MoveNext(ref T currentElement, ref int currentKey)
266             {
267                 // Lazily allocate the mutable holder.
268                 Mutables mutables = m_mutables;
269                 if (mutables == null)
270                 {
271                     mutables = m_mutables = new Mutables();
272                 }
273
274                 // If we are aren't within the chunk, we need to find another.
275                 if (++mutables.m_currentPositionInChunk < mutables.m_currentChunkSize || MoveNextSlowPath())
276                 {
277                     currentKey = mutables.m_currentChunkOffset + mutables.m_currentPositionInChunk;
278                     currentElement = m_data[currentKey];
279                     return true;
280                 }
281
282                 return false;
283             }
284
285             private bool MoveNextSlowPath()
286             {
287                 Mutables mutables = m_mutables;
288                 Contract.Assert(mutables != null);
289                 Contract.Assert(mutables.m_currentPositionInChunk >= mutables.m_currentChunkSize);
290
291                 // Move on to the next section.
292                 int currentSection = ++mutables.m_currentSection;
293                 int sectionsRemaining = m_sectionCount - currentSection;
294
295                 // If empty, return right away.
296                 if (sectionsRemaining <= 0)
297                 {
298                     return false;
299                 }
300
301                 // Compute the offset of the current section from the beginning of the range
302                 int currentSectionOffset = currentSection * m_partitionCount * m_maxChunkSize;
303                 mutables.m_currentPositionInChunk = 0;
304
305                 // Now do something different based on how many chunks remain.
306                 if (sectionsRemaining > 1)
307                 {
308                     // We are not on the last section. The size of this chunk is simply m_maxChunkSize.
309                     mutables.m_currentChunkSize = m_maxChunkSize;
310                     mutables.m_currentChunkOffset = currentSectionOffset + m_partitionIndex * m_maxChunkSize;
311                 }
312                 else
313                 {
314                     // We are on the last section. Compute the size of the chunk to ensure even distribution
315                     // of elements.
316                     int lastSectionElementCount = m_elementCount - currentSectionOffset;
317                     int smallerChunkSize = lastSectionElementCount / m_partitionCount;
318                     int biggerChunkCount = lastSectionElementCount % m_partitionCount;
319
320                     mutables.m_currentChunkSize = smallerChunkSize;
321                     if (m_partitionIndex < biggerChunkCount)
322                     {
323                         mutables.m_currentChunkSize++;
324                     }
325                     if (mutables.m_currentChunkSize == 0)
326                     {
327                         return false;
328                     }
329
330                     mutables.m_currentChunkOffset =
331                         currentSectionOffset                            // The beginning of this section
332                         + m_partitionIndex * smallerChunkSize           // + the start of this chunk if all chunks were "smaller"
333                         + (m_partitionIndex < biggerChunkCount ? m_partitionIndex : biggerChunkCount); // + the number of "bigger" chunks before this chunk
334                 }
335
336                 return true;
337             }
338         }
339
340         // A contiguous index enumerator specialized for array indexing. It uses 'ldelem'
341         // instructions for element retrieval, rather than using a method call.
342         internal sealed class ArrayContiguousIndexRangeEnumerator : QueryOperatorEnumerator<T, int>
343         {
344             private readonly T[] m_data; // The elements to iterate over.
345             private readonly int m_startIndex; // Where to begin iterating.
346             private readonly int m_maximumIndex; // The maximum index to iterate over.
347             private Shared<int> m_currentIndex; // The current index (lazily allocated).
348
349             internal ArrayContiguousIndexRangeEnumerator(T[] data, int partitionCount, int partitionIndex)
350             {
351                 Contract.Assert(data != null, "data musn't be null");
352                 Contract.Assert(partitionCount > 0, "partitionCount must be positive");
353                 Contract.Assert(partitionIndex >= 0, "partitionIndex can't be negative");
354                 Contract.Assert(partitionIndex < partitionCount, "partitionIndex must be less than partitionCount");
355
356                 m_data = data;
357
358                 // Compute the size of the chunk to ensure even distribution of elements.
359                 int smallerChunkSize = data.Length / partitionCount;
360                 int biggerChunkCount = data.Length % partitionCount;
361
362                 // Our start index is our index times the small chunk size, plus the number
363                 // of "bigger" chunks before this one.
364                 int startIndex = partitionIndex * smallerChunkSize +
365                     (partitionIndex < biggerChunkCount ? partitionIndex : biggerChunkCount);
366
367                 m_startIndex = startIndex - 1; // Subtract one for the first call.
368                 m_maximumIndex = startIndex + smallerChunkSize + // And add one if we're responsible for part of the
369                     (partitionIndex < biggerChunkCount ? 1 : 0); // leftover chunks.
370
371                 Contract.Assert(m_currentIndex == null, "Expected deferred allocation to ensure it happens on correct thread");
372             }
373
374             internal override bool MoveNext(ref T currentElement, ref int currentKey)
375             {
376                 // Lazily allocate the current index if needed.
377                 if (m_currentIndex == null)
378                 {
379                     m_currentIndex = new Shared<int>(m_startIndex);
380                 }
381
382                 // Now increment the current index, check bounds, and so on.
383                 int current = ++m_currentIndex.Value;
384                 if (current < m_maximumIndex)
385                 {
386                     currentKey = current;
387                     currentElement = m_data[current];
388                     return true;
389                 }
390
391                 return false;
392             }
393         }
394
395         // A copy of the index enumerator specialized for IList<T> indexing. It calls through
396         // the IList<T> interface for element retrieval.
397         internal sealed class ListIndexRangeEnumerator : QueryOperatorEnumerator<T, int>
398         {
399             private readonly IList<T> m_data; // The elements to iterate over.
400             private readonly int m_elementCount; // The number of elements to iterate over.
401             private readonly int m_partitionCount; // The number of partitions.
402             private readonly int m_partitionIndex; // The index of the current partition.
403             private readonly int m_maxChunkSize; // The maximum size of a chunk. -1 if unlimited.
404             private readonly int m_sectionCount; // Precomputed in ctor: the number of sections the range is split into.
405             private Mutables m_mutables; // Lazily allocated mutable variables.
406
407             class Mutables
408             {
409                 internal Mutables()
410                 {
411                     // Place the enumerator just before the first element
412                     m_currentSection = -1;
413                 }
414
415                 internal int m_currentSection; // 0-based index of the current section.
416                 internal int m_currentChunkSize; // The number of elements in the current chunk.
417                 internal int m_currentPositionInChunk; // 0-based position within the current chunk.
418                 internal int m_currentChunkOffset; // The offset of the current chunk from the beginning of the range.
419             }
420
421             internal ListIndexRangeEnumerator(IList<T> data, int partitionCount, int partitionIndex, int maxChunkSize)
422             {
423                 Contract.Assert(data != null, "data musn't be null");
424                 Contract.Assert(partitionCount > 0, "partitionCount must be positive");
425                 Contract.Assert(partitionIndex >= 0, "partitionIndex can't be negative");
426                 Contract.Assert(partitionIndex < partitionCount, "partitionIndex must be less than partitionCount");
427                 Contract.Assert(maxChunkSize > 0, "maxChunkSize must be positive or -1");
428
429                 m_data = data;
430                 m_elementCount = data.Count;
431                 m_partitionCount = partitionCount;
432                 m_partitionIndex = partitionIndex;
433                 m_maxChunkSize = maxChunkSize;
434
435                 int sectionSize = maxChunkSize * partitionCount;
436                 Contract.Assert(sectionSize > 0);
437
438                 // Section count is ceiling(elementCount / sectionSize)
439                 m_sectionCount = m_elementCount / sectionSize +
440                     ((m_elementCount % sectionSize) == 0 ? 0 : 1);
441             }
442
443             internal override bool MoveNext(ref T currentElement, ref int currentKey)
444             {
445                 // Lazily allocate the mutable holder.
446                 Mutables mutables = m_mutables;
447                 if (mutables == null)
448                 {
449                     mutables = m_mutables = new Mutables();
450                 }
451
452                 // If we are aren't within the chunk, we need to find another.
453                 if (++mutables.m_currentPositionInChunk < mutables.m_currentChunkSize || MoveNextSlowPath())
454                 {
455                     currentKey = mutables.m_currentChunkOffset + mutables.m_currentPositionInChunk;
456                     currentElement = m_data[currentKey];
457                     return true;
458                 }
459
460                 return false;
461             }
462
463             private bool MoveNextSlowPath()
464             {
465                 Mutables mutables = m_mutables;
466                 Contract.Assert(mutables != null);
467                 Contract.Assert(mutables.m_currentPositionInChunk >= mutables.m_currentChunkSize);
468
469                 // Move on to the next section.
470                 int currentSection = ++mutables.m_currentSection;
471                 int sectionsRemaining = m_sectionCount - currentSection;
472
473                 // If empty, return right away.
474                 if (sectionsRemaining <= 0)
475                 {
476                     return false;
477                 }
478
479                 // Compute the offset of the current section from the beginning of the range
480                 int currentSectionOffset = currentSection * m_partitionCount * m_maxChunkSize;
481                 mutables.m_currentPositionInChunk = 0;
482
483                 // Now do something different based on how many chunks remain.
484                 if (sectionsRemaining > 1)
485                 {
486                     // We are not on the last section. The size of this chunk is simply m_maxChunkSize.
487                     mutables.m_currentChunkSize = m_maxChunkSize;
488                     mutables.m_currentChunkOffset = currentSectionOffset + m_partitionIndex * m_maxChunkSize;
489                 }
490                 else
491                 {
492                     // We are on the last section. Compute the size of the chunk to ensure even distribution
493                     // of elements.
494                     int lastSectionElementCount = m_elementCount - currentSectionOffset;
495                     int smallerChunkSize = lastSectionElementCount / m_partitionCount;
496                     int biggerChunkCount = lastSectionElementCount % m_partitionCount;
497
498                     mutables.m_currentChunkSize = smallerChunkSize;
499                     if (m_partitionIndex < biggerChunkCount)
500                     {
501                         mutables.m_currentChunkSize++;
502                     }
503                     if (mutables.m_currentChunkSize == 0)
504                     {
505                         return false;
506                     }
507
508                     mutables.m_currentChunkOffset =
509                         currentSectionOffset                            // The beginning of this section
510                         + m_partitionIndex * smallerChunkSize           // + the start of this chunk if all chunks were "smaller"
511                         + (m_partitionIndex < biggerChunkCount ? m_partitionIndex : biggerChunkCount); // + the number of "bigger" chunks before this chunk
512                 }
513
514                 return true;
515             }
516         }
517
518         // A contiguous index enumerator specialized for IList<T> indexing. It calls through
519         // the IList<T> interface for element retrieval.
520         internal sealed class ListContiguousIndexRangeEnumerator : QueryOperatorEnumerator<T, int>
521         {
522             private readonly IList<T> m_data; // The elements to iterate over.
523             private readonly int m_startIndex; // Where to begin iterating.
524             private readonly int m_maximumIndex; // The maximum index to iterate over.
525             private Shared<int> m_currentIndex; // The current index (lazily allocated).
526
527             internal ListContiguousIndexRangeEnumerator(IList<T> data, int partitionCount, int partitionIndex)
528             {
529                 Contract.Assert(data != null, "data musn't be null");
530                 Contract.Assert(partitionCount > 0, "partitionCount must be positive");
531                 Contract.Assert(partitionIndex >= 0, "partitionIndex can't be negative");
532                 Contract.Assert(partitionIndex < partitionCount, "partitionIndex must be less than partitionCount");
533
534                 m_data = data;
535
536                 // Compute the size of the chunk to ensure even distribution of elements.
537                 int smallerChunkSize = data.Count / partitionCount;
538                 int biggerChunkCount = data.Count % partitionCount;
539
540                 // Our start index is our index times the small chunk size, plus the number
541                 // of "bigger" chunks before this one.
542                 int startIndex = partitionIndex * smallerChunkSize +
543                     (partitionIndex < biggerChunkCount ? partitionIndex : biggerChunkCount);
544
545                 m_startIndex = startIndex - 1; // Subtract one for the first call.
546                 m_maximumIndex = startIndex + smallerChunkSize + // And add one if we're responsible for part of the
547                     (partitionIndex < biggerChunkCount ? 1 : 0); // leftover chunks.
548
549                 Contract.Assert(m_currentIndex == null, "Expected deferred allocation to ensure it happens on correct thread");
550             }
551
552             internal override bool MoveNext(ref T currentElement, ref int currentKey)
553             {
554                 // Lazily allocate the current index if needed.
555                 if (m_currentIndex == null)
556                 {
557                     m_currentIndex = new Shared<int>(m_startIndex);
558                 }
559
560                 // Now increment the current index, check bounds, and so on.
561                 int current = ++m_currentIndex.Value;
562                 if (current < m_maximumIndex)
563                 {
564                     currentKey = current;
565                     currentElement = m_data[current];
566                     return true;
567                 }
568
569                 return false;
570             }
571         }
572
573         //---------------------------------------------------------------------------------------
574         // This enumerator lazily grabs chunks of data from the underlying data source. It is
575         // safe for this data source to be enumerated by multiple such enumerators, since it has
576         // been written to perform proper synchronization.
577         //
578
579         private class ContiguousChunkLazyEnumerator : QueryOperatorEnumerator<T, int>
580         {
581             private const int chunksPerChunkSize = 7; // the rate at which to double the chunksize (double chunksize every 'r' chunks). MUST BE == (2^n)-1 for some n.
582             private readonly IEnumerator<T> m_source; // Data source to enumerate.
583             private readonly object m_sourceSyncLock; // Lock to use for all synchronization.
584             private readonly Shared<int> m_currentIndex; // The index shared by all.
585             private readonly Shared<int> m_activeEnumeratorsCount; // How many enumerators over the same source have not been disposed yet?
586             private readonly Shared<bool> m_exceptionTracker;
587             private Mutables m_mutables; // Any mutable fields on this enumerator. These mutables are local and persistent
588             
589             class Mutables
590             {
591                 internal Mutables()
592                 {
593                     m_nextChunkMaxSize = 1; // We start the chunk size at 1 and grow it later.
594                     m_chunkBuffer = new T[Scheduling.GetDefaultChunkSize<T>()]; // Pre-allocate the array at the maximum size.
595                     m_currentChunkSize = 0; // The chunk begins life begins empty.
596                     m_currentChunkIndex = -1;
597                     m_chunkBaseIndex = 0;
598                     m_chunkCounter = 0;
599                 }
600
601                 internal readonly T[] m_chunkBuffer;      // Buffer array for the current chunk being enumerated.
602                 internal int m_nextChunkMaxSize;  // The max. chunk size to use for the next chunk.
603                 internal int m_currentChunkSize;  // The element count for our current chunk.
604                 internal int m_currentChunkIndex; // Our current index within the temporary chunk.
605                 internal int m_chunkBaseIndex;    // The start index from which the current chunk was taken.
606                 internal int  m_chunkCounter;  
607             }
608
609             //---------------------------------------------------------------------------------------
610             // Constructs a new enumerator that lazily retrieves chunks from the provided source.
611             //
612
613             internal ContiguousChunkLazyEnumerator(
614                 IEnumerator<T> source, Shared<bool> exceptionTracker, object sourceSyncLock, Shared<int> currentIndex, Shared<int> degreeOfParallelism)
615             {
616                 Contract.Assert(source != null);
617                 Contract.Assert(sourceSyncLock != null);
618                 Contract.Assert(currentIndex != null);
619
620                 m_source = source;
621                 m_sourceSyncLock = sourceSyncLock;
622                 m_currentIndex = currentIndex;
623                 m_activeEnumeratorsCount = degreeOfParallelism;
624                 m_exceptionTracker = exceptionTracker;    
625             }
626
627             //---------------------------------------------------------------------------------------
628             // Just retrieves the current element from our current chunk.
629             //
630
631             internal override bool MoveNext(ref T currentElement, ref int currentKey)
632             {
633                 Mutables mutables = m_mutables;
634                 if (mutables == null)
635                 {
636                     mutables = m_mutables = new Mutables();
637                 }
638
639                 Contract.Assert(mutables.m_chunkBuffer != null);
640
641                 // Loop until we've exhausted our data source.
642                 while (true)
643                 {
644                     // If we have elements remaining in the current chunk, return right away.
645                     T[] chunkBuffer = mutables.m_chunkBuffer;
646                     int currentChunkIndex = ++mutables.m_currentChunkIndex;
647                     if (currentChunkIndex < mutables.m_currentChunkSize)
648                     {
649                         Contract.Assert(m_source != null);
650                         Contract.Assert(chunkBuffer != null);
651                         Contract.Assert(mutables.m_currentChunkSize > 0);
652                         Contract.Assert(0 <= currentChunkIndex && currentChunkIndex < chunkBuffer.Length);
653                         currentElement = chunkBuffer[currentChunkIndex];
654                         currentKey = mutables.m_chunkBaseIndex + currentChunkIndex;
655
656                         return true;
657                     }
658
659                     // Else, it could be the first time enumerating this object, or we may have
660                     // just reached the end of the current chunk and need to grab another one? In either
661                     // case, we will look for more data from the underlying enumerator.  Because we
662                     // share the same enumerator object, we have to do this under a lock.
663
664                     lock (m_sourceSyncLock)
665                     {
666                         Contract.Assert(0 <= mutables.m_nextChunkMaxSize && mutables.m_nextChunkMaxSize <= chunkBuffer.Length);
667
668                         // Accumulate a chunk of elements from the input.
669                         int i = 0;
670
671                         if (m_exceptionTracker.Value)
672                         {
673                             return false;
674                         }
675
676                         try
677                         {
678                             for (; i < mutables.m_nextChunkMaxSize && m_source.MoveNext(); i++)
679                             {
680                                 // Read the current entry into our buffer.
681                                 chunkBuffer[i] = m_source.Current;
682                             }
683                         }
684                         catch 
685                         {
686                             m_exceptionTracker.Value = true;
687                             throw;
688                         }
689
690                         // Store the number of elements we fetched from the data source.
691                         mutables.m_currentChunkSize = i;
692
693                         // If we've emptied the enumerator, return immediately.
694                         if (i == 0)
695                         {
696                             return false;
697                         }
698
699                         // Increment the shared index for all to see. Throw an exception on overflow.
700                         mutables.m_chunkBaseIndex = m_currentIndex.Value;
701                         checked
702                         {
703                             m_currentIndex.Value += i;
704                         }
705                     }
706
707                     // Each time we access the data source, we grow the chunk size for the next go-round.
708                     // We grow the chunksize once per 'chunksPerChunkSize'. 
709                     if (mutables.m_nextChunkMaxSize < chunkBuffer.Length)
710                     {
711                         if ((mutables.m_chunkCounter++ & chunksPerChunkSize) == chunksPerChunkSize)
712                         {
713                             mutables.m_nextChunkMaxSize = mutables.m_nextChunkMaxSize * 2;
714                             if (mutables.m_nextChunkMaxSize > chunkBuffer.Length)
715                             {
716                                 mutables.m_nextChunkMaxSize = chunkBuffer.Length;
717                             }
718                         }
719                     }
720
721                     // Finally, reset our index to the beginning; loop around and we'll return the right values.
722                     mutables.m_currentChunkIndex = -1;
723                 }
724             }
725
726             protected override void Dispose(bool disposing)
727             {
728                 if (Interlocked.Decrement(ref m_activeEnumeratorsCount.Value) == 0)
729                 {
730                     m_source.Dispose();
731                 }
732             }
733         }
734
735     }
736 }