232f75d54d803bb0f936b1a039d823fc108018b2
[mono.git] / mcs / class / referencesource / mscorlib / system / collections / Concurrent / PartitionerStatic.cs
1 #pragma warning disable 0420
2 // ==++==
3 //
4 //   Copyright (c) Microsoft Corporation.  All rights reserved.
5 // 
6 // ==--==
7 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
8 //
9 // PartitionerStatic.cs
10 //
11 // <OWNER>Microsoft</OWNER>
12 //
13 // A class of default partitioners for Partitioner<TSource>
14 //
15 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
16
17 using System.Collections.Generic;
18 using System.Security.Permissions;
19 using System.Threading;
20 using System.Diagnostics.Contracts;
21 using System.Runtime.InteropServices;
22
23 namespace System.Collections.Concurrent
24 {
25     /// <summary>
26     /// Out-of-the-box partitioners are created with a set of default behaviors.  
27     /// For example, by default, some form of buffering and chunking will be employed to achieve 
28     /// optimal performance in the common scenario where an IEnumerable<> implementation is fast and 
29     /// non-blocking.  These behaviors can be overridden via this enumeration.
30     /// </summary>
31     [Flags]
32 #if !FEATURE_CORECLR
33     [Serializable]
34 #endif
35     public enum EnumerablePartitionerOptions
36     {
37         /// <summary>
38         /// Use the default behavior (i.e., use buffering to achieve optimal performance)
39         /// </summary>
40         None = 0x0,
41
42         /// <summary>
43         /// Creates a partitioner that will take items from the source enumerable one at a time
44         /// and will not use intermediate storage that can be accessed more efficiently by multiple threads.  
45         /// This option provides support for low latency (items will be processed as soon as they are available from 
46         /// the source) and partial support for dependencies between items (a thread cannot deadlock waiting for an item 
47         /// that it, itself, is responsible for processing).
48         /// </summary>
49         NoBuffering = 0x1
50     }
51
52     // The static class Partitioners implements 3 default partitioning strategies:
53     // 1. dynamic load balance partitioning for indexable data source (IList and arrays)
54     // 2. static partitioning for indexable data source (IList and arrays)
55     // 3. dynamic load balance partitioning for enumerables. Enumerables have indexes, which are the natural order
56     //    of elements, but enuemrators are not indexable 
57     // - data source of type IList/arrays have both dynamic and static partitioning, as 1 and 3.
58     //   We assume that the source data of IList/Array is not changing concurrently.
59     // - data source of type IEnumerable can only be partitioned dynamically (load-balance)
60     // - Dynamic partitioning methods 1 and 3 are same, both being dynamic and load-balance. But the 
61     //   implementation is different for data source of IList/Array vs. IEnumerable:
62     //   * When the source collection is IList/Arrays, we use Interlocked on the shared index; 
63     //   * When the source collection is IEnumerable, we use Monitor to wrap around the access to the source 
64     //     enumerator.
65
66     /// <summary>
67     /// Provides common partitioning strategies for arrays, lists, and enumerables.
68     /// </summary>
69     /// <remarks>
70     /// <para>
71     /// The static methods on <see cref="Partitioner"/> are all thread-safe and may be used concurrently
72     /// from multiple threads. However, while a created partitioner is in use, the underlying data source
73     /// should not be modified, whether from the same thread that's using a partitioner or from a separate
74     /// thread.
75     /// </para>
76     /// </remarks>
77     [HostProtection(Synchronization = true, ExternalThreading = true)]
78     public static class Partitioner
79     {
80         /// <summary>
81         /// Creates an orderable partitioner from an <see cref="System.Collections.Generic.IList{T}"/>
82         /// instance.
83         /// </summary>
84         /// <typeparam name="TSource">Type of the elements in source list.</typeparam>
85         /// <param name="list">The list to be partitioned.</param>
86         /// <param name="loadBalance">
87         /// A Boolean value that indicates whether the created partitioner should dynamically
88         /// load balance between partitions rather than statically partition.
89         /// </param>
90         /// <returns>
91         /// An orderable partitioner based on the input list.
92         /// </returns>
93         public static OrderablePartitioner<TSource> Create<TSource>(IList<TSource> list, bool loadBalance)
94         {
95             if (list == null)
96             {
97                 throw new ArgumentNullException("list");
98             }
99             if (loadBalance)
100             {
101                 return (new DynamicPartitionerForIList<TSource>(list));
102             }
103             else
104             {
105                 return (new StaticIndexRangePartitionerForIList<TSource>(list));
106             }
107         }
108
109         /// <summary>
110         /// Creates an orderable partitioner from a <see cref="System.Array"/> instance.
111         /// </summary>
112         /// <typeparam name="TSource">Type of the elements in source array.</typeparam>
113         /// <param name="array">The array to be partitioned.</param>
114         /// <param name="loadBalance">
115         /// A Boolean value that indicates whether the created partitioner should dynamically load balance
116         /// between partitions rather than statically partition.
117         /// </param>
118         /// <returns>
119         /// An orderable partitioner based on the input array.
120         /// </returns>
121         public static OrderablePartitioner<TSource> Create<TSource>(TSource[] array, bool loadBalance)
122         {
123             // This implementation uses 'ldelem' instructions for element retrieval, rather than using a
124             // method call.
125
126             if (array == null)
127             {
128                 throw new ArgumentNullException("array");
129             }
130             if (loadBalance)
131             {
132                 return (new DynamicPartitionerForArray<TSource>(array));
133             }
134             else
135             {
136                 return (new StaticIndexRangePartitionerForArray<TSource>(array));
137             }
138         }
139
140         /// <summary>
141         /// Creates an orderable partitioner from a <see cref="System.Collections.Generic.IEnumerable{TSource}"/> instance.
142         /// </summary>
143         /// <typeparam name="TSource">Type of the elements in source enumerable.</typeparam>
144         /// <param name="source">The enumerable to be partitioned.</param>
145         /// <returns>
146         /// An orderable partitioner based on the input array.
147         /// </returns>
148         /// <remarks>
149         /// The ordering used in the created partitioner is determined by the natural order of the elements 
150         /// as retrieved from the source enumerable.
151         /// </remarks>
152         public static OrderablePartitioner<TSource> Create<TSource>(IEnumerable<TSource> source)
153         {
154             return Create<TSource>(source, EnumerablePartitionerOptions.None);
155         }
156
157         /// <summary>
158         /// Creates an orderable partitioner from a <see cref="System.Collections.Generic.IEnumerable{TSource}"/> instance.
159         /// </summary>
160         /// <typeparam name="TSource">Type of the elements in source enumerable.</typeparam>
161         /// <param name="source">The enumerable to be partitioned.</param>
162         /// <param name="partitionerOptions">Options to control the buffering behavior of the partitioner.</param>
163         /// <exception cref="T:System.ArgumentOutOfRangeException">
164         /// The <paramref name="partitionerOptions"/> argument specifies an invalid value for <see
165         /// cref="T:System.Collections.Concurrent.EnumerablePartitionerOptions"/>.
166         /// </exception>
167         /// <returns>
168         /// An orderable partitioner based on the input array.
169         /// </returns>
170         /// <remarks>
171         /// The ordering used in the created partitioner is determined by the natural order of the elements 
172         /// as retrieved from the source enumerable.
173         /// </remarks>
174         public static OrderablePartitioner<TSource> Create<TSource>(IEnumerable<TSource> source, EnumerablePartitionerOptions partitionerOptions)
175         {
176             if (source == null)
177             {
178                 throw new ArgumentNullException("source");
179             }
180
181             if ((partitionerOptions & (~EnumerablePartitionerOptions.NoBuffering)) != 0)
182                 throw new ArgumentOutOfRangeException("partitionerOptions");
183
184             return (new DynamicPartitionerForIEnumerable<TSource>(source, partitionerOptions));
185         }
186
187 #if !PFX_LEGACY_3_5
188         /// <summary>Creates a partitioner that chunks the user-specified range.</summary>
189         /// <param name="fromInclusive">The lower, inclusive bound of the range.</param>
190         /// <param name="toExclusive">The upper, exclusive bound of the range.</param>
191         /// <returns>A partitioner.</returns>
192         /// <exception cref="T:System.ArgumentOutOfRangeException"> The <paramref name="toExclusive"/> argument is 
193         /// less than or equal to the <paramref name="fromInclusive"/> argument.</exception>
194         public static OrderablePartitioner<Tuple<long, long>> Create(long fromInclusive, long toExclusive)
195         {
196             // How many chunks do we want to divide the range into?  If this is 1, then the
197             // answer is "one chunk per core".  Generally, though, you'll achieve better
198             // load balancing on a busy system if you make it higher than 1.
199             int coreOversubscriptionRate = 3;
200
201             if (toExclusive <= fromInclusive) throw new ArgumentOutOfRangeException("toExclusive");
202             long rangeSize = (toExclusive - fromInclusive) /
203                 (PlatformHelper.ProcessorCount * coreOversubscriptionRate);
204             if (rangeSize == 0) rangeSize = 1;
205             return Partitioner.Create(CreateRanges(fromInclusive, toExclusive, rangeSize), EnumerablePartitionerOptions.NoBuffering); // chunk one range at a time
206         }
207
208         /// <summary>Creates a partitioner that chunks the user-specified range.</summary>
209         /// <param name="fromInclusive">The lower, inclusive bound of the range.</param>
210         /// <param name="toExclusive">The upper, exclusive bound of the range.</param>
211         /// <param name="rangeSize">The size of each subrange.</param>
212         /// <returns>A partitioner.</returns>
213         /// <exception cref="T:System.ArgumentOutOfRangeException"> The <paramref name="toExclusive"/> argument is 
214         /// less than or equal to the <paramref name="fromInclusive"/> argument.</exception>
215         /// <exception cref="T:System.ArgumentOutOfRangeException"> The <paramref name="rangeSize"/> argument is 
216         /// less than or equal to 0.</exception>
217         public static OrderablePartitioner<Tuple<long, long>> Create(long fromInclusive, long toExclusive, long rangeSize)
218         {
219             if (toExclusive <= fromInclusive) throw new ArgumentOutOfRangeException("toExclusive");
220             if (rangeSize <= 0) throw new ArgumentOutOfRangeException("rangeSize");
221             return Partitioner.Create(CreateRanges(fromInclusive, toExclusive, rangeSize), EnumerablePartitionerOptions.NoBuffering); // chunk one range at a time
222         }
223
224         // Private method to parcel out range tuples.
225         private static IEnumerable<Tuple<long, long>> CreateRanges(long fromInclusive, long toExclusive, long rangeSize)
226         {
227             // Enumerate all of the ranges
228             long from, to;
229             bool shouldQuit = false;
230
231             for (long i = fromInclusive; (i < toExclusive) && !shouldQuit; i += rangeSize)
232             {
233                 from = i;
234                 try { checked { to = i + rangeSize; } }
235                 catch (OverflowException)
236                 {
237                     to = toExclusive;
238                     shouldQuit = true;
239                 }
240                 if (to > toExclusive) to = toExclusive;
241                 yield return new Tuple<long, long>(from, to);
242             }
243         }
244
245         /// <summary>Creates a partitioner that chunks the user-specified range.</summary>
246         /// <param name="fromInclusive">The lower, inclusive bound of the range.</param>
247         /// <param name="toExclusive">The upper, exclusive bound of the range.</param>
248         /// <returns>A partitioner.</returns>
249         /// <exception cref="T:System.ArgumentOutOfRangeException"> The <paramref name="toExclusive"/> argument is 
250         /// less than or equal to the <paramref name="fromInclusive"/> argument.</exception>
251         public static OrderablePartitioner<Tuple<int, int>> Create(int fromInclusive, int toExclusive)
252         {
253             // How many chunks do we want to divide the range into?  If this is 1, then the
254             // answer is "one chunk per core".  Generally, though, you'll achieve better
255             // load balancing on a busy system if you make it higher than 1.
256             int coreOversubscriptionRate = 3;
257
258             if (toExclusive <= fromInclusive) throw new ArgumentOutOfRangeException("toExclusive");
259             int rangeSize = (toExclusive - fromInclusive) /
260                 (PlatformHelper.ProcessorCount * coreOversubscriptionRate);
261             if (rangeSize == 0) rangeSize = 1;
262             return Partitioner.Create(CreateRanges(fromInclusive, toExclusive, rangeSize), EnumerablePartitionerOptions.NoBuffering); // chunk one range at a time
263         }
264
265         /// <summary>Creates a partitioner that chunks the user-specified range.</summary>
266         /// <param name="fromInclusive">The lower, inclusive bound of the range.</param>
267         /// <param name="toExclusive">The upper, exclusive bound of the range.</param>
268         /// <param name="rangeSize">The size of each subrange.</param>
269         /// <returns>A partitioner.</returns>
270         /// <exception cref="T:System.ArgumentOutOfRangeException"> The <paramref name="toExclusive"/> argument is 
271         /// less than or equal to the <paramref name="fromInclusive"/> argument.</exception>
272         /// <exception cref="T:System.ArgumentOutOfRangeException"> The <paramref name="rangeSize"/> argument is 
273         /// less than or equal to 0.</exception>
274         public static OrderablePartitioner<Tuple<int, int>> Create(int fromInclusive, int toExclusive, int rangeSize)
275         {
276             if (toExclusive <= fromInclusive) throw new ArgumentOutOfRangeException("toExclusive");
277             if (rangeSize <= 0) throw new ArgumentOutOfRangeException("rangeSize");
278             return Partitioner.Create(CreateRanges(fromInclusive, toExclusive, rangeSize), EnumerablePartitionerOptions.NoBuffering); // chunk one range at a time
279         }
280
281         // Private method to parcel out range tuples.
282         private static IEnumerable<Tuple<int, int>> CreateRanges(int fromInclusive, int toExclusive, int rangeSize)
283         {
284             // Enumerate all of the ranges
285             int from, to;
286             bool shouldQuit = false;
287
288             for (int i = fromInclusive; (i < toExclusive) && !shouldQuit; i += rangeSize)
289             {
290                 from = i;
291                 try { checked { to = i + rangeSize; } }
292                 catch (OverflowException)
293                 {
294                     to = toExclusive;
295                     shouldQuit = true;
296                 }
297                 if (to > toExclusive) to = toExclusive;
298                 yield return new Tuple<int, int>(from, to);
299             }
300         }
301 #endif
302
303         #region DynamicPartitionEnumerator_Abstract class
304         /// <summary>
305         /// DynamicPartitionEnumerator_Abstract defines the enumerator for each partition for the dynamic load-balance
306         /// partitioning algorithm. 
307         /// - Partition is an enumerator of KeyValuePairs, each corresponding to an item in the data source: 
308         ///   the key is the index in the source collection; the value is the item itself.
309         /// - a set of such partitions share a reader over data source. The type of the reader is specified by
310         ///   TSourceReader. 
311         /// - each partition requests a contiguous chunk of elements at a time from the source data. The chunk 
312         ///   size is initially 1, and doubles every time until it reaches the maximum chunk size. 
313         ///   The implementation for GrabNextChunk() method has two versions: one for data source of IndexRange 
314         ///   types (IList and the array), one for data source of IEnumerable.
315         /// - The method "Reset" is not supported for any partitioning algorithm.
316         /// - The implementation for MoveNext() method is same for all dynanmic partitioners, so we provide it
317         ///   in this abstract class.
318         /// </summary>
319         /// <typeparam name="TSource">Type of the elements in the data source</typeparam>
320         /// <typeparam name="TSourceReader">Type of the reader on the data source</typeparam>
321         //TSourceReader is 
322         //  - IList<TSource>, when source data is IList<TSource>, the shared reader is source data itself
323         //  - TSource[], when source data is TSource[], the shared reader is source data itself
324         //  - IEnumerator<TSource>, when source data is IEnumerable<TSource>, and the shared reader is an 
325         //    enumerator of the source data
326         private abstract class DynamicPartitionEnumerator_Abstract<TSource, TSourceReader> : IEnumerator<KeyValuePair<long, TSource>>
327         {
328             //----------------- common fields and constructor for all dynamic partitioners -----------------
329             //--- shared by all dervied class with souce data type: IList, Array, and IEnumerator
330             protected readonly TSourceReader m_sharedReader;
331
332             protected static int s_defaultMaxChunkSize = GetDefaultChunkSize<TSource>();
333
334             //deferred allocating in MoveNext() with initial value 0, to avoid false sharing
335             //we also use the fact that: (m_currentChunkSize==null) means MoveNext is never called on this enumerator 
336             protected SharedInt m_currentChunkSize;
337
338             //deferring allocation in MoveNext() with initial value -1, to avoid false sharing
339             protected SharedInt m_localOffset;
340
341             private const int CHUNK_DOUBLING_RATE = 3; // Double the chunk size every this many grabs
342             private int m_doublingCountdown; // Number of grabs remaining until chunk size doubles
343             protected readonly int m_maxChunkSize; // s_defaultMaxChunkSize unless single-chunking is requested by the caller
344
345             // m_sharedIndex shared by this set of partitions, and particularly when m_sharedReader is IEnuerable
346             // it serves as tracking of the natual order of elements in m_sharedReader
347             // the value of this field is passed in from outside (already initialized) by the constructor, 
348             protected readonly SharedLong m_sharedIndex;
349
350             protected DynamicPartitionEnumerator_Abstract(TSourceReader sharedReader, SharedLong sharedIndex)
351                 : this(sharedReader, sharedIndex, false)
352             {
353             }
354
355             protected DynamicPartitionEnumerator_Abstract(TSourceReader sharedReader, SharedLong sharedIndex, bool useSingleChunking)
356             {
357                 m_sharedReader = sharedReader;
358                 m_sharedIndex = sharedIndex;
359                 m_maxChunkSize = useSingleChunking ? 1 : s_defaultMaxChunkSize;
360             }
361
362             // ---------------- abstract method declarations --------------
363
364             /// <summary>
365             /// Abstract method to request a contiguous chunk of elements from the source collection
366             /// </summary>
367             /// <param name="requestedChunkSize">specified number of elements requested</param>
368             /// <returns>
369             /// true if we successfully reserved at least one element (up to #=requestedChunkSize) 
370             /// false if all elements in the source collection have been reserved.
371             /// </returns>
372             //GrabNextChunk does the following: 
373             //  - grab # of requestedChunkSize elements from source data through shared reader, 
374             //  - at the time of function returns, m_currentChunkSize is updated with the number of 
375             //    elements actually got ----gined (<=requestedChunkSize). 
376             //  - GrabNextChunk returns true if at least one element is assigned to this partition; 
377             //    false if the shared reader already hits the last element of the source data before 
378             //    we call GrabNextChunk
379             protected abstract bool GrabNextChunk(int requestedChunkSize);
380
381             /// <summary>
382             /// Abstract property, returns whether or not the shared reader has already read the last 
383             /// element of the source data 
384             /// </summary>
385             protected abstract bool HasNoElementsLeft { get; set; }
386
387             /// <summary>
388             /// Get the current element in the current partition. Property required by IEnumerator interface
389             /// This property is abstract because the implementation is different depending on the type
390             /// of the source data: IList, Array or IEnumerable
391             /// </summary>
392             public abstract KeyValuePair<long, TSource> Current { get; }
393
394             /// <summary>
395             /// Dispose is abstract, and depends on the type of the source data:
396             /// - For source data type IList and Array, the type of the shared reader is just the dataitself.
397             ///   We don't do anything in Dispose method for IList and Array. 
398             /// - For source data type IEnumerable, the type of the shared reader is an enumerator we created.
399             ///   Thus we need to dispose this shared reader enumerator, when there is no more active partitions
400             ///   left.
401             /// </summary>
402             public abstract void Dispose();
403
404             /// <summary>
405             /// Reset on partitions is not supported
406             /// </summary>
407             public void Reset()
408             {
409                 throw new NotSupportedException();
410             }
411
412
413             /// <summary>
414             /// Get the current element in the current partition. Property required by IEnumerator interface
415             /// </summary>
416             Object IEnumerator.Current
417             {
418                 get
419                 {
420                     return ((DynamicPartitionEnumerator_Abstract<TSource, TSourceReader>)this).Current;
421                 }
422             }
423
424             /// <summary>
425             /// Moves to the next element if any.
426             /// Try current chunk first, if the current chunk do not have any elements left, then we 
427             /// attempt to grab a chunk from the source collection.
428             /// </summary>
429             /// <returns>
430             /// true if successfully moving to the next position;
431             /// false otherwise, if and only if there is no more elements left in the current chunk 
432             /// AND the source collection is exhausted. 
433             /// </returns>
434             public bool MoveNext()
435             {
436                 //perform deferred allocating of the local variables. 
437                 if (m_localOffset == null)
438                 {
439                     Contract.Assert(m_currentChunkSize == null);
440                     m_localOffset = new SharedInt(-1);
441                     m_currentChunkSize = new SharedInt(0);
442                     m_doublingCountdown = CHUNK_DOUBLING_RATE;
443                 }
444
445                 if (m_localOffset.Value < m_currentChunkSize.Value - 1)
446                 //attempt to grab the next element from the local chunk
447                 {
448                     m_localOffset.Value++;
449                     return true;
450                 }
451                 else
452                 //otherwise it means we exhausted the local chunk
453                 //grab a new chunk from the source enumerator
454                 {
455                     // The second part of the || condition is necessary to handle the case when MoveNext() is called
456                     // after a previous MoveNext call returned false.
457                     Contract.Assert(m_localOffset.Value == m_currentChunkSize.Value - 1 || m_currentChunkSize.Value == 0);
458
459                     //set the requested chunk size to a proper value
460                     int requestedChunkSize;
461                     if (m_currentChunkSize.Value == 0) //first time grabbing from source enumerator
462                     {
463                         requestedChunkSize = 1;
464                     }
465                     else if (m_doublingCountdown > 0)
466                     {
467                         requestedChunkSize = m_currentChunkSize.Value;
468                     }
469                     else
470                     {
471                         requestedChunkSize = Math.Min(m_currentChunkSize.Value * 2, m_maxChunkSize);
472                         m_doublingCountdown = CHUNK_DOUBLING_RATE; // reset
473                     }
474
475                     // Decrement your doubling countdown
476                     m_doublingCountdown--;
477
478                     Contract.Assert(requestedChunkSize > 0 && requestedChunkSize <= m_maxChunkSize);
479                     //GrabNextChunk will update the value of m_currentChunkSize
480                     if (GrabNextChunk(requestedChunkSize))
481                     {
482                         Contract.Assert(m_currentChunkSize.Value <= requestedChunkSize && m_currentChunkSize.Value > 0);
483                         m_localOffset.Value = 0;
484                         return true;
485                     }
486                     else
487                     {
488                         return false;
489                     }
490                 }
491             }
492         }
493         #endregion
494
495         #region Dynamic Partitioner for source data of IEnuemrable<> type
496         /// <summary>
497         /// Inherits from DynamicPartitioners
498         /// Provides customized implementation of GetOrderableDynamicPartitions_Factory method, to return an instance
499         /// of EnumerableOfPartitionsForIEnumerator defined internally
500         /// </summary>
501         /// <typeparam name="TSource">Type of elements in the source data</typeparam>
502         private class DynamicPartitionerForIEnumerable<TSource> : OrderablePartitioner<TSource>
503         {
504             IEnumerable<TSource> m_source;
505             readonly bool m_useSingleChunking;
506
507             //constructor
508             internal DynamicPartitionerForIEnumerable(IEnumerable<TSource> source, EnumerablePartitionerOptions partitionerOptions)
509                 : base(true, false, true)
510             {
511                 m_source = source;
512                 m_useSingleChunking = ((partitionerOptions & EnumerablePartitionerOptions.NoBuffering) != 0);
513             }
514
515             /// <summary>
516             /// Overrides OrderablePartitioner.GetOrderablePartitions.
517             /// Partitions the underlying collection into the given number of orderable partitions.
518             /// </summary>
519             /// <param name="partitionCount">number of partitions requested</param>
520             /// <returns>A list containing <paramref name="partitionCount"/> enumerators.</returns>
521             override public IList<IEnumerator<KeyValuePair<long, TSource>>> GetOrderablePartitions(int partitionCount)
522             {
523                 if (partitionCount <= 0)
524                 {
525                     throw new ArgumentOutOfRangeException("partitionCount");
526                 }
527                 IEnumerator<KeyValuePair<long, TSource>>[] partitions
528                     = new IEnumerator<KeyValuePair<long, TSource>>[partitionCount];
529
530                 IEnumerable<KeyValuePair<long, TSource>> partitionEnumerable = new InternalPartitionEnumerable(m_source.GetEnumerator(), m_useSingleChunking, true);
531                 for (int i = 0; i < partitionCount; i++)
532                 {
533                     partitions[i] = partitionEnumerable.GetEnumerator();
534                 }
535                 return partitions;
536             }
537
538             /// <summary>
539             /// Overrides OrderablePartitioner.GetOrderableDyanmicPartitions
540             /// </summary>
541             /// <returns>a enumerable collection of orderable partitions</returns>
542             override public IEnumerable<KeyValuePair<long, TSource>> GetOrderableDynamicPartitions()
543             {
544                 return new InternalPartitionEnumerable(m_source.GetEnumerator(), m_useSingleChunking, false);
545             }
546
547             /// <summary>
548             /// Whether additional partitions can be created dynamically.
549             /// </summary>
550             override public bool SupportsDynamicPartitions
551             {
552                 get { return true; }
553             }
554
555             #region Internal classes:  InternalPartitionEnumerable, InternalPartitionEnumerator
556             /// <summary>
557             /// Provides customized implementation for source data of IEnumerable
558             /// Different from the counterpart for IList/Array, this enumerable maintains several additional fields
559             /// shared by the partitions it owns, including a boolean "m_hasNoElementsLef", a shared lock, and a 
560             /// shared count "m_activePartitionCount" used to track active partitions when they were created statically
561             /// </summary>
562             private class InternalPartitionEnumerable : IEnumerable<KeyValuePair<long, TSource>>, IDisposable
563             {
564                 //reader through which we access the source data
565                 private readonly IEnumerator<TSource> m_sharedReader;
566                 private SharedLong m_sharedIndex;//initial value -1
567
568                 private volatile KeyValuePair<long, TSource>[] m_FillBuffer;  // intermediate buffer to reduce locking
569                 private volatile int m_FillBufferSize;               // actual number of elements in m_FillBuffer. Will start
570                                                                     // at m_FillBuffer.Length, and might be reduced during the last refill
571                 private volatile int m_FillBufferCurrentPosition;    //shared value to be accessed by Interlock.Increment only
572                 private volatile int m_activeCopiers;               //number of active copiers
573
574                 //fields shared by all partitions that this Enumerable owns, their allocation is deferred
575                 private SharedBool m_hasNoElementsLeft; // no elements left at all.
576                 private SharedBool m_sourceDepleted;    // no elements left in the enumerator, but there may be elements in the Fill Buffer
577
578                 //shared synchronization lock, created by this Enumerable
579                 private object m_sharedLock;//deferring allocation by enumerator
580
581                 private bool m_disposed;
582
583                 // If dynamic partitioning, then m_activePartitionCount == null
584                 // If static partitioning, then it keeps track of active partition count
585                 private SharedInt m_activePartitionCount;
586
587                 // records whether or not the user has requested single-chunking behavior
588                 private readonly bool m_useSingleChunking;
589
590                 internal InternalPartitionEnumerable(IEnumerator<TSource> sharedReader, bool useSingleChunking, bool isStaticPartitioning)
591                 {
592                     m_sharedReader = sharedReader;
593                     m_sharedIndex = new SharedLong(-1);
594                     m_hasNoElementsLeft = new SharedBool(false);
595                     m_sourceDepleted = new SharedBool(false);
596                     m_sharedLock = new object();
597                     m_useSingleChunking = useSingleChunking;
598
599                     // Only allocate the fill-buffer if single-chunking is not in effect
600                     if (!m_useSingleChunking)
601                     {
602                         // Time to allocate the fill buffer which is used to reduce the contention on the shared lock.
603                         // First pick the buffer size multiplier. We use 4 for when there are more than 4 cores, and just 1 for below. This is based on empirical evidence.
604                         int fillBufferMultiplier = (PlatformHelper.ProcessorCount > 4) ? 4 : 1;
605
606                         // and allocate the fill buffer using these two numbers
607                         m_FillBuffer = new KeyValuePair<long, TSource>[fillBufferMultiplier * Partitioner.GetDefaultChunkSize<TSource>()];
608                     }
609
610                     if (isStaticPartitioning)
611                     {
612                         // If this object is created for static partitioning (ie. via GetPartitions(int partitionCount), 
613                         // GetOrderablePartitions(int partitionCount)), we track the active partitions, in order to dispose 
614                         // this object when all the partitions have been disposed.
615                         m_activePartitionCount = new SharedInt(0);
616                     }
617                     else
618                     {
619                         // Otherwise this object is created for dynamic partitioning (ie, via GetDynamicPartitions(),
620                         // GetOrderableDynamicPartitions()), we do not need tracking. This object must be disposed
621                         // explicitly
622                         m_activePartitionCount = null;
623                     }
624                 }
625
626                 public IEnumerator<KeyValuePair<long, TSource>> GetEnumerator()
627                 {
628                     if (m_disposed)
629                     {
630                         throw new ObjectDisposedException(Environment.GetResourceString("PartitionerStatic_CanNotCallGetEnumeratorAfterSourceHasBeenDisposed"));
631                     }
632                     else
633                     {
634                         return new InternalPartitionEnumerator(m_sharedReader, m_sharedIndex,
635                             m_hasNoElementsLeft, m_sharedLock, m_activePartitionCount, this, m_useSingleChunking);
636                     }
637                 }
638
639
640                 IEnumerator IEnumerable.GetEnumerator()
641                 {
642                     return ((InternalPartitionEnumerable)this).GetEnumerator();
643                 }
644
645
646                 ///////////////////
647                 //
648                 // Used by GrabChunk_Buffered()
649                 private void TryCopyFromFillBuffer(KeyValuePair<long, TSource>[] destArray, 
650                                                   int requestedChunkSize, 
651                                                   ref int actualNumElementsGrabbed)
652                 {                    
653                     actualNumElementsGrabbed = 0;
654
655
656                     // making a local defensive copy of the fill buffer reference, just in case it gets nulled out
657                     KeyValuePair<long, TSource>[] fillBufferLocalRef = m_FillBuffer;
658                     if (fillBufferLocalRef == null) return;
659
660                     // first do a quick check, and give up if the current position is at the end
661                     // so that we don't do an unncessary pair of Interlocked.Increment / Decrement calls
662                     if (m_FillBufferCurrentPosition >= m_FillBufferSize)
663                     {                        
664                         return; // no elements in the buffer to copy from
665                     }
666
667                     // We might have a chance to grab elements from the buffer. We will know for sure 
668                     // when we do the Interlocked.Add below. 
669                     // But first we must register as a potential copier in order to make sure 
670                     // the elements we're copying from don't get overwritten by another thread 
671                     // that starts refilling the buffer right after our Interlocked.Add.
672                     Interlocked.Increment(ref m_activeCopiers);
673
674                     int endPos = Interlocked.Add(ref m_FillBufferCurrentPosition, requestedChunkSize);
675                     int beginPos = endPos - requestedChunkSize;
676
677                     if (beginPos < m_FillBufferSize)
678                     {
679                         // adjust index and do the actual copy
680                         actualNumElementsGrabbed = (endPos < m_FillBufferSize) ? endPos : m_FillBufferSize - beginPos;
681                         Array.Copy(fillBufferLocalRef, beginPos, destArray, 0, actualNumElementsGrabbed);
682                     }
683
684                     // let the record show we are no longer accessing the buffer
685                     Interlocked.Decrement(ref m_activeCopiers);
686                 }
687
688                 /// <summary>
689                 /// This is the common entry point for consuming items from the source enumerable
690                 /// </summary>
691                 /// <returns>
692                 /// true if we successfully reserved at least one element 
693                 /// false if all elements in the source collection have been reserved.
694                 /// </returns>
695                 internal bool GrabChunk(KeyValuePair<long, TSource>[] destArray, int requestedChunkSize, ref int actualNumElementsGrabbed)
696                 {
697                     actualNumElementsGrabbed = 0;
698
699                     if (m_hasNoElementsLeft.Value)
700                     {
701                         return false;
702                     }
703
704                     if (m_useSingleChunking)
705                     {
706                         return GrabChunk_Single(destArray, requestedChunkSize, ref actualNumElementsGrabbed);
707                     }
708                     else
709                     {
710                         return GrabChunk_Buffered(destArray, requestedChunkSize, ref actualNumElementsGrabbed);
711                     }
712                 }
713
714                 /// <summary>
715                 /// Version of GrabChunk that grabs a single element at a time from the source enumerable
716                 /// </summary>
717                 /// <returns>
718                 /// true if we successfully reserved an element 
719                 /// false if all elements in the source collection have been reserved.
720                 /// </returns>
721                 internal bool GrabChunk_Single(KeyValuePair<long,TSource>[] destArray, int requestedChunkSize, ref int actualNumElementsGrabbed)
722                 {
723                     Contract.Assert(m_useSingleChunking, "Expected m_useSingleChecking to be true");
724                     Contract.Assert(requestedChunkSize == 1, "Got requested chunk size of " + requestedChunkSize + " when single-chunking was on");
725                     Contract.Assert(actualNumElementsGrabbed == 0, "Expected actualNumElementsGrabbed == 0, instead it is " + actualNumElementsGrabbed);
726                     Contract.Assert(destArray.Length == 1, "Expected destArray to be of length 1, instead its length is " + destArray.Length);
727
728                     lock (m_sharedLock)
729                     {
730                         if (m_hasNoElementsLeft.Value) return false;
731
732                         try
733                         {
734                             if (m_sharedReader.MoveNext())
735                             {
736                                 m_sharedIndex.Value = checked(m_sharedIndex.Value + 1);
737                                 destArray[0]
738                                     = new KeyValuePair<long, TSource>(m_sharedIndex.Value,
739                                                                         m_sharedReader.Current);
740                                 actualNumElementsGrabbed = 1;
741                                 return true;
742                             }
743                             else
744                             {
745                                 //if MoveNext() return false, we set the flag to inform other partitions
746                                 m_sourceDepleted.Value = true;
747                                 m_hasNoElementsLeft.Value = true;
748                                 return false;
749                             }
750                         }
751                         catch
752                         {
753                             // On an exception, make sure that no additional items are hereafter enumerated
754                             m_sourceDepleted.Value = true;
755                             m_hasNoElementsLeft.Value = true;
756                             throw;
757                         }
758                     }
759                 }
760
761
762
763                 /// <summary>
764                 /// Version of GrabChunk that uses buffering scheme to grab items out of source enumerable
765                 /// </summary>
766                 /// <returns>
767                 /// true if we successfully reserved at least one element (up to #=requestedChunkSize) 
768                 /// false if all elements in the source collection have been reserved.
769                 /// </returns>
770                 internal bool GrabChunk_Buffered(KeyValuePair<long,TSource>[] destArray, int requestedChunkSize, ref int actualNumElementsGrabbed)
771                 {
772                     Contract.Assert(requestedChunkSize > 0);
773                     Contract.Assert(!m_useSingleChunking, "Did not expect to be in single-chunking mode");
774
775                     TryCopyFromFillBuffer(destArray, requestedChunkSize, ref actualNumElementsGrabbed);
776                     
777                     if (actualNumElementsGrabbed == requestedChunkSize)
778                     {
779                         // that was easy.
780                         return true;
781                     }
782                     else if (m_sourceDepleted.Value)
783                     {
784                         // looks like we both reached the end of the fill buffer, and the source was depleted previously
785                         // this means no more work to do for any other worker
786                         m_hasNoElementsLeft.Value = true;
787                         m_FillBuffer = null;
788                         return (actualNumElementsGrabbed > 0);
789                     }
790
791
792                     //
793                     //  now's the time to take the shared lock and enumerate
794                     //
795                     lock (m_sharedLock)
796                     {
797                         if (m_sourceDepleted.Value)
798                         {
799                             return (actualNumElementsGrabbed > 0);
800                         }
801                         
802                         try
803                         {
804                             // we need to make sure all array copiers are finished
805                             if (m_activeCopiers > 0)
806                             {                                    
807                                 SpinWait sw = new SpinWait();
808                                 while( m_activeCopiers > 0) sw.SpinOnce();
809                             }
810
811                             Contract.Assert(m_sharedIndex != null); //already been allocated in MoveNext() before calling GrabNextChunk
812
813                             // Now's the time to actually enumerate the source
814
815                             // We first fill up the requested # of elements in the caller's array
816                             // continue from the where TryCopyFromFillBuffer() left off
817                             for (; actualNumElementsGrabbed < requestedChunkSize; actualNumElementsGrabbed++)
818                             {
819                                 if (m_sharedReader.MoveNext())
820                                 {
821                                     m_sharedIndex.Value = checked(m_sharedIndex.Value + 1);
822                                     destArray[actualNumElementsGrabbed]
823                                         = new KeyValuePair<long, TSource>(m_sharedIndex.Value,
824                                                                           m_sharedReader.Current);
825                                 }
826                                 else
827                                 {
828                                     //if MoveNext() return false, we set the flag to inform other partitions
829                                     m_sourceDepleted.Value = true;
830                                     break;
831                                 }
832                             }
833
834                             // taking a local snapshot of m_FillBuffer in case some other thread decides to null out m_FillBuffer 
835                             // in the entry of this method after observing m_sourceCompleted = true
836                             var localFillBufferRef = m_FillBuffer;
837
838                             // If the big buffer seems to be depleted, we will also fill that up while we are under the lock
839                             // Note that this is the only place that m_FillBufferCurrentPosition can be reset
840                             if (m_sourceDepleted.Value == false && localFillBufferRef != null && 
841                                 m_FillBufferCurrentPosition >= localFillBufferRef.Length)
842                             {
843                                 for (int i = 0; i < localFillBufferRef.Length; i++)
844                                 {
845                                     if( m_sharedReader.MoveNext())
846                                     {
847                                         m_sharedIndex.Value = checked(m_sharedIndex.Value + 1);
848                                         localFillBufferRef[i]
849                                             = new KeyValuePair<long, TSource>(m_sharedIndex.Value,
850                                                                               m_sharedReader.Current);
851                                     }
852                                     else
853                                     {
854                                         // No more elements left in the enumerator.
855                                         // Record this, so that the next request can skip the lock
856                                         m_sourceDepleted.Value = true;
857
858                                         // also record the current count in m_FillBufferSize
859                                         m_FillBufferSize = i;
860
861                                         // and exit the for loop so that we don't keep incrementing m_FillBufferSize
862                                         break;
863                                     }
864
865                                 }
866
867                                 m_FillBufferCurrentPosition = 0;
868                             }
869
870
871                         }
872                         catch
873                         {
874                             // If an exception occurs, don't let the other enumerators try to enumerate.
875                             // NOTE: this could instead throw an InvalidOperationException, but that would be unexpected 
876                             // and not helpful to the end user.  We know the root cause is being communicated already.)
877                             m_sourceDepleted.Value = true;
878                             m_hasNoElementsLeft.Value = true;
879                             throw;
880                         }
881                     }
882
883                     return (actualNumElementsGrabbed > 0);
884                 }
885
886                 public void Dispose()
887                 {
888                     if (!m_disposed)
889                     {
890                         m_disposed = true;
891                         m_sharedReader.Dispose();
892                     }
893                 }
894             }
895
896             /// <summary>
897             /// Inherits from DynamicPartitionEnumerator_Abstract directly
898             /// Provides customized implementation for: GrabNextChunk, HasNoElementsLeft, Current, Dispose
899             /// </summary>
900             private class InternalPartitionEnumerator : DynamicPartitionEnumerator_Abstract<TSource, IEnumerator<TSource>>
901             {
902                 //---- fields ----
903                 //cached local copy of the current chunk
904                 private KeyValuePair<long, TSource>[] m_localList; //defer allocating to avoid false sharing
905
906                 // the values of the following two fields are passed in from
907                 // outside(already initialized) by the constructor, 
908                 private readonly SharedBool m_hasNoElementsLeft;
909 #pragma warning disable 414
910                 private readonly object m_sharedLock;
911 #pragma warning restore 414
912                 private readonly SharedInt m_activePartitionCount;
913                 private InternalPartitionEnumerable m_enumerable;
914
915                 //constructor
916                 internal InternalPartitionEnumerator(
917                     IEnumerator<TSource> sharedReader,
918                     SharedLong sharedIndex,
919                     SharedBool hasNoElementsLeft,
920                     object sharedLock,
921                     SharedInt activePartitionCount,
922                     InternalPartitionEnumerable enumerable,
923                     bool useSingleChunking)
924                     : base(sharedReader, sharedIndex, useSingleChunking)
925                 {
926                     m_hasNoElementsLeft = hasNoElementsLeft;
927                     m_sharedLock = sharedLock;
928                     m_enumerable = enumerable;
929                     m_activePartitionCount = activePartitionCount;
930
931                     if (m_activePartitionCount != null)
932                     {
933                         // If static partitioning, we need to increase the active partition count.
934                         Interlocked.Increment(ref m_activePartitionCount.Value);
935                     }
936                 }
937
938                 //overriding methods
939
940                 /// <summary>
941                 /// Reserves a contiguous range of elements from source data
942                 /// </summary>
943                 /// <param name="requestedChunkSize">specified number of elements requested</param>
944                 /// <returns>
945                 /// true if we successfully reserved at least one element (up to #=requestedChunkSize) 
946                 /// false if all elements in the source collection have been reserved.
947                 /// </returns>
948                 override protected bool GrabNextChunk(int requestedChunkSize)
949                 {
950                     Contract.Assert(requestedChunkSize > 0);
951
952                     if (HasNoElementsLeft)
953                     {
954                         return false;
955                     }
956
957                     // defer allocation to avoid false sharing
958                     if (m_localList == null)
959                     {
960                         m_localList = new KeyValuePair<long, TSource>[m_maxChunkSize];
961                     }
962
963                     // make the actual call to the enumerable that grabs a chunk
964                     return m_enumerable.GrabChunk(m_localList, requestedChunkSize, ref m_currentChunkSize.Value);
965                 }
966
967                 /// <summary>
968                 /// Returns whether or not the shared reader has already read the last 
969                 /// element of the source data 
970                 /// </summary>
971                 /// <remarks>
972                 /// We cannot call m_sharedReader.MoveNext(), to see if it hits the last element
973                 /// or not, because we can't undo MoveNext(). Thus we need to maintain a shared 
974                 /// boolean value m_hasNoElementsLeft across all partitions
975                 /// </remarks>
976                 override protected bool HasNoElementsLeft
977                 {
978                     get { return m_hasNoElementsLeft.Value; }
979                     set
980                     {
981                         //we only set it from false to true once
982                         //we should never set it back in any circumstances
983                         Contract.Assert(value);
984                         Contract.Assert(!m_hasNoElementsLeft.Value);
985                         m_hasNoElementsLeft.Value = true;
986                     }
987                 }
988
989                 override public KeyValuePair<long, TSource> Current
990                 {
991                     get
992                     {
993                         //verify that MoveNext is at least called once before Current is called 
994                         if (m_currentChunkSize == null)
995                         {
996                             throw new InvalidOperationException(Environment.GetResourceString("PartitionerStatic_CurrentCalledBeforeMoveNext"));
997                         }
998                         Contract.Assert(m_localList != null);
999                         Contract.Assert(m_localOffset.Value >= 0 && m_localOffset.Value < m_currentChunkSize.Value);
1000                         return (m_localList[m_localOffset.Value]);
1001                     }
1002                 }
1003
1004                 override public void Dispose()
1005                 {
1006                     // If this is static partitioning, ie. m_activePartitionCount != null, since the current partition 
1007                     // is disposed, we decrement the number of active partitions for the shared reader. 
1008                     if (m_activePartitionCount != null && Interlocked.Decrement(ref m_activePartitionCount.Value) == 0)
1009                     {
1010                         // If the number of active partitions becomes 0, we need to dispose the shared 
1011                         // reader we created in the m_enumerable object.
1012                         m_enumerable.Dispose();
1013                     }
1014                     // If this is dynamic partitioning, ie. m_activePartitionCount != null, then m_enumerable needs to
1015                     // be disposed explicitly by the user, and we do not need to anything here
1016                 }
1017             }
1018             #endregion
1019
1020         }
1021         #endregion
1022
1023         #region Dynamic Partitioner for source data of IndexRange types (IList<> and Array<>)
1024         /// <summary>
1025         /// Dynamic load-balance partitioner. This class is abstract and to be derived from by 
1026         /// the customized partitioner classes for IList, Array, and IEnumerable
1027         /// </summary>
1028         /// <typeparam name="TSource">Type of the elements in the source data</typeparam>
1029         /// <typeparam name="TCollection"> Type of the source data collection</typeparam>
1030         private abstract class DynamicPartitionerForIndexRange_Abstract<TSource, TCollection> : OrderablePartitioner<TSource>
1031         {
1032             // TCollection can be: IList<TSource>, TSource[] and IEnumerable<TSource>
1033             // Derived classes specify TCollection, and implement the abstract method GetOrderableDynamicPartitions_Factory accordingly
1034             TCollection m_data;
1035
1036             /// <summary>
1037             /// Constructs a new orderable partitioner 
1038             /// </summary>
1039             /// <param name="data">source data collection</param>
1040             protected DynamicPartitionerForIndexRange_Abstract(TCollection data)
1041                 : base(true, false, true)
1042             {
1043                 m_data = data;
1044             }
1045
1046             /// <summary>
1047             /// Partition the source data and create an enumerable over the resulting partitions. 
1048             /// </summary>
1049             /// <param name="data">the source data collection</param>
1050             /// <returns>an enumerable of partitions of </returns>
1051             protected abstract IEnumerable<KeyValuePair<long, TSource>> GetOrderableDynamicPartitions_Factory(TCollection data);
1052
1053             /// <summary>
1054             /// Overrides OrderablePartitioner.GetOrderablePartitions.
1055             /// Partitions the underlying collection into the given number of orderable partitions.
1056             /// </summary>
1057             /// <param name="partitionCount">number of partitions requested</param>
1058             /// <returns>A list containing <paramref name="partitionCount"/> enumerators.</returns>
1059             override public IList<IEnumerator<KeyValuePair<long, TSource>>> GetOrderablePartitions(int partitionCount)
1060             {
1061                 if (partitionCount <= 0)
1062                 {
1063                     throw new ArgumentOutOfRangeException("partitionCount");
1064                 }
1065                 IEnumerator<KeyValuePair<long, TSource>>[] partitions
1066                     = new IEnumerator<KeyValuePair<long, TSource>>[partitionCount];
1067                 IEnumerable<KeyValuePair<long, TSource>> partitionEnumerable = GetOrderableDynamicPartitions_Factory(m_data);
1068                 for (int i = 0; i < partitionCount; i++)
1069                 {
1070                     partitions[i] = partitionEnumerable.GetEnumerator();
1071                 }
1072                 return partitions;
1073             }
1074
1075             /// <summary>
1076             /// Overrides OrderablePartitioner.GetOrderableDyanmicPartitions
1077             /// </summary>
1078             /// <returns>a enumerable collection of orderable partitions</returns>
1079             override public IEnumerable<KeyValuePair<long, TSource>> GetOrderableDynamicPartitions()
1080             {
1081                 return GetOrderableDynamicPartitions_Factory(m_data);
1082             }
1083
1084             /// <summary>
1085             /// Whether additional partitions can be created dynamically.
1086             /// </summary>
1087             override public bool SupportsDynamicPartitions
1088             {
1089                 get { return true; }
1090             }
1091
1092         }
1093
1094         /// <summary>
1095         /// Defines dynamic partition for source data of IList and Array. 
1096         /// This class inherits DynamicPartitionEnumerator_Abstract
1097         ///   - implements GrabNextChunk, HasNoElementsLeft, and Dispose methods for IList and Array
1098         ///   - Current property still remains abstract, implementation is different for IList and Array
1099         ///   - introduces another abstract method SourceCount, which returns the number of elements in
1100         ///     the source data. Implementation differs for IList and Array
1101         /// </summary>
1102         /// <typeparam name="TSource">Type of the elements in the data source</typeparam>
1103         /// <typeparam name="TSourceReader">Type of the reader on the source data</typeparam>
1104         private abstract class DynamicPartitionEnumeratorForIndexRange_Abstract<TSource, TSourceReader> : DynamicPartitionEnumerator_Abstract<TSource, TSourceReader>
1105         {
1106             //fields
1107             protected int m_startIndex; //initially zero
1108
1109             //constructor
1110             protected DynamicPartitionEnumeratorForIndexRange_Abstract(TSourceReader sharedReader, SharedLong sharedIndex)
1111                 : base(sharedReader, sharedIndex)
1112             {
1113             }
1114
1115             //abstract methods
1116             //the Current property is still abstract, and will be implemented by derived classes
1117             //we add another abstract method SourceCount to get the number of elements from the source reader
1118
1119             /// <summary>
1120             /// Get the number of elements from the source reader.
1121             /// It calls IList.Count or Array.Length
1122             /// </summary>
1123             protected abstract int SourceCount { get; }
1124
1125             //overriding methods
1126
1127             /// <summary>
1128             /// Reserves a contiguous range of elements from source data
1129             /// </summary>
1130             /// <param name="requestedChunkSize">specified number of elements requested</param>
1131             /// <returns>
1132             /// true if we successfully reserved at least one element (up to #=requestedChunkSize) 
1133             /// false if all elements in the source collection have been reserved.
1134             /// </returns>
1135             override protected bool GrabNextChunk(int requestedChunkSize)
1136             {
1137                 Contract.Assert(requestedChunkSize > 0);
1138
1139                 while (!HasNoElementsLeft)
1140                 {
1141                     Contract.Assert(m_sharedIndex != null);
1142                     // use the new Volatile.Read method because it is cheaper than Interlocked.Read on AMD64 architecture
1143                     long oldSharedIndex = Volatile.Read(ref m_sharedIndex.Value);
1144
1145                     if (HasNoElementsLeft)
1146                     {
1147                         //HasNoElementsLeft situation changed from false to true immediately
1148                         //and oldSharedIndex becomes stale
1149                         return false;
1150                     }
1151
1152                     //there won't be overflow, because the index of IList/array is int, and we 
1153                     //have casted it to long. 
1154                     long newSharedIndex = Math.Min(SourceCount - 1, oldSharedIndex + requestedChunkSize);
1155
1156
1157                     //the following CAS, if successful, reserves a chunk of elements [oldSharedIndex+1, newSharedIndex] 
1158                     //inclusive in the source collection
1159                     if (Interlocked.CompareExchange(ref m_sharedIndex.Value, newSharedIndex, oldSharedIndex)
1160                         == oldSharedIndex)
1161                     {
1162                         //set up local indexes.
1163                         //m_currentChunkSize is always set to requestedChunkSize when source data had 
1164                         //enough elements of what we requested
1165                         m_currentChunkSize.Value = (int)(newSharedIndex - oldSharedIndex);
1166                         m_localOffset.Value = -1;
1167                         m_startIndex = (int)(oldSharedIndex + 1);
1168                         return true;
1169                     }
1170                 }
1171                 //didn't get any element, return false;
1172                 return false;
1173             }
1174
1175             /// <summary>
1176             /// Returns whether or not the shared reader has already read the last 
1177             /// element of the source data 
1178             /// </summary>
1179             override protected bool HasNoElementsLeft
1180             {
1181                 get
1182                 {
1183                     Contract.Assert(m_sharedIndex != null);
1184                     // use the new Volatile.Read method because it is cheaper than Interlocked.Read on AMD64 architecture
1185                     return Volatile.Read(ref m_sharedIndex.Value) >= SourceCount - 1;
1186                 }
1187                 set
1188                 {
1189                     Contract.Assert(false);
1190                 }
1191             }
1192
1193             /// <summary>
1194             /// For source data type IList and Array, the type of the shared reader is just the data itself.
1195             /// We don't do anything in Dispose method for IList and Array. 
1196             /// </summary>
1197             override public void Dispose()
1198             { }
1199         }
1200
1201
1202         /// <summary>
1203         /// Inherits from DynamicPartitioners
1204         /// Provides customized implementation of GetOrderableDynamicPartitions_Factory method, to return an instance
1205         /// of EnumerableOfPartitionsForIList defined internally
1206         /// </summary>
1207         /// <typeparam name="TSource">Type of elements in the source data</typeparam>
1208         private class DynamicPartitionerForIList<TSource> : DynamicPartitionerForIndexRange_Abstract<TSource, IList<TSource>>
1209         {
1210             //constructor
1211             internal DynamicPartitionerForIList(IList<TSource> source)
1212                 : base(source)
1213             { }
1214
1215             //override methods
1216             override protected IEnumerable<KeyValuePair<long, TSource>> GetOrderableDynamicPartitions_Factory(IList<TSource> m_data)
1217             {
1218                 //m_data itself serves as shared reader
1219                 return new InternalPartitionEnumerable(m_data);
1220             }
1221
1222             /// <summary>
1223             /// Inherits from PartitionList_Abstract 
1224             /// Provides customized implementation for source data of IList
1225             /// </summary>
1226             private class InternalPartitionEnumerable : IEnumerable<KeyValuePair<long, TSource>>
1227             {
1228                 //reader through which we access the source data
1229                 private readonly IList<TSource> m_sharedReader;
1230                 private SharedLong m_sharedIndex;
1231
1232                 internal InternalPartitionEnumerable(IList<TSource> sharedReader)
1233                 {
1234                     m_sharedReader = sharedReader;
1235                     m_sharedIndex = new SharedLong(-1);
1236                 }
1237
1238                 public IEnumerator<KeyValuePair<long, TSource>> GetEnumerator()
1239                 {
1240                     return new InternalPartitionEnumerator(m_sharedReader, m_sharedIndex);
1241                 }
1242
1243                 IEnumerator IEnumerable.GetEnumerator()
1244                 {
1245                     return ((InternalPartitionEnumerable)this).GetEnumerator();
1246                 }
1247             }
1248
1249             /// <summary>
1250             /// Inherits from DynamicPartitionEnumeratorForIndexRange_Abstract
1251             /// Provides customized implementation of SourceCount property and Current property for IList
1252             /// </summary>
1253             private class InternalPartitionEnumerator : DynamicPartitionEnumeratorForIndexRange_Abstract<TSource, IList<TSource>>
1254             {
1255                 //constructor
1256                 internal InternalPartitionEnumerator(IList<TSource> sharedReader, SharedLong sharedIndex)
1257                     : base(sharedReader, sharedIndex)
1258                 { }
1259
1260                 //overriding methods
1261                 override protected int SourceCount
1262                 {
1263                     get { return m_sharedReader.Count; }
1264                 }
1265                 /// <summary>
1266                 /// return a KeyValuePair of the current element and its key 
1267                 /// </summary>
1268                 override public KeyValuePair<long, TSource> Current
1269                 {
1270                     get
1271                     {
1272                         //verify that MoveNext is at least called once before Current is called 
1273                         if (m_currentChunkSize == null)
1274                         {
1275                             throw new InvalidOperationException(Environment.GetResourceString("PartitionerStatic_CurrentCalledBeforeMoveNext"));
1276                         }
1277
1278                         Contract.Assert(m_localOffset.Value >= 0 && m_localOffset.Value < m_currentChunkSize.Value);
1279                         return new KeyValuePair<long, TSource>(m_startIndex + m_localOffset.Value,
1280                             m_sharedReader[m_startIndex + m_localOffset.Value]);
1281                     }
1282                 }
1283             }
1284         }
1285
1286
1287
1288         /// <summary>
1289         /// Inherits from DynamicPartitioners
1290         /// Provides customized implementation of GetOrderableDynamicPartitions_Factory method, to return an instance
1291         /// of EnumerableOfPartitionsForArray defined internally
1292         /// </summary>
1293         /// <typeparam name="TSource">Type of elements in the source data</typeparam>
1294         private class DynamicPartitionerForArray<TSource> : DynamicPartitionerForIndexRange_Abstract<TSource, TSource[]>
1295         {
1296             //constructor
1297             internal DynamicPartitionerForArray(TSource[] source)
1298                 : base(source)
1299             { }
1300
1301             //override methods
1302             override protected IEnumerable<KeyValuePair<long, TSource>> GetOrderableDynamicPartitions_Factory(TSource[] m_data)
1303             {
1304                 return new InternalPartitionEnumerable(m_data);
1305             }
1306
1307             /// <summary>
1308             /// Inherits from PartitionList_Abstract 
1309             /// Provides customized implementation for source data of Array
1310             /// </summary>
1311             private class InternalPartitionEnumerable : IEnumerable<KeyValuePair<long, TSource>>
1312             {
1313                 //reader through which we access the source data
1314                 private readonly TSource[] m_sharedReader;
1315                 private SharedLong m_sharedIndex;
1316
1317                 internal InternalPartitionEnumerable(TSource[] sharedReader)
1318                 {
1319                     m_sharedReader = sharedReader;
1320                     m_sharedIndex = new SharedLong(-1);
1321                 }
1322
1323                 IEnumerator IEnumerable.GetEnumerator()
1324                 {
1325                     return ((InternalPartitionEnumerable)this).GetEnumerator();
1326                 }
1327
1328
1329                 public IEnumerator<KeyValuePair<long, TSource>> GetEnumerator()
1330                 {
1331                     return new InternalPartitionEnumerator(m_sharedReader, m_sharedIndex);
1332                 }
1333             }
1334
1335             /// <summary>
1336             /// Inherits from DynamicPartitionEnumeratorForIndexRange_Abstract
1337             /// Provides customized implementation of SourceCount property and Current property for Array
1338             /// </summary>
1339             private class InternalPartitionEnumerator : DynamicPartitionEnumeratorForIndexRange_Abstract<TSource, TSource[]>
1340             {
1341                 //constructor
1342                 internal InternalPartitionEnumerator(TSource[] sharedReader, SharedLong sharedIndex)
1343                     : base(sharedReader, sharedIndex)
1344                 { }
1345
1346                 //overriding methods
1347                 override protected int SourceCount
1348                 {
1349                     get { return m_sharedReader.Length; }
1350                 }
1351
1352                 override public KeyValuePair<long, TSource> Current
1353                 {
1354                     get
1355                     {
1356                         //verify that MoveNext is at least called once before Current is called 
1357                         if (m_currentChunkSize == null)
1358                         {
1359                             throw new InvalidOperationException(Environment.GetResourceString("PartitionerStatic_CurrentCalledBeforeMoveNext"));
1360                         }
1361
1362                         Contract.Assert(m_localOffset.Value >= 0 && m_localOffset.Value < m_currentChunkSize.Value);
1363                         return new KeyValuePair<long, TSource>(m_startIndex + m_localOffset.Value,
1364                             m_sharedReader[m_startIndex + m_localOffset.Value]);
1365                     }
1366                 }
1367             }
1368         }
1369         #endregion
1370
1371
1372         #region Static partitioning for IList and Array, abstract classes
1373         /// <summary>
1374         /// Static partitioning over IList. 
1375         /// - dynamic and load-balance
1376         /// - Keys are ordered within each partition
1377         /// - Keys are ordered across partitions
1378         /// - Keys are normalized
1379         /// - Number of partitions is fixed once specified, and the elements of the source data are 
1380         /// distributed to each partition as evenly as possible. 
1381         /// </summary>
1382         /// <typeparam name="TSource">type of the elements</typeparam>        
1383         /// <typeparam name="TCollection">Type of the source data collection</typeparam>
1384         private abstract class StaticIndexRangePartitioner<TSource, TCollection> : OrderablePartitioner<TSource>
1385         {
1386             protected StaticIndexRangePartitioner()
1387                 : base(true, true, true)
1388             { }
1389
1390             /// <summary>
1391             /// Abstract method to return the number of elements in the source data
1392             /// </summary>
1393             protected abstract int SourceCount { get; }
1394
1395             /// <summary>
1396             /// Abstract method to create a partition that covers a range over source data, 
1397             /// starting from "startIndex", ending at "endIndex"
1398             /// </summary>
1399             /// <param name="startIndex">start index of the current partition on the source data</param>
1400             /// <param name="endIndex">end index of the current partition on the source data</param>
1401             /// <returns>a partition enumerator over the specified range</returns>
1402             // The partitioning algorithm is implemented in GetOrderablePartitions method
1403             // This method delegates according to source data type IList/Array
1404             protected abstract IEnumerator<KeyValuePair<long, TSource>> CreatePartition(int startIndex, int endIndex);
1405
1406             /// <summary>
1407             /// Overrides OrderablePartitioner.GetOrderablePartitions
1408             /// Return a list of partitions, each of which enumerate a fixed part of the source data
1409             /// The elements of the source data are distributed to each partition as evenly as possible. 
1410             /// Specifically, if the total number of elements is N, and number of partitions is x, and N = a*x +b, 
1411             /// where a is the quotient, and b is the remainder. Then the first b partitions each has a + 1 elements,
1412             /// and the last x-b partitions each has a elements.
1413             /// For example, if N=10, x =3, then 
1414             ///    partition 0 ranges [0,3],
1415             ///    partition 1 ranges [4,6],
1416             ///    partition 2 ranges [7,9].
1417             /// This also takes care of the situation of (x&gt;N), the last x-N partitions are empty enumerators. 
1418             /// An empty enumerator is indicated by 
1419             ///      (m_startIndex == list.Count &amp;&amp; m_endIndex == list.Count -1)
1420             /// </summary>
1421             /// <param name="partitionCount">specified number of partitions</param>
1422             /// <returns>a list of partitions</returns>
1423             override public IList<IEnumerator<KeyValuePair<long, TSource>>> GetOrderablePartitions(int partitionCount)
1424             {
1425                 if (partitionCount <= 0)
1426                 {
1427                     throw new ArgumentOutOfRangeException("partitionCount");
1428                 }
1429
1430                 int quotient, remainder;
1431                 quotient = Math.DivRem(SourceCount, partitionCount, out remainder);
1432
1433                 IEnumerator<KeyValuePair<long, TSource>>[] partitions = new IEnumerator<KeyValuePair<long, TSource>>[partitionCount];
1434                 int lastEndIndex = -1;
1435                 for (int i = 0; i < partitionCount; i++)
1436                 {
1437                     int startIndex = lastEndIndex + 1;
1438
1439                     if (i < remainder)
1440                     {
1441                         lastEndIndex = startIndex + quotient;
1442                     }
1443                     else
1444                     {
1445                         lastEndIndex = startIndex + quotient - 1;
1446                     }
1447                     partitions[i] = CreatePartition(startIndex, lastEndIndex);
1448                 }
1449                 return partitions;
1450             }
1451         }
1452
1453         /// <summary>
1454         /// Static Partition for IList/Array.
1455         /// This class implements all methods required by IEnumerator interface, except for the Current property.
1456         /// Current Property is different for IList and Array. Arrays calls 'ldelem' instructions for faster element 
1457         /// retrieval.
1458         /// </summary>
1459         //We assume the source collection is not being updated concurrently. Otherwise it will break the  
1460         //static partitioning, since each partition operates on the source collection directly, it does 
1461         //not have a local cache of the elements assigned to them.  
1462         private abstract class StaticIndexRangePartition<TSource> : IEnumerator<KeyValuePair<long, TSource>>
1463         {
1464             //the start and end position in the source collection for the current partition
1465             //the partition is empty if and only if 
1466             // (m_startIndex == m_data.Count && m_endIndex == m_data.Count-1)
1467             protected readonly int m_startIndex;
1468             protected readonly int m_endIndex;
1469
1470             //the current index of the current partition while enumerating on the source collection
1471             protected volatile int m_offset;
1472
1473             /// <summary>
1474             /// Constructs an instance of StaticIndexRangePartition
1475             /// </summary>
1476             /// <param name="startIndex">the start index in the source collection for the current partition </param>
1477             /// <param name="endIndex">the end index in the source collection for the current partition</param>
1478             protected StaticIndexRangePartition(int startIndex, int endIndex)
1479             {
1480                 m_startIndex = startIndex;
1481                 m_endIndex = endIndex;
1482                 m_offset = startIndex - 1;
1483             }
1484
1485             /// <summary>
1486             /// Current Property is different for IList and Array. Arrays calls 'ldelem' instructions for faster 
1487             /// element retrieval.
1488             /// </summary>
1489             public abstract KeyValuePair<long, TSource> Current { get; }
1490
1491             /// <summary>
1492             /// We don't dispose the source for IList and array
1493             /// </summary>
1494             public void Dispose()
1495             { }
1496
1497             public void Reset()
1498             {
1499                 throw new NotSupportedException();
1500             }
1501
1502             /// <summary>
1503             /// Moves to the next item
1504             /// Before the first MoveNext is called: m_offset == m_startIndex-1;
1505             /// </summary>
1506             /// <returns>true if successful, false if there is no item left</returns>
1507             public bool MoveNext()
1508             {
1509                 if (m_offset < m_endIndex)
1510                 {
1511                     m_offset++;
1512                     return true;
1513                 }
1514                 else
1515                 {
1516                     //After we have enumerated over all elements, we set m_offset to m_endIndex +1.
1517                     //The reason we do this is, for an empty enumerator, we need to tell the Current 
1518                     //property whether MoveNext has been called or not. 
1519                     //For an empty enumerator, it starts with (m_offset == m_startIndex-1 == m_endIndex), 
1520                     //and we don't set a new value to m_offset, then the above condition will always be 
1521                     //true, and the Current property will mistakenly assume MoveNext is never called.
1522                     m_offset = m_endIndex + 1;
1523                     return false;
1524                 }
1525             }
1526
1527             Object IEnumerator.Current
1528             {
1529                 get
1530                 {
1531                     return ((StaticIndexRangePartition<TSource>)this).Current;
1532                 }
1533             }
1534         }
1535         #endregion
1536
1537         #region Static partitioning for IList
1538         /// <summary>
1539         /// Inherits from StaticIndexRangePartitioner
1540         /// Provides customized implementation of SourceCount and CreatePartition
1541         /// </summary>
1542         /// <typeparam name="TSource"></typeparam>
1543         private class StaticIndexRangePartitionerForIList<TSource> : StaticIndexRangePartitioner<TSource, IList<TSource>>
1544         {
1545             IList<TSource> m_list;
1546             internal StaticIndexRangePartitionerForIList(IList<TSource> list)
1547                 : base()
1548             {
1549                 Contract.Assert(list != null);
1550                 m_list = list;
1551             }
1552             override protected int SourceCount
1553             {
1554                 get { return m_list.Count; }
1555             }
1556             override protected IEnumerator<KeyValuePair<long, TSource>> CreatePartition(int startIndex, int endIndex)
1557             {
1558                 return new StaticIndexRangePartitionForIList<TSource>(m_list, startIndex, endIndex);
1559             }
1560         }
1561
1562         /// <summary>
1563         /// Inherits from StaticIndexRangePartition
1564         /// Provides customized implementation of Current property
1565         /// </summary>
1566         /// <typeparam name="TSource"></typeparam>
1567         private class StaticIndexRangePartitionForIList<TSource> : StaticIndexRangePartition<TSource>
1568         {
1569             //the source collection shared by all partitions
1570             private volatile IList<TSource> m_list;
1571
1572             internal StaticIndexRangePartitionForIList(IList<TSource> list, int startIndex, int endIndex)
1573                 : base(startIndex, endIndex)
1574             {
1575                 Contract.Assert(startIndex >= 0 && endIndex <= list.Count - 1);
1576                 m_list = list;
1577             }
1578
1579             override public KeyValuePair<long, TSource> Current
1580             {
1581                 get
1582                 {
1583                     //verify that MoveNext is at least called once before Current is called 
1584                     if (m_offset < m_startIndex)
1585                     {
1586                         throw new InvalidOperationException(Environment.GetResourceString("PartitionerStatic_CurrentCalledBeforeMoveNext"));
1587                     }
1588
1589                     Contract.Assert(m_offset >= m_startIndex && m_offset <= m_endIndex);
1590                     return (new KeyValuePair<long, TSource>(m_offset, m_list[m_offset]));
1591                 }
1592             }
1593         }
1594         #endregion
1595
1596         #region static partitioning for Arrays
1597         /// <summary>
1598         /// Inherits from StaticIndexRangePartitioner
1599         /// Provides customized implementation of SourceCount and CreatePartition for Array
1600         /// </summary>
1601         private class StaticIndexRangePartitionerForArray<TSource> : StaticIndexRangePartitioner<TSource, TSource[]>
1602         {
1603             TSource[] m_array;
1604             internal StaticIndexRangePartitionerForArray(TSource[] array)
1605                 : base()
1606             {
1607                 Contract.Assert(array != null);
1608                 m_array = array;
1609             }
1610             override protected int SourceCount
1611             {
1612                 get { return m_array.Length; }
1613             }
1614             override protected IEnumerator<KeyValuePair<long, TSource>> CreatePartition(int startIndex, int endIndex)
1615             {
1616                 return new StaticIndexRangePartitionForArray<TSource>(m_array, startIndex, endIndex);
1617             }
1618         }
1619
1620         /// <summary>
1621         /// Inherits from StaticIndexRangePartitioner
1622         /// Provides customized implementation of SourceCount and CreatePartition
1623         /// </summary>
1624         private class StaticIndexRangePartitionForArray<TSource> : StaticIndexRangePartition<TSource>
1625         {
1626             //the source collection shared by all partitions
1627             private volatile TSource[] m_array;
1628
1629             internal StaticIndexRangePartitionForArray(TSource[] array, int startIndex, int endIndex)
1630                 : base(startIndex, endIndex)
1631             {
1632                 Contract.Assert(startIndex >= 0 && endIndex <= array.Length - 1);
1633                 m_array = array;
1634             }
1635
1636             override public KeyValuePair<long, TSource> Current
1637             {
1638                 get
1639                 {
1640                     //verify that MoveNext is at least called once before Current is called 
1641                     if (m_offset < m_startIndex)
1642                     {
1643                         throw new InvalidOperationException(Environment.GetResourceString("PartitionerStatic_CurrentCalledBeforeMoveNext"));
1644                     }
1645
1646                     Contract.Assert(m_offset >= m_startIndex && m_offset <= m_endIndex);
1647                     return (new KeyValuePair<long, TSource>(m_offset, m_array[m_offset]));
1648                 }
1649             }
1650         }
1651         #endregion
1652
1653
1654         #region Utility functions
1655         /// <summary>
1656         /// A very simple primitive that allows us to share a value across multiple threads.
1657         /// </summary>
1658         /// <typeparam name="TSource"></typeparam>
1659         private class SharedInt
1660         {
1661             internal volatile int Value;
1662
1663             internal SharedInt(int value)
1664             {
1665                 this.Value = value;
1666             }
1667
1668         }
1669
1670         /// <summary>
1671         /// A very simple primitive that allows us to share a value across multiple threads.
1672         /// </summary>
1673         private class SharedBool
1674         {
1675             internal volatile bool Value;
1676
1677             internal SharedBool(bool value)
1678             {
1679                 this.Value = value;
1680             }
1681
1682         }
1683
1684         /// <summary>
1685         /// A very simple primitive that allows us to share a value across multiple threads.
1686         /// </summary>
1687         private class SharedLong
1688         {
1689             internal long Value;
1690             internal SharedLong(long value)
1691             {
1692                 this.Value = value;
1693             }
1694
1695         }
1696
1697         //--------------------
1698         // The following part calculates the default chunk size. It is copied from System.Linq.Parallel.Scheduling,
1699         // because mscorlib.dll cannot access System.Linq.Parallel.Scheduling
1700         //--------------------
1701
1702         // The number of bytes we want "chunks" to be, when partitioning, etc. We choose 4 cache
1703         // lines worth, assuming 128b cache line.  Most (popular) architectures use 64b cache lines,
1704         // but choosing 128b works for 64b too whereas a multiple of 64b isn't necessarily sufficient
1705         // for 128b cache systems.  So 128b it is.
1706         private const int DEFAULT_BYTES_PER_CHUNK = 128 * 4;
1707         
1708         private static int GetDefaultChunkSize<TSource>()
1709         {
1710             int chunkSize;
1711
1712             if (typeof(TSource).IsValueType)
1713             {
1714 #if !FEATURE_CORECLR // Marshal.SizeOf is not supported in CoreCLR
1715                 // @
1716
1717                 if (typeof(TSource).StructLayoutAttribute.Value == LayoutKind.Explicit)
1718                 {
1719                     chunkSize = Math.Max(1, DEFAULT_BYTES_PER_CHUNK / Marshal.SizeOf(typeof(TSource)));
1720                 }
1721                 else
1722                 {
1723                     // We choose '128' because this ensures, no matter the actual size of the value type,
1724                     // the total bytes used will be a multiple of 128. This ensures it's cache aligned.
1725                     chunkSize = 128;
1726                 }
1727 #else
1728                 chunkSize = 128;
1729 #endif
1730             }
1731             else
1732             {
1733                 Contract.Assert((DEFAULT_BYTES_PER_CHUNK % IntPtr.Size) == 0, "bytes per chunk should be a multiple of pointer size");
1734                 chunkSize = (DEFAULT_BYTES_PER_CHUNK / IntPtr.Size);
1735             }
1736             return chunkSize;
1737         }
1738         #endregion
1739
1740     }
1741 }