1 #pragma warning disable 0420
4 // Copyright (c) Microsoft Corporation. All rights reserved.
7 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
9 // PartitionerStatic.cs
11 // <OWNER>Microsoft</OWNER>
13 // A class of default partitioners for Partitioner<TSource>
15 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
17 using System.Collections.Generic;
18 using System.Security.Permissions;
19 using System.Threading;
20 using System.Diagnostics.Contracts;
21 using System.Runtime.InteropServices;
23 namespace System.Collections.Concurrent
26 /// Out-of-the-box partitioners are created with a set of default behaviors.
27 /// For example, by default, some form of buffering and chunking will be employed to achieve
28 /// optimal performance in the common scenario where an IEnumerable<> implementation is fast and
29 /// non-blocking. These behaviors can be overridden via this enumeration.
35 public enum EnumerablePartitionerOptions
38 /// Use the default behavior (i.e., use buffering to achieve optimal performance)
43 /// Creates a partitioner that will take items from the source enumerable one at a time
44 /// and will not use intermediate storage that can be accessed more efficiently by multiple threads.
45 /// This option provides support for low latency (items will be processed as soon as they are available from
46 /// the source) and partial support for dependencies between items (a thread cannot deadlock waiting for an item
47 /// that it, itself, is responsible for processing).
52 // The static class Partitioners implements 3 default partitioning strategies:
53 // 1. dynamic load balance partitioning for indexable data source (IList and arrays)
54 // 2. static partitioning for indexable data source (IList and arrays)
55 // 3. dynamic load balance partitioning for enumerables. Enumerables have indexes, which are the natural order
56 // of elements, but enumerators are not indexable
57 // - data source of type IList/arrays have both dynamic and static partitioning, as 1 and 2.
58 // We assume that the source data of IList/Array is not changing concurrently.
59 // - data source of type IEnumerable can only be partitioned dynamically (load-balance)
60 // - Dynamic partitioning methods 1 and 3 are same, both being dynamic and load-balance. But the
61 // implementation is different for data source of IList/Array vs. IEnumerable:
62 // * When the source collection is IList/Arrays, we use Interlocked on the shared index;
63 // * When the source collection is IEnumerable, we use Monitor to wrap around the access to the source
67 /// Provides common partitioning strategies for arrays, lists, and enumerables.
71 /// The static methods on <see cref="Partitioner"/> are all thread-safe and may be used concurrently
72 /// from multiple threads. However, while a created partitioner is in use, the underlying data source
73 /// should not be modified, whether from the same thread that's using a partitioner or from a separate
77 [HostProtection(Synchronization = true, ExternalThreading = true)]
78 public static class Partitioner
/// <summary>
/// Creates an orderable partitioner from an <see cref="System.Collections.Generic.IList{T}"/>
/// instance.
/// </summary>
/// <typeparam name="TSource">Type of the elements in source list.</typeparam>
/// <param name="list">The list to be partitioned.</param>
/// <param name="loadBalance">
/// A Boolean value that indicates whether the created partitioner should dynamically
/// load balance between partitions rather than statically partition.
/// </param>
/// <returns>
/// An orderable partitioner based on the input list.
/// </returns>
/// <exception cref="T:System.ArgumentNullException">
/// The <paramref name="list"/> argument is null.
/// </exception>
public static OrderablePartitioner<TSource> Create<TSource>(IList<TSource> list, bool loadBalance)
{
    // Validate eagerly so the failure surfaces at the call site.
    if (list == null)
    {
        throw new ArgumentNullException("list");
    }

    // loadBalance selects the dynamic partitioner; otherwise the list is split
    // into static index ranges.
    if (loadBalance)
    {
        return (new DynamicPartitionerForIList<TSource>(list));
    }
    else
    {
        return (new StaticIndexRangePartitionerForIList<TSource>(list));
    }
}
/// <summary>
/// Creates an orderable partitioner from a <see cref="System.Array"/> instance.
/// </summary>
/// <typeparam name="TSource">Type of the elements in source array.</typeparam>
/// <param name="array">The array to be partitioned.</param>
/// <param name="loadBalance">
/// A Boolean value that indicates whether the created partitioner should dynamically load balance
/// between partitions rather than statically partition.
/// </param>
/// <returns>
/// An orderable partitioner based on the input array.
/// </returns>
/// <exception cref="T:System.ArgumentNullException">
/// The <paramref name="array"/> argument is null.
/// </exception>
public static OrderablePartitioner<TSource> Create<TSource>(TSource[] array, bool loadBalance)
{
    // Arrays get dedicated partitioner types (instead of reusing the IList<T> ones) so that
    // element retrieval can use 'ldelem' instructions rather than a method call.
    if (array == null)
    {
        throw new ArgumentNullException("array");
    }

    // loadBalance selects the dynamic partitioner; otherwise the array is split
    // into static index ranges.
    if (loadBalance)
    {
        return (new DynamicPartitionerForArray<TSource>(array));
    }
    else
    {
        return (new StaticIndexRangePartitionerForArray<TSource>(array));
    }
}
/// <summary>
/// Creates an orderable partitioner from a <see cref="System.Collections.Generic.IEnumerable{TSource}"/> instance.
/// </summary>
/// <typeparam name="TSource">Type of the elements in source enumerable.</typeparam>
/// <param name="source">The enumerable to be partitioned.</param>
/// <returns>
/// An orderable partitioner based on the input enumerable.
/// </returns>
/// <remarks>
/// The ordering used in the created partitioner is determined by the natural order of the elements
/// as retrieved from the source enumerable.
/// </remarks>
public static OrderablePartitioner<TSource> Create<TSource>(IEnumerable<TSource> source)
{
    // Forward to the options-taking overload, asking for the default behavior.
    OrderablePartitioner<TSource> partitioner =
        Create<TSource>(source, EnumerablePartitionerOptions.None);
    return partitioner;
}
/// <summary>
/// Creates an orderable partitioner from a <see cref="System.Collections.Generic.IEnumerable{TSource}"/> instance.
/// </summary>
/// <typeparam name="TSource">Type of the elements in source enumerable.</typeparam>
/// <param name="source">The enumerable to be partitioned.</param>
/// <param name="partitionerOptions">Options to control the buffering behavior of the partitioner.</param>
/// <exception cref="T:System.ArgumentNullException">
/// The <paramref name="source"/> argument is null.
/// </exception>
/// <exception cref="T:System.ArgumentOutOfRangeException">
/// The <paramref name="partitionerOptions"/> argument specifies an invalid value for <see
/// cref="T:System.Collections.Concurrent.EnumerablePartitionerOptions"/>.
/// </exception>
/// <returns>
/// An orderable partitioner based on the input enumerable.
/// </returns>
/// <remarks>
/// The ordering used in the created partitioner is determined by the natural order of the elements
/// as retrieved from the source enumerable.
/// </remarks>
public static OrderablePartitioner<TSource> Create<TSource>(IEnumerable<TSource> source, EnumerablePartitionerOptions partitionerOptions)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    // NoBuffering is the only flag accepted here; any other set bit is rejected.
    if ((partitionerOptions & (~EnumerablePartitionerOptions.NoBuffering)) != 0)
    {
        throw new ArgumentOutOfRangeException("partitionerOptions");
    }

    return (new DynamicPartitionerForIEnumerable<TSource>(source, partitionerOptions));
}
/// <summary>Creates a partitioner that chunks the user-specified range.</summary>
/// <param name="fromInclusive">The lower, inclusive bound of the range.</param>
/// <param name="toExclusive">The upper, exclusive bound of the range.</param>
/// <returns>A partitioner.</returns>
/// <exception cref="T:System.ArgumentOutOfRangeException"> The <paramref name="toExclusive"/> argument is
/// less than or equal to the <paramref name="fromInclusive"/> argument.</exception>
public static OrderablePartitioner<Tuple<long, long>> Create(long fromInclusive, long toExclusive)
{
    // How many chunks do we want to divide the range into?  If this is 1, then the
    // answer is "one chunk per core".  Generally, though, you'll achieve better
    // load balancing on a busy system if you make it higher than 1.
    int coreOversubscriptionRate = 3;

    if (toExclusive <= fromInclusive) throw new ArgumentOutOfRangeException("toExclusive");

    // The width of the range can exceed Int64.MaxValue (e.g. Int64.MinValue..Int64.MaxValue).
    // Computing it in Int64 would wrap, possibly to a negative value, and hand CreateRanges a
    // negative step. decimal holds the full width exactly; after dividing by at least 3 the
    // quotient always fits back into an Int64.
    decimal rangeWidth = (decimal)toExclusive - fromInclusive;
    long rangeSize = (long)(rangeWidth / (PlatformHelper.ProcessorCount * coreOversubscriptionRate));
    if (rangeSize == 0) rangeSize = 1;
    return Partitioner.Create(CreateRanges(fromInclusive, toExclusive, rangeSize), EnumerablePartitionerOptions.NoBuffering); // chunk one range at a time
}
/// <summary>Creates a partitioner that chunks the user-specified range.</summary>
/// <param name="fromInclusive">The lower, inclusive bound of the range.</param>
/// <param name="toExclusive">The upper, exclusive bound of the range.</param>
/// <param name="rangeSize">The size of each subrange.</param>
/// <returns>A partitioner.</returns>
/// <exception cref="T:System.ArgumentOutOfRangeException"> The <paramref name="toExclusive"/> argument is
/// less than or equal to the <paramref name="fromInclusive"/> argument.</exception>
/// <exception cref="T:System.ArgumentOutOfRangeException"> The <paramref name="rangeSize"/> argument is
/// less than or equal to 0.</exception>
public static OrderablePartitioner<Tuple<long, long>> Create(long fromInclusive, long toExclusive, long rangeSize)
{
    // An empty or inverted range is reported against toExclusive.
    if (fromInclusive >= toExclusive)
    {
        throw new ArgumentOutOfRangeException("toExclusive");
    }

    // Subranges must contain at least one element.
    if (rangeSize < 1)
    {
        throw new ArgumentOutOfRangeException("rangeSize");
    }

    // Hand out one subrange tuple at a time (NoBuffering).
    IEnumerable<Tuple<long, long>> subranges = CreateRanges(fromInclusive, toExclusive, rangeSize);
    return Partitioner.Create(subranges, EnumerablePartitionerOptions.NoBuffering);
}
// Private method to parcel out range tuples.
// Lazily yields consecutive [from, to) tuples of width rangeSize covering
// [fromInclusive, toExclusive); the last tuple is clamped to toExclusive.
private static IEnumerable<Tuple<long, long>> CreateRanges(long fromInclusive, long toExclusive, long rangeSize)
{
    // Enumerate all of the ranges
    long from, to;
    bool shouldQuit = false;

    for (long i = fromInclusive; (i < toExclusive) && !shouldQuit; i += rangeSize)
    {
        from = i;
        try { checked { to = i + rangeSize; } }
        catch (OverflowException)
        {
            // i + rangeSize does not fit in an Int64, so this is necessarily the last
            // subrange: saturate the upper bound and stop before the loop increment
            // (which would wrap) is consulted again.
            to = Int64.MaxValue;
            shouldQuit = true;
        }
        if (to > toExclusive) to = toExclusive;
        yield return new Tuple<long, long>(from, to);
    }
}
/// <summary>Creates a partitioner that chunks the user-specified range.</summary>
/// <param name="fromInclusive">The lower, inclusive bound of the range.</param>
/// <param name="toExclusive">The upper, exclusive bound of the range.</param>
/// <returns>A partitioner.</returns>
/// <exception cref="T:System.ArgumentOutOfRangeException"> The <paramref name="toExclusive"/> argument is
/// less than or equal to the <paramref name="fromInclusive"/> argument.</exception>
public static OrderablePartitioner<Tuple<int, int>> Create(int fromInclusive, int toExclusive)
{
    // How many chunks do we want to divide the range into?  If this is 1, then the
    // answer is "one chunk per core".  Generally, though, you'll achieve better
    // load balancing on a busy system if you make it higher than 1.
    int coreOversubscriptionRate = 3;

    if (toExclusive <= fromInclusive) throw new ArgumentOutOfRangeException("toExclusive");

    // The width of the range can exceed Int32.MaxValue (e.g. Int32.MinValue..Int32.MaxValue).
    // Computing it in Int32 would wrap, possibly to a negative value, and hand CreateRanges a
    // negative step. Int64 holds the full width; after dividing by at least 3 the quotient
    // always fits back into an Int32.
    long rangeWidth = (long)toExclusive - fromInclusive;
    int rangeSize = (int)(rangeWidth / (PlatformHelper.ProcessorCount * coreOversubscriptionRate));
    if (rangeSize == 0) rangeSize = 1;
    return Partitioner.Create(CreateRanges(fromInclusive, toExclusive, rangeSize), EnumerablePartitionerOptions.NoBuffering); // chunk one range at a time
}
/// <summary>Creates a partitioner that chunks the user-specified range.</summary>
/// <param name="fromInclusive">The lower, inclusive bound of the range.</param>
/// <param name="toExclusive">The upper, exclusive bound of the range.</param>
/// <param name="rangeSize">The size of each subrange.</param>
/// <returns>A partitioner.</returns>
/// <exception cref="T:System.ArgumentOutOfRangeException"> The <paramref name="toExclusive"/> argument is
/// less than or equal to the <paramref name="fromInclusive"/> argument.</exception>
/// <exception cref="T:System.ArgumentOutOfRangeException"> The <paramref name="rangeSize"/> argument is
/// less than or equal to 0.</exception>
public static OrderablePartitioner<Tuple<int, int>> Create(int fromInclusive, int toExclusive, int rangeSize)
{
    // An empty or inverted range is reported against toExclusive.
    if (fromInclusive >= toExclusive)
    {
        throw new ArgumentOutOfRangeException("toExclusive");
    }

    // Subranges must contain at least one element.
    if (rangeSize < 1)
    {
        throw new ArgumentOutOfRangeException("rangeSize");
    }

    // Hand out one subrange tuple at a time (NoBuffering).
    IEnumerable<Tuple<int, int>> subranges = CreateRanges(fromInclusive, toExclusive, rangeSize);
    return Partitioner.Create(subranges, EnumerablePartitionerOptions.NoBuffering);
}
// Private method to parcel out range tuples.
// Lazily yields consecutive [from, to) tuples of width rangeSize covering
// [fromInclusive, toExclusive); the last tuple is clamped to toExclusive.
private static IEnumerable<Tuple<int, int>> CreateRanges(int fromInclusive, int toExclusive, int rangeSize)
{
    // Enumerate all of the ranges
    int from, to;
    bool shouldQuit = false;

    for (int i = fromInclusive; (i < toExclusive) && !shouldQuit; i += rangeSize)
    {
        from = i;
        try { checked { to = i + rangeSize; } }
        catch (OverflowException)
        {
            // i + rangeSize does not fit in an Int32, so this is necessarily the last
            // subrange: saturate the upper bound and stop before the loop increment
            // (which would wrap) is consulted again.
            to = Int32.MaxValue;
            shouldQuit = true;
        }
        if (to > toExclusive) to = toExclusive;
        yield return new Tuple<int, int>(from, to);
    }
}
303 #region DynamicPartitionEnumerator_Abstract class
305 /// DynamicPartitionEnumerator_Abstract defines the enumerator for each partition for the dynamic load-balance
306 /// partitioning algorithm.
307 /// - Partition is an enumerator of KeyValuePairs, each corresponding to an item in the data source:
308 /// the key is the index in the source collection; the value is the item itself.
309 /// - a set of such partitions share a reader over data source. The type of the reader is specified by
311 /// - each partition requests a contiguous chunk of elements at a time from the source data. The chunk
312 /// size is initially 1, and doubles every time until it reaches the maximum chunk size.
313 /// The implementation for GrabNextChunk() method has two versions: one for data source of IndexRange
314 /// types (IList and the array), one for data source of IEnumerable.
315 /// - The method "Reset" is not supported for any partitioning algorithm.
316 /// - The implementation of the MoveNext() method is the same for all dynamic partitioners, so we provide it
317 /// in this abstract class.
319 /// <typeparam name="TSource">Type of the elements in the data source</typeparam>
320 /// <typeparam name="TSourceReader">Type of the reader on the data source</typeparam>
322 // - IList<TSource>, when source data is IList<TSource>, the shared reader is source data itself
323 // - TSource[], when source data is TSource[], the shared reader is source data itself
324 // - IEnumerator<TSource>, when source data is IEnumerable<TSource>, and the shared reader is an
325 // enumerator of the source data
// Base class for the per-partition enumerators of the dynamic partitioners: it owns the chunk-size
// bookkeeping and MoveNext(), and leaves chunk acquisition, Current and Dispose to derived classes.
326 private abstract class DynamicPartitionEnumerator_Abstract<TSource, TSourceReader> : IEnumerator<KeyValuePair<long, TSource>>
328 //----------------- common fields and constructor for all dynamic partitioners -----------------
329 //--- shared by all derived classes, whatever the source data type: IList, Array, or IEnumerator
330 protected readonly TSourceReader m_sharedReader;
// Default cap on the chunk size; copied into m_maxChunkSize unless single-chunking is requested.
332 protected static int s_defaultMaxChunkSize = GetDefaultChunkSize<TSource>();
334 //deferred allocating in MoveNext() with initial value 0, to avoid false sharing
335 //we also use the fact that: (m_currentChunkSize==null) means MoveNext is never called on this enumerator
336 protected SharedInt m_currentChunkSize;
338 //deferring allocation in MoveNext() with initial value -1, to avoid false sharing
339 protected SharedInt m_localOffset;
341 private const int CHUNK_DOUBLING_RATE = 3; // Double the chunk size every this many grabs
342 private int m_doublingCountdown; // Number of grabs remaining until chunk size doubles
343 protected readonly int m_maxChunkSize; // s_defaultMaxChunkSize unless single-chunking is requested by the caller
345 // m_sharedIndex is shared by this set of partitions; in particular, when m_sharedReader is an IEnumerable
346 // it serves to track the natural order of elements in m_sharedReader
347 // the value of this field is passed in from outside (already initialized) by the constructor,
348 protected readonly SharedLong m_sharedIndex;
// Convenience constructor: same as the three-argument one with single-chunking turned off.
350 protected DynamicPartitionEnumerator_Abstract(TSourceReader sharedReader, SharedLong sharedIndex)
351 : this(sharedReader, sharedIndex, false)
// Stores the shared reader/index and fixes the chunk-size cap: 1 when useSingleChunking is true,
// s_defaultMaxChunkSize otherwise.
355 protected DynamicPartitionEnumerator_Abstract(TSourceReader sharedReader, SharedLong sharedIndex, bool useSingleChunking)
357 m_sharedReader = sharedReader;
358 m_sharedIndex = sharedIndex;
359 m_maxChunkSize = useSingleChunking ? 1 : s_defaultMaxChunkSize;
362 // ---------------- abstract method declarations --------------
365 /// Abstract method to request a contiguous chunk of elements from the source collection
367 /// <param name="requestedChunkSize">specified number of elements requested</param>
369 /// true if we successfully reserved at least one element (up to #=requestedChunkSize)
370 /// false if all elements in the source collection have been reserved.
372 //GrabNextChunk does the following:
373 // - grab # of requestedChunkSize elements from source data through shared reader,
374 // - at the time of function returns, m_currentChunkSize is updated with the number of
375 // elements actually assigned (<=requestedChunkSize).
376 // - GrabNextChunk returns true if at least one element is assigned to this partition;
377 // false if the shared reader already hits the last element of the source data before
378 // we call GrabNextChunk
379 protected abstract bool GrabNextChunk(int requestedChunkSize);
382 /// Abstract property, returns whether or not the shared reader has already read the last
383 /// element of the source data
385 protected abstract bool HasNoElementsLeft { get; set; }
388 /// Get the current element in the current partition. Property required by IEnumerator interface
389 /// This property is abstract because the implementation is different depending on the type
390 /// of the source data: IList, Array or IEnumerable
392 public abstract KeyValuePair<long, TSource> Current { get; }
395 /// Dispose is abstract, and depends on the type of the source data:
396 /// - For source data type IList and Array, the type of the shared reader is just the data itself.
397 /// We don't do anything in Dispose method for IList and Array.
398 /// - For source data type IEnumerable, the type of the shared reader is an enumerator we created.
399 /// Thus we need to dispose this shared reader enumerator, when there is no more active partitions
402 public abstract void Dispose();
405 /// Reset on partitions is not supported
409 throw new NotSupportedException();
414 /// Get the current element in the current partition. Property required by IEnumerator interface
// Non-generic view of Current: forwards to the strongly typed abstract property.
416 Object IEnumerator.Current
420 return ((DynamicPartitionEnumerator_Abstract<TSource, TSourceReader>)this).Current;
425 /// Moves to the next element if any.
426 /// Try current chunk first, if the current chunk do not have any elements left, then we
427 /// attempt to grab a chunk from the source collection.
430 /// true if successfully moving to the next position;
431 /// false otherwise, if and only if there is no more elements left in the current chunk
432 /// AND the source collection is exhausted.
434 public bool MoveNext()
436 //perform deferred allocating of the local variables.
437 if (m_localOffset == null)
439 Contract.Assert(m_currentChunkSize == null);
440 m_localOffset = new SharedInt(-1);
441 m_currentChunkSize = new SharedInt(0);
442 m_doublingCountdown = CHUNK_DOUBLING_RATE;
// Fast path: the local chunk still has unread elements, so just advance the local offset.
445 if (m_localOffset.Value < m_currentChunkSize.Value - 1)
446 //attempt to grab the next element from the local chunk
448 m_localOffset.Value++;
452 //otherwise it means we exhausted the local chunk
453 //grab a new chunk from the source enumerator
455 // The second part of the || condition is necessary to handle the case when MoveNext() is called
456 // after a previous MoveNext call returned false.
457 Contract.Assert(m_localOffset.Value == m_currentChunkSize.Value - 1 || m_currentChunkSize.Value == 0);
459 //set the requested chunk size to a proper value
// Growth policy: the first request is for 1 element; afterwards the previous size is reused while
// the countdown is positive, and doubled (capped at m_maxChunkSize) once it reaches zero.
460 int requestedChunkSize;
461 if (m_currentChunkSize.Value == 0) //first time grabbing from source enumerator
463 requestedChunkSize = 1;
465 else if (m_doublingCountdown > 0)
467 requestedChunkSize = m_currentChunkSize.Value;
471 requestedChunkSize = Math.Min(m_currentChunkSize.Value * 2, m_maxChunkSize);
472 m_doublingCountdown = CHUNK_DOUBLING_RATE; // reset
475 // Decrement your doubling countdown
476 m_doublingCountdown--;
478 Contract.Assert(requestedChunkSize > 0 && requestedChunkSize <= m_maxChunkSize);
479 //GrabNextChunk will update the value of m_currentChunkSize
480 if (GrabNextChunk(requestedChunkSize))
482 Contract.Assert(m_currentChunkSize.Value <= requestedChunkSize && m_currentChunkSize.Value > 0);
// A fresh chunk was obtained: restart reading at its first element.
483 m_localOffset.Value = 0;
495 #region Dynamic Partitioner for source data of IEnuemrable<> type
497 /// Inherits from DynamicPartitioners
498 /// Provides customized implementation of GetOrderableDynamicPartitions_Factory method, to return an instance
499 /// of EnumerableOfPartitionsForIEnumerator defined internally
501 /// <typeparam name="TSource">Type of elements in the source data</typeparam>
502 private class DynamicPartitionerForIEnumerable<TSource> : OrderablePartitioner<TSource>
504 IEnumerable<TSource> m_source;
505 readonly bool m_useSingleChunking;
// Wraps the given enumerable in a dynamic (load-balancing) partitioner.
// The base-constructor flags are, in order: keysOrderedInEachPartition = true,
// keysOrderedAcrossPartitions = false, keysNormalized = true.
internal DynamicPartitionerForIEnumerable(IEnumerable<TSource> source, EnumerablePartitionerOptions partitionerOptions)
    : base(true, false, true)
{
    // Keep the source: the partition factories below call m_source.GetEnumerator().
    m_source = source;

    // NoBuffering means elements are handed out one at a time (single-chunking).
    m_useSingleChunking = ((partitionerOptions & EnumerablePartitionerOptions.NoBuffering) != 0);
}
/// <summary>
/// Overrides OrderablePartitioner.GetOrderablePartitions.
/// Partitions the underlying collection into the given number of orderable partitions.
/// </summary>
/// <param name="partitionCount">number of partitions requested</param>
/// <returns>A list containing <paramref name="partitionCount"/> enumerators.</returns>
/// <exception cref="T:System.ArgumentOutOfRangeException">
/// <paramref name="partitionCount"/> is less than or equal to 0.
/// </exception>
override public IList<IEnumerator<KeyValuePair<long, TSource>>> GetOrderablePartitions(int partitionCount)
{
    if (partitionCount <= 0)
    {
        throw new ArgumentOutOfRangeException("partitionCount");
    }

    IEnumerator<KeyValuePair<long, TSource>>[] partitions
        = new IEnumerator<KeyValuePair<long, TSource>>[partitionCount];

    // All partitions draw from a single enumerator over the source, owned by one shared
    // InternalPartitionEnumerable created in static-partitioning mode (last argument true).
    IEnumerable<KeyValuePair<long, TSource>> partitionEnumerable = new InternalPartitionEnumerable(m_source.GetEnumerator(), m_useSingleChunking, true);
    for (int i = 0; i < partitionCount; i++)
    {
        partitions[i] = partitionEnumerable.GetEnumerator();
    }

    return partitions;
}
/// <summary>
/// Overrides OrderablePartitioner.GetOrderableDynamicPartitions
/// </summary>
/// <returns>an enumerable collection of orderable partitions</returns>
override public IEnumerable<KeyValuePair<long, TSource>> GetOrderableDynamicPartitions()
{
    // One fresh enumerator over the source backs every partition handed out by the result.
    // The last argument (false) selects dynamic rather than static partitioning.
    IEnumerator<TSource> sourceEnumerator = m_source.GetEnumerator();
    return new InternalPartitionEnumerable(sourceEnumerator, m_useSingleChunking, false);
}
548 /// Whether additional partitions can be created dynamically.
550 override public bool SupportsDynamicPartitions
555 #region Internal classes: InternalPartitionEnumerable, InternalPartitionEnumerator
557 /// Provides customized implementation for source data of IEnumerable
558 /// Different from the counterpart for IList/Array, this enumerable maintains several additional fields
559 /// shared by the partitions it owns, including a boolean "m_hasNoElementsLeft", a shared lock, and a
560 /// shared count "m_activePartitionCount" used to track active partitions when they were created statically
562 private class InternalPartitionEnumerable : IEnumerable<KeyValuePair<long, TSource>>, IDisposable
564 //reader through which we access the source data
565 private readonly IEnumerator<TSource> m_sharedReader;
566 private SharedLong m_sharedIndex;//initial value -1
568 private volatile KeyValuePair<long, TSource>[] m_FillBuffer; // intermediate buffer to reduce locking
569 private volatile int m_FillBufferSize; // actual number of elements in m_FillBuffer. Will start
570 // at m_FillBuffer.Length, and might be reduced during the last refill
571 private volatile int m_FillBufferCurrentPosition; //shared value to be accessed by Interlock.Increment only
572 private volatile int m_activeCopiers; //number of active copiers
574 //fields shared by all partitions that this Enumerable owns, their allocation is deferred
575 private SharedBool m_hasNoElementsLeft; // no elements left at all.
576 private SharedBool m_sourceDepleted; // no elements left in the enumerator, but there may be elements in the Fill Buffer
578 //shared synchronization lock, created by this Enumerable
579 private object m_sharedLock;//deferring allocation by enumerator
581 private bool m_disposed;
583 // If dynamic partitioning, then m_activePartitionCount == null
584 // If static partitioning, then it keeps track of active partition count
585 private SharedInt m_activePartitionCount;
587 // records whether or not the user has requested single-chunking behavior
588 private readonly bool m_useSingleChunking;
// Builds the state shared by every partition enumerator created from this enumerable.
//   sharedReader         - enumerator over the source; all partitions read through it
//   useSingleChunking    - when true no fill buffer is allocated
//   isStaticPartitioning - when true an active-partition counter is allocated
internal InternalPartitionEnumerable(IEnumerator<TSource> sharedReader, bool useSingleChunking, bool isStaticPartitioning)
{
    m_sharedReader = sharedReader;
    m_useSingleChunking = useSingleChunking;
    m_sharedLock = new object();

    // Index of the most recently read element; -1 means nothing has been read yet.
    m_sharedIndex = new SharedLong(-1);
    m_hasNoElementsLeft = new SharedBool(false);
    m_sourceDepleted = new SharedBool(false);

    // The fill buffer exists to reduce contention on the shared lock, so it is only needed
    // when single-chunking is off.
    if (!useSingleChunking)
    {
        // Multiplier of 4 above four cores, 1 otherwise (chosen empirically per the original note).
        int fillBufferMultiplier = 1;
        if (PlatformHelper.ProcessorCount > 4)
        {
            fillBufferMultiplier = 4;
        }

        int defaultChunkSize = Partitioner.GetDefaultChunkSize<TSource>();
        m_FillBuffer = new KeyValuePair<long, TSource>[fillBufferMultiplier * defaultChunkSize];
    }

    // Static partitioning (GetPartitions / GetOrderablePartitions) tracks the active partitions so
    // this object can be disposed once they all are. Dynamic partitioning does no tracking, and
    // the counter stays null.
    m_activePartitionCount = isStaticPartitioning ? new SharedInt(0) : null;
}
// Creates a new partition enumerator that shares this enumerable's reader, index, flags and lock.
// Throws ObjectDisposedException once this enumerable has been disposed.
public IEnumerator<KeyValuePair<long, TSource>> GetEnumerator()
{
    // After disposal the shared reader is gone, so no further partitions may be handed out.
    if (m_disposed)
    {
        throw new ObjectDisposedException(Environment.GetResourceString("PartitionerStatic_CanNotCallGetEnumeratorAfterSourceHasBeenDisposed"));
    }

    return new InternalPartitionEnumerator(m_sharedReader, m_sharedIndex,
        m_hasNoElementsLeft, m_sharedLock, m_activePartitionCount, this, m_useSingleChunking);
}
// Non-generic IEnumerable entry point: hands back the strongly typed enumerator.
IEnumerator IEnumerable.GetEnumerator()
{
    IEnumerator<KeyValuePair<long, TSource>> typedEnumerator = this.GetEnumerator();
    return typedEnumerator;
}
// Used by GrabChunk_Buffered()
// Tries to claim up to requestedChunkSize elements from the shared fill buffer and copy them
// to the start of destArray. actualNumElementsGrabbed receives the number of elements copied
// (0 when the buffer is absent or already consumed).
private void TryCopyFromFillBuffer(KeyValuePair<long, TSource>[] destArray,
                                  int requestedChunkSize,
                                  ref int actualNumElementsGrabbed)
{
    actualNumElementsGrabbed = 0;

    // making a local defensive copy of the fill buffer reference, just in case it gets nulled out
    KeyValuePair<long, TSource>[] fillBufferLocalRef = m_FillBuffer;
    if (fillBufferLocalRef == null) return;

    // first do a quick check, and give up if the current position is at the end
    // so that we don't do an unnecessary pair of Interlocked.Increment / Decrement calls
    if (m_FillBufferCurrentPosition >= m_FillBufferSize)
    {
        return; // no elements in the buffer to copy from
    }

    // We might have a chance to grab elements from the buffer. We will know for sure
    // when we do the Interlocked.Add below.
    // But first we must register as a potential copier in order to make sure
    // the elements we're copying from don't get overwritten by another thread
    // that starts refilling the buffer right after our Interlocked.Add.
    Interlocked.Increment(ref m_activeCopiers);

    // Atomically claim the slice [beginPos, endPos) of the buffer.
    int endPos = Interlocked.Add(ref m_FillBufferCurrentPosition, requestedChunkSize);
    int beginPos = endPos - requestedChunkSize;

    if (beginPos < m_FillBufferSize)
    {
        // Clamp the end of the claimed slice to the filled part of the buffer, then subtract the
        // start. The parentheses matter: without them the conditional binds looser than '-', and
        // a slice that fits entirely would report endPos elements instead of endPos - beginPos.
        actualNumElementsGrabbed = ((endPos < m_FillBufferSize) ? endPos : m_FillBufferSize) - beginPos;
        Array.Copy(fillBufferLocalRef, beginPos, destArray, 0, actualNumElementsGrabbed);
    }

    // let the record show we are no longer accessing the buffer
    Interlocked.Decrement(ref m_activeCopiers);
}
/// <summary>
/// This is the common entry point for consuming items from the source enumerable
/// </summary>
/// <param name="destArray">Array that receives the grabbed elements.</param>
/// <param name="requestedChunkSize">Number of elements requested.</param>
/// <param name="actualNumElementsGrabbed">Set to the number of elements actually placed in <paramref name="destArray"/>.</param>
/// <returns>
/// true if we successfully reserved at least one element
/// false if all elements in the source collection have been reserved.
/// </returns>
internal bool GrabChunk(KeyValuePair<long, TSource>[] destArray, int requestedChunkSize, ref int actualNumElementsGrabbed)
{
    actualNumElementsGrabbed = 0;

    // Nothing left anywhere: report exhaustion without touching the reader.
    if (m_hasNoElementsLeft.Value)
    {
        return false;
    }

    // Dispatch on the strategy chosen when this enumerable was constructed.
    if (m_useSingleChunking)
    {
        return GrabChunk_Single(destArray, requestedChunkSize, ref actualNumElementsGrabbed);
    }
    else
    {
        return GrabChunk_Buffered(destArray, requestedChunkSize, ref actualNumElementsGrabbed);
    }
}
715 /// Version of GrabChunk that grabs a single element at a time from the source enumerable
718 /// true if we successfully reserved an element
719 /// false if all elements in the source collection have been reserved.
// Single-chunking path: advances the shared reader by at most one element per call and pairs
// that element with the next value of the shared index.
// NOTE(review): the lock/try/catch structure around the statements below is not visible in this
// listing - confirm that the shared reader is only advanced while m_sharedLock is held.
721 internal bool GrabChunk_Single(KeyValuePair<long,TSource>[] destArray, int requestedChunkSize, ref int actualNumElementsGrabbed)
// Preconditions: single-chunking mode, a request for exactly one element, and a one-slot destination.
723 Contract.Assert(m_useSingleChunking, "Expected m_useSingleChecking to be true");
724 Contract.Assert(requestedChunkSize == 1, "Got requested chunk size of " + requestedChunkSize + " when single-chunking was on");
725 Contract.Assert(actualNumElementsGrabbed == 0, "Expected actualNumElementsGrabbed == 0, instead it is " + actualNumElementsGrabbed);
726 Contract.Assert(destArray.Length == 1, "Expected destArray to be of length 1, instead its length is " + destArray.Length);
// Early out when the exhaustion flag is already set.
730 if (m_hasNoElementsLeft.Value) return false;
734 if (m_sharedReader.MoveNext())
// checked(): an index overflow raises OverflowException instead of silently wrapping.
736 m_sharedIndex.Value = checked(m_sharedIndex.Value + 1);
738 = new KeyValuePair<long, TSource>(m_sharedIndex.Value,
739 m_sharedReader.Current);
740 actualNumElementsGrabbed = 1;
745 //if MoveNext() return false, we set the flag to inform other partitions
746 m_sourceDepleted.Value = true;
747 m_hasNoElementsLeft.Value = true;
753 // On an exception, make sure that no additional items are hereafter enumerated
754 m_sourceDepleted.Value = true;
755 m_hasNoElementsLeft.Value = true;
764 /// Version of GrabChunk that uses buffering scheme to grab items out of source enumerable
767 /// true if we successfully reserved at least one element (up to #=requestedChunkSize)
768 /// false if all elements in the source collection have been reserved.
// Buffered path: first tries to satisfy the request from the shared fill buffer, then tops up
// the caller's array directly from the shared reader, and finally refills the fill buffer
// when it has been fully consumed.
// NOTE(review): the lock/try/catch structure and some return statements are not visible in this
// listing - confirm which of the statements below run while m_sharedLock is held.
770 internal bool GrabChunk_Buffered(KeyValuePair<long,TSource>[] destArray, int requestedChunkSize, ref int actualNumElementsGrabbed)
772 Contract.Assert(requestedChunkSize > 0);
773 Contract.Assert(!m_useSingleChunking, "Did not expect to be in single-chunking mode");
// Step 1: take whatever the fill buffer can provide.
775 TryCopyFromFillBuffer(destArray, requestedChunkSize, ref actualNumElementsGrabbed);
777 if (actualNumElementsGrabbed == requestedChunkSize)
782 else if (m_sourceDepleted.Value)
784 // looks like we both reached the end of the fill buffer, and the source was depleted previously
785 // this means no more work to do for any other worker
786 m_hasNoElementsLeft.Value = true;
788 return (actualNumElementsGrabbed > 0);
793 // now's the time to take the shared lock and enumerate
// Re-check the depleted flag before reading from the source.
797 if (m_sourceDepleted.Value)
799 return (actualNumElementsGrabbed > 0);
804 // we need to make sure all array copiers are finished
// Spin until no TryCopyFromFillBuffer call is still registered as an active copier.
805 if (m_activeCopiers > 0)
807 SpinWait sw = new SpinWait();
808 while( m_activeCopiers > 0) sw.SpinOnce();
811 Contract.Assert(m_sharedIndex != null); //already been allocated in MoveNext() before calling GrabNextChunk
813 // Now's the time to actually enumerate the source
815 // We first fill up the requested # of elements in the caller's array
816 // continue from the where TryCopyFromFillBuffer() left off
817 for (; actualNumElementsGrabbed < requestedChunkSize; actualNumElementsGrabbed++)
819 if (m_sharedReader.MoveNext())
// checked(): an index overflow raises OverflowException instead of silently wrapping.
821 m_sharedIndex.Value = checked(m_sharedIndex.Value + 1);
822 destArray[actualNumElementsGrabbed]
823 = new KeyValuePair<long, TSource>(m_sharedIndex.Value,
824 m_sharedReader.Current);
828 //if MoveNext() return false, we set the flag to inform other partitions
829 m_sourceDepleted.Value = true;
834 // taking a local snapshot of m_FillBuffer in case some other thread decides to null out m_FillBuffer
835 // in the entry of this method after observing m_sourceDepleted = true
836 var localFillBufferRef = m_FillBuffer;
838 // If the big buffer seems to be depleted, we will also fill that up while we are under the lock
839 // Note that this is the only place that m_FillBufferCurrentPosition can be reset
840 if (m_sourceDepleted.Value == false && localFillBufferRef != null &&
841 m_FillBufferCurrentPosition >= localFillBufferRef.Length)
// Refill the buffer from the start, stamping each element with the next shared index.
843 for (int i = 0; i < localFillBufferRef.Length; i++)
845 if( m_sharedReader.MoveNext())
847 m_sharedIndex.Value = checked(m_sharedIndex.Value + 1);
848 localFillBufferRef[i]
849 = new KeyValuePair<long, TSource>(m_sharedIndex.Value,
850 m_sharedReader.Current);
854 // No more elements left in the enumerator.
855 // Record this, so that the next request can skip the lock
856 m_sourceDepleted.Value = true;
858 // also record the current count in m_FillBufferSize
859 m_FillBufferSize = i;
861 // and exit the for loop so that we don't keep incrementing m_FillBufferSize
// Make the refilled buffer available to copiers again from position 0.
867 m_FillBufferCurrentPosition = 0;
874 // If an exception occurs, don't let the other enumerators try to enumerate.
875 // NOTE: this could instead throw an InvalidOperationException, but that would be unexpected
876 // and not helpful to the end user. We know the root cause is being communicated already.)
877 m_sourceDepleted.Value = true;
878 m_hasNoElementsLeft.Value = true;
// true when at least one element was placed in destArray.
883 return (actualNumElementsGrabbed > 0);
886 public void Dispose()
// Releases the source enumerator held in m_sharedReader.
891 m_sharedReader.Dispose();
897 /// Inherits from DynamicPartitionEnumerator_Abstract directly
898 /// Provides customized implementation for: GrabNextChunk, HasNoElementsLeft, Current, Dispose
900 private class InternalPartitionEnumerator : DynamicPartitionEnumerator_Abstract<TSource, IEnumerator<TSource>>
903 //cached local copy of the current chunk
904 private KeyValuePair<long, TSource>[] m_localList; //defer allocating to avoid false sharing
906 // the values of the following fields are passed in from
907 // outside (already initialized) by the constructor
908 private readonly SharedBool m_hasNoElementsLeft;
// warning 414 (private field assigned but its value never used) is suppressed for m_sharedLock:
// this class stores the lock in its constructor; no read of it is visible in this class
909 #pragma warning disable 414
910 private readonly object m_sharedLock;
911 #pragma warning restore 414
// null for dynamic partitioning; non-null (and counted) for static partitioning -- see Dispose
912 private readonly SharedInt m_activePartitionCount;
// the enumerable that created this enumerator; chunk grabbing is delegated to it
913 private InternalPartitionEnumerable m_enumerable;
916 internal InternalPartitionEnumerator(
917 IEnumerator<TSource> sharedReader,
918 SharedLong sharedIndex,
919 SharedBool hasNoElementsLeft,
921 SharedInt activePartitionCount,
922 InternalPartitionEnumerable enumerable,
923 bool useSingleChunking)
924 : base(sharedReader, sharedIndex, useSingleChunking)
926 m_hasNoElementsLeft = hasNoElementsLeft;
927 m_sharedLock = sharedLock;
928 m_enumerable = enumerable;
929 m_activePartitionCount = activePartitionCount;
931 if (m_activePartitionCount != null)
933 // If static partitioning, we need to increase the active partition count.
934 Interlocked.Increment(ref m_activePartitionCount.Value);
941 /// Reserves a contiguous range of elements from source data
943 /// <param name="requestedChunkSize">specified number of elements requested</param>
945 /// true if we successfully reserved at least one element (up to #=requestedChunkSize)
946 /// false if all elements in the source collection have been reserved.
948 override protected bool GrabNextChunk(int requestedChunkSize)
950 Contract.Assert(requestedChunkSize > 0);
952 if (HasNoElementsLeft)
957 // defer allocation to avoid false sharing
958 if (m_localList == null)
// sized for the largest chunk (m_maxChunkSize) so the buffer can be reused for every request
960 m_localList = new KeyValuePair<long, TSource>[m_maxChunkSize];
963 // make the actual call to the enumerable that grabs a chunk
// the number of elements actually grabbed is written back into m_currentChunkSize.Value
964 return m_enumerable.GrabChunk(m_localList, requestedChunkSize, ref m_currentChunkSize.Value);
968 /// Returns whether or not the shared reader has already read the last
969 /// element of the source data
972 /// We cannot call m_sharedReader.MoveNext(), to see if it hits the last element
973 /// or not, because we can't undo MoveNext(). Thus we need to maintain a shared
974 /// boolean value m_hasNoElementsLeft across all partitions
976 override protected bool HasNoElementsLeft
978 get { return m_hasNoElementsLeft.Value; }
981 //we only set it from false to true once
982 //we should never set it back in any circumstances
983 Contract.Assert(value);
984 Contract.Assert(!m_hasNoElementsLeft.Value);
985 m_hasNoElementsLeft.Value = true;
// Returns the element of the locally cached chunk at the current local offset.
989 override public KeyValuePair<long, TSource> Current
993 //verify that MoveNext is at least called once before Current is called
994 if (m_currentChunkSize == null)
996 throw new InvalidOperationException(Environment.GetResourceString("PartitionerStatic_CurrentCalledBeforeMoveNext"));
998 Contract.Assert(m_localList != null);
999 Contract.Assert(m_localOffset.Value >= 0 && m_localOffset.Value < m_currentChunkSize.Value);
1000 return (m_localList[m_localOffset.Value]);
1004 override public void Dispose()
1006 // If this is static partitioning, i.e. m_activePartitionCount != null, since the current partition
1007 // is disposed, we decrement the number of active partitions for the shared reader.
1008 if (m_activePartitionCount != null && Interlocked.Decrement(ref m_activePartitionCount.Value) == 0)
1010 // If the number of active partitions becomes 0, we need to dispose the shared
1011 // reader we created in the m_enumerable object.
1012 m_enumerable.Dispose();
1014 // If this is dynamic partitioning, i.e. m_activePartitionCount == null, then m_enumerable needs to
1015 // be disposed explicitly by the user, and we do not need to do anything here
1023 #region Dynamic Partitioner for source data of IndexRange types (IList<> and Array<>)
1025 /// Dynamic load-balance partitioner. This class is abstract and to be derived from by
1026 /// the customized partitioner classes for IList, Array, and IEnumerable
1028 /// <typeparam name="TSource">Type of the elements in the source data</typeparam>
1029 /// <typeparam name="TCollection"> Type of the source data collection</typeparam>
1030 private abstract class DynamicPartitionerForIndexRange_Abstract<TSource, TCollection> : OrderablePartitioner<TSource>
1032 // TCollection can be: IList<TSource>, TSource[] and IEnumerable<TSource>
1033 // Derived classes specify TCollection, and implement the abstract method GetOrderableDynamicPartitions_Factory accordingly
1037 /// Constructs a new orderable partitioner
1039 /// <param name="data">source data collection</param>
1040 protected DynamicPartitionerForIndexRange_Abstract(TCollection data)
// OrderablePartitioner arguments, in order: keysOrderedInEachPartition, keysOrderedAcrossPartitions,
// keysNormalized -- i.e. keys are ordered within a partition but not across partitions
1041 : base(true, false, true)
1047 /// Partition the source data and create an enumerable over the resulting partitions.
1049 /// <param name="data">the source data collection</param>
1050 /// <returns>an enumerable of partitions of the source data</returns>
1051 protected abstract IEnumerable<KeyValuePair<long, TSource>> GetOrderableDynamicPartitions_Factory(TCollection data);
1054 /// Overrides OrderablePartitioner.GetOrderablePartitions.
1055 /// Partitions the underlying collection into the given number of orderable partitions.
1057 /// <param name="partitionCount">number of partitions requested</param>
1058 /// <returns>A list containing <paramref name="partitionCount"/> enumerators.</returns>
1059 override public IList<IEnumerator<KeyValuePair<long, TSource>>> GetOrderablePartitions(int partitionCount)
// throws ArgumentOutOfRangeException when partitionCount is zero or negative
1061 if (partitionCount <= 0)
1063 throw new ArgumentOutOfRangeException("partitionCount");
1065 IEnumerator<KeyValuePair<long, TSource>>[] partitions
1066 = new IEnumerator<KeyValuePair<long, TSource>>[partitionCount];
// a single enumerable is created; every partition below is an enumerator obtained from it
1067 IEnumerable<KeyValuePair<long, TSource>> partitionEnumerable = GetOrderableDynamicPartitions_Factory(m_data);
1068 for (int i = 0; i < partitionCount; i++)
1070 partitions[i] = partitionEnumerable.GetEnumerator();
1076 /// Overrides OrderablePartitioner.GetOrderableDynamicPartitions
1078 /// <returns>an enumerable collection of orderable partitions</returns>
1079 override public IEnumerable<KeyValuePair<long, TSource>> GetOrderableDynamicPartitions()
1081 return GetOrderableDynamicPartitions_Factory(m_data);
1085 /// Whether additional partitions can be created dynamically.
1087 override public bool SupportsDynamicPartitions
1089 get { return true; }
1095 /// Defines dynamic partition for source data of IList and Array.
1096 /// This class inherits DynamicPartitionEnumerator_Abstract
1097 /// - implements GrabNextChunk, HasNoElementsLeft, and Dispose methods for IList and Array
1098 /// - Current property still remains abstract, implementation is different for IList and Array
1099 /// - introduces another abstract property SourceCount, which returns the number of elements in
1100 /// the source data. Implementation differs for IList and Array
1102 /// <typeparam name="TSource">Type of the elements in the data source</typeparam>
1103 /// <typeparam name="TSourceReader">Type of the reader on the source data</typeparam>
1104 private abstract class DynamicPartitionEnumeratorForIndexRange_Abstract<TSource, TSourceReader> : DynamicPartitionEnumerator_Abstract<TSource, TSourceReader>
// index in the source collection of the first element of the chunk currently held by this enumerator
1107 protected int m_startIndex; //initially zero
1110 protected DynamicPartitionEnumeratorForIndexRange_Abstract(TSourceReader sharedReader, SharedLong sharedIndex)
1111 : base(sharedReader, sharedIndex)
1116 //the Current property is still abstract, and will be implemented by derived classes
1117 //we add another abstract property SourceCount to get the number of elements from the source reader
1120 /// Get the number of elements from the source reader.
1121 /// It calls IList.Count or Array.Length
1123 protected abstract int SourceCount { get; }
1125 //overriding methods
1128 /// Reserves a contiguous range of elements from source data
1130 /// <param name="requestedChunkSize">specified number of elements requested</param>
1132 /// true if we successfully reserved at least one element (up to #=requestedChunkSize)
1133 /// false if all elements in the source collection have been reserved.
1135 override protected bool GrabNextChunk(int requestedChunkSize)
1137 Contract.Assert(requestedChunkSize > 0);
// Loop: read the shared index, compute the new upper bound, and try to publish it atomically.
// m_sharedIndex.Value is the index of the last element that has already been reserved.
1139 while (!HasNoElementsLeft)
1141 Contract.Assert(m_sharedIndex != null);
1142 // use the new Volatile.Read method because it is cheaper than Interlocked.Read on AMD64 architecture
1143 long oldSharedIndex = Volatile.Read(ref m_sharedIndex.Value);
1145 if (HasNoElementsLeft)
1147 //HasNoElementsLeft situation changed from false to true immediately
1148 //and oldSharedIndex becomes stale
1152 //there won't be overflow, because the index of IList/array is int, and we
1153 //have cast it to long.
// clamp to the last valid index so the final chunk may be shorter than requested
1154 long newSharedIndex = Math.Min(SourceCount - 1, oldSharedIndex + requestedChunkSize);
1157 //the following CAS, if successful, reserves a chunk of elements [oldSharedIndex+1, newSharedIndex]
1158 //inclusive in the source collection
1159 if (Interlocked.CompareExchange(ref m_sharedIndex.Value, newSharedIndex, oldSharedIndex)
1162 //set up local indexes.
1163 //m_currentChunkSize is always set to requestedChunkSize when source data had
1164 //enough elements of what we requested
1165 m_currentChunkSize.Value = (int)(newSharedIndex - oldSharedIndex);
// -1: the local offset sits just before the first element of the freshly reserved chunk
1166 m_localOffset.Value = -1;
1167 m_startIndex = (int)(oldSharedIndex + 1);
1171 //didn't get any element, return false;
1176 /// Returns whether or not the shared reader has already read the last
1177 /// element of the source data
1179 override protected bool HasNoElementsLeft
1183 Contract.Assert(m_sharedIndex != null);
1184 // use the new Volatile.Read method because it is cheaper than Interlocked.Read on AMD64 architecture
// true once the shared index has reached the last valid index (SourceCount - 1)
1185 return Volatile.Read(ref m_sharedIndex.Value) >= SourceCount - 1;
// NOTE(review): presumably the setter body -- this property is derived from the shared index
// and is not expected to be assigned here; confirm against the full file
1189 Contract.Assert(false);
1194 /// For source data type IList and Array, the type of the shared reader is just the data itself.
1195 /// We don't do anything in Dispose method for IList and Array.
1197 override public void Dispose()
1203 /// Inherits from DynamicPartitionerForIndexRange_Abstract
1204 /// Provides customized implementation of GetOrderableDynamicPartitions_Factory method, to return an instance
1205 /// of InternalPartitionEnumerable defined internally
1207 /// <typeparam name="TSource">Type of elements in the source data</typeparam>
1208 private class DynamicPartitionerForIList<TSource> : DynamicPartitionerForIndexRange_Abstract<TSource, IList<TSource>>
1211 internal DynamicPartitionerForIList(IList<TSource> source)
1216 override protected IEnumerable<KeyValuePair<long, TSource>> GetOrderableDynamicPartitions_Factory(IList<TSource> m_data)
1218 //m_data itself serves as shared reader
1219 return new InternalPartitionEnumerable(m_data);
1223 /// Implements IEnumerable{KeyValuePair{long, TSource}}
1224 /// Provides customized implementation for source data of IList
1226 private class InternalPartitionEnumerable : IEnumerable<KeyValuePair<long, TSource>>
1228 //reader through which we access the source data
1229 private readonly IList<TSource> m_sharedReader;
// one index object per enumerable; it is handed to every enumerator created by GetEnumerator
1230 private SharedLong m_sharedIndex;
1232 internal InternalPartitionEnumerable(IList<TSource> sharedReader)
1234 m_sharedReader = sharedReader;
// -1 means that no element has been reserved yet
1235 m_sharedIndex = new SharedLong(-1);
1238 public IEnumerator<KeyValuePair<long, TSource>> GetEnumerator()
1240 return new InternalPartitionEnumerator(m_sharedReader, m_sharedIndex);
1243 IEnumerator IEnumerable.GetEnumerator()
1245 return ((InternalPartitionEnumerable)this).GetEnumerator();
1250 /// Inherits from DynamicPartitionEnumeratorForIndexRange_Abstract
1251 /// Provides customized implementation of SourceCount property and Current property for IList
1253 private class InternalPartitionEnumerator : DynamicPartitionEnumeratorForIndexRange_Abstract<TSource, IList<TSource>>
1256 internal InternalPartitionEnumerator(IList<TSource> sharedReader, SharedLong sharedIndex)
1257 : base(sharedReader, sharedIndex)
1260 //overriding methods
1261 override protected int SourceCount
1263 get { return m_sharedReader.Count; }
1266 /// return a KeyValuePair of the current element and its key
1268 override public KeyValuePair<long, TSource> Current
1272 //verify that MoveNext is at least called once before Current is called
1273 if (m_currentChunkSize == null)
1275 throw new InvalidOperationException(Environment.GetResourceString("PartitionerStatic_CurrentCalledBeforeMoveNext"));
1278 Contract.Assert(m_localOffset.Value >= 0 && m_localOffset.Value < m_currentChunkSize.Value);
// the key is the element's position in the list: chunk start plus offset within the chunk
1279 return new KeyValuePair<long, TSource>(m_startIndex + m_localOffset.Value,
1280 m_sharedReader[m_startIndex + m_localOffset.Value]);
1289 /// Inherits from DynamicPartitionerForIndexRange_Abstract
1290 /// Provides customized implementation of GetOrderableDynamicPartitions_Factory method, to return an instance
1291 /// of InternalPartitionEnumerable defined internally
1293 /// <typeparam name="TSource">Type of elements in the source data</typeparam>
1294 private class DynamicPartitionerForArray<TSource> : DynamicPartitionerForIndexRange_Abstract<TSource, TSource[]>
1297 internal DynamicPartitionerForArray(TSource[] source)
1302 override protected IEnumerable<KeyValuePair<long, TSource>> GetOrderableDynamicPartitions_Factory(TSource[] m_data)
// the array itself is passed on as the shared reader
1304 return new InternalPartitionEnumerable(m_data);
1308 /// Implements IEnumerable{KeyValuePair{long, TSource}}
1309 /// Provides customized implementation for source data of Array
1311 private class InternalPartitionEnumerable : IEnumerable<KeyValuePair<long, TSource>>
1313 //reader through which we access the source data
1314 private readonly TSource[] m_sharedReader;
// one index object per enumerable; it is handed to every enumerator created by GetEnumerator
1315 private SharedLong m_sharedIndex;
1317 internal InternalPartitionEnumerable(TSource[] sharedReader)
1319 m_sharedReader = sharedReader;
// -1 means that no element has been reserved yet
1320 m_sharedIndex = new SharedLong(-1);
1323 IEnumerator IEnumerable.GetEnumerator()
1325 return ((InternalPartitionEnumerable)this).GetEnumerator();
1329 public IEnumerator<KeyValuePair<long, TSource>> GetEnumerator()
1331 return new InternalPartitionEnumerator(m_sharedReader, m_sharedIndex);
1336 /// Inherits from DynamicPartitionEnumeratorForIndexRange_Abstract
1337 /// Provides customized implementation of SourceCount property and Current property for Array
1339 private class InternalPartitionEnumerator : DynamicPartitionEnumeratorForIndexRange_Abstract<TSource, TSource[]>
1342 internal InternalPartitionEnumerator(TSource[] sharedReader, SharedLong sharedIndex)
1343 : base(sharedReader, sharedIndex)
1346 //overriding methods
1347 override protected int SourceCount
1349 get { return m_sharedReader.Length; }
// Returns a KeyValuePair of the current element and its key (its index in the array).
1352 override public KeyValuePair<long, TSource> Current
1356 //verify that MoveNext is at least called once before Current is called
1357 if (m_currentChunkSize == null)
1359 throw new InvalidOperationException(Environment.GetResourceString("PartitionerStatic_CurrentCalledBeforeMoveNext"));
1362 Contract.Assert(m_localOffset.Value >= 0 && m_localOffset.Value < m_currentChunkSize.Value);
1363 return new KeyValuePair<long, TSource>(m_startIndex + m_localOffset.Value,
1364 m_sharedReader[m_startIndex + m_localOffset.Value]);
1372 #region Static partitioning for IList and Array, abstract classes
1374 /// Static partitioning over an index range (IList or Array).
1375 /// - static: each partition covers a fixed index range (no dynamic load balancing)
1376 /// - Keys are ordered within each partition
1377 /// - Keys are ordered across partitions
1378 /// - Keys are normalized
1379 /// - Number of partitions is fixed once specified, and the elements of the source data are
1380 /// distributed to each partition as evenly as possible.
1382 /// <typeparam name="TSource">type of the elements</typeparam>
1383 /// <typeparam name="TCollection">Type of the source data collection</typeparam>
1384 private abstract class StaticIndexRangePartitioner<TSource, TCollection> : OrderablePartitioner<TSource>
1386 protected StaticIndexRangePartitioner()
// OrderablePartitioner arguments, in order: keysOrderedInEachPartition, keysOrderedAcrossPartitions,
// keysNormalized -- all three hold, matching the bullet list above
1387 : base(true, true, true)
1391 /// Abstract property returning the number of elements in the source data
1393 protected abstract int SourceCount { get; }
1396 /// Abstract method to create a partition that covers a range over source data,
1397 /// starting from "startIndex", ending at "endIndex"
1399 /// <param name="startIndex">start index of the current partition on the source data</param>
1400 /// <param name="endIndex">end index of the current partition on the source data</param>
1401 /// <returns>a partition enumerator over the specified range</returns>
1402 // The partitioning algorithm is implemented in GetOrderablePartitions method
1403 // This method delegates according to source data type IList/Array
1404 protected abstract IEnumerator<KeyValuePair<long, TSource>> CreatePartition(int startIndex, int endIndex);
1407 /// Overrides OrderablePartitioner.GetOrderablePartitions
1408 /// Return a list of partitions, each of which enumerate a fixed part of the source data
1409 /// The elements of the source data are distributed to each partition as evenly as possible.
1410 /// Specifically, if the total number of elements is N, and number of partitions is x, and N = a*x +b,
1411 /// where a is the quotient, and b is the remainder. Then the first b partitions each has a + 1 elements,
1412 /// and the last x-b partitions each has a elements.
1413 /// For example, if N=10, x =3, then
1414 /// partition 0 ranges [0,3],
1415 /// partition 1 ranges [4,6],
1416 /// partition 2 ranges [7,9].
1417 /// This also takes care of the situation of (x>N), the last x-N partitions are empty enumerators.
1418 /// An empty enumerator is indicated by
1419 /// (m_startIndex == list.Count && m_endIndex == list.Count -1)
1421 /// <param name="partitionCount">specified number of partitions</param>
1422 /// <returns>a list of partitions</returns>
1423 override public IList<IEnumerator<KeyValuePair<long, TSource>>> GetOrderablePartitions(int partitionCount)
// throws ArgumentOutOfRangeException when partitionCount is zero or negative
1425 if (partitionCount <= 0)
1427 throw new ArgumentOutOfRangeException("partitionCount");
1430 int quotient, remainder;
1431 quotient = Math.DivRem(SourceCount, partitionCount, out remainder);
1433 IEnumerator<KeyValuePair<long, TSource>>[] partitions = new IEnumerator<KeyValuePair<long, TSource>>[partitionCount];
// -1 so that the first partition starts at index 0
1434 int lastEndIndex = -1;
1435 for (int i = 0; i < partitionCount; i++)
1437 int startIndex = lastEndIndex + 1;
// end index (inclusive) for a partition holding quotient + 1 elements
1441 lastEndIndex = startIndex + quotient;
// end index (inclusive) for a partition holding quotient elements
1445 lastEndIndex = startIndex + quotient - 1;
1447 partitions[i] = CreatePartition(startIndex, lastEndIndex);
1454 /// Static Partition for IList/Array.
1455 /// This class implements all methods required by IEnumerator interface, except for the Current property.
1456 /// Current Property is different for IList and Array. Arrays call 'ldelem' instructions for faster element retrieval.
1459 //We assume the source collection is not being updated concurrently. Otherwise it will break the
1460 //static partitioning, since each partition operates on the source collection directly, it does
1461 //not have a local cache of the elements assigned to them.
1462 private abstract class StaticIndexRangePartition<TSource> : IEnumerator<KeyValuePair<long, TSource>>
1464 //the start and end position in the source collection for the current partition
1465 //the partition is empty if and only if
1466 // (m_startIndex == m_data.Count && m_endIndex == m_data.Count-1)
1467 protected readonly int m_startIndex;
1468 protected readonly int m_endIndex;
1470 //the current index of the current partition while enumerating on the source collection
1471 protected volatile int m_offset;
1474 /// Constructs an instance of StaticIndexRangePartition
1476 /// <param name="startIndex">the start index in the source collection for the current partition </param>
1477 /// <param name="endIndex">the end index in the source collection for the current partition</param>
1478 protected StaticIndexRangePartition(int startIndex, int endIndex)
1480 m_startIndex = startIndex;
1481 m_endIndex = endIndex;
// one position before the first element, so Current can detect that MoveNext was never called
1482 m_offset = startIndex - 1;
1486 /// Current Property is different for IList and Array. Arrays call 'ldelem' instructions for faster
1487 /// element retrieval.
1489 public abstract KeyValuePair<long, TSource> Current { get; }
1492 /// We don't dispose the source for IList and array
1494 public void Dispose()
// NOTE(review): the throw below contradicts the "we don't dispose" remark above; it presumably
// belongs to a separate member (e.g. Reset) whose declaration is not visible here -- confirm
1499 throw new NotSupportedException();
1503 /// Moves to the next item
1504 /// Before the first MoveNext is called: m_offset == m_startIndex-1;
1506 /// <returns>true if successful, false if there is no item left</returns>
1507 public bool MoveNext()
1509 if (m_offset < m_endIndex)
1516 //After we have enumerated over all elements, we set m_offset to m_endIndex +1.
1517 //The reason we do this is, for an empty enumerator, we need to tell the Current
1518 //property whether MoveNext has been called or not.
1519 //For an empty enumerator, it starts with (m_offset == m_startIndex-1 == m_endIndex),
1520 //and we don't set a new value to m_offset, then the above condition will always be
1521 //true, and the Current property will mistakenly assume MoveNext is never called.
1522 m_offset = m_endIndex + 1;
// non-generic IEnumerator.Current: forwards to the strongly typed Current (the result is boxed)
1527 Object IEnumerator.Current
1531 return ((StaticIndexRangePartition<TSource>)this).Current;
1537 #region Static partitioning for IList
1539 /// Inherits from StaticIndexRangePartitioner
1540 /// Provides customized implementation of SourceCount and CreatePartition
1542 /// <typeparam name="TSource">type of the elements in the source list</typeparam>
1543 private class StaticIndexRangePartitionerForIList<TSource> : StaticIndexRangePartitioner<TSource, IList<TSource>>
// the source list; handed unchanged to every partition created by CreatePartition
1545 IList<TSource> m_list;
1546 internal StaticIndexRangePartitionerForIList(IList<TSource> list)
1549 Contract.Assert(list != null);
1552 override protected int SourceCount
1554 get { return m_list.Count; }
// Creates an enumerator over the inclusive index range [startIndex, endIndex] of m_list.
1556 override protected IEnumerator<KeyValuePair<long, TSource>> CreatePartition(int startIndex, int endIndex)
1558 return new StaticIndexRangePartitionForIList<TSource>(m_list, startIndex, endIndex);
1563 /// Inherits from StaticIndexRangePartition
1564 /// Provides customized implementation of Current property
1566 /// <typeparam name="TSource">type of the elements in the source list</typeparam>
1567 private class StaticIndexRangePartitionForIList<TSource> : StaticIndexRangePartition<TSource>
1569 //the source collection shared by all partitions
1570 private volatile IList<TSource> m_list;
1572 internal StaticIndexRangePartitionForIList(IList<TSource> list, int startIndex, int endIndex)
1573 : base(startIndex, endIndex)
1575 Contract.Assert(startIndex >= 0 && endIndex <= list.Count - 1);
// Returns the pair (index in the list, element at that index) for the current position.
1579 override public KeyValuePair<long, TSource> Current
1583 //verify that MoveNext is at least called once before Current is called
// the constructor of the base class starts m_offset at m_startIndex - 1
1584 if (m_offset < m_startIndex)
1586 throw new InvalidOperationException(Environment.GetResourceString("PartitionerStatic_CurrentCalledBeforeMoveNext"));
1589 Contract.Assert(m_offset >= m_startIndex && m_offset <= m_endIndex);
1590 return (new KeyValuePair<long, TSource>(m_offset, m_list[m_offset]));
1596 #region static partitioning for Arrays
1598 /// Inherits from StaticIndexRangePartitioner
1599 /// Provides customized implementation of SourceCount and CreatePartition for Array
1601 private class StaticIndexRangePartitionerForArray<TSource> : StaticIndexRangePartitioner<TSource, TSource[]>
1604 internal StaticIndexRangePartitionerForArray(TSource[] array)
1607 Contract.Assert(array != null);
1610 override protected int SourceCount
1612 get { return m_array.Length; }
// Creates an enumerator over the inclusive index range [startIndex, endIndex] of m_array.
1614 override protected IEnumerator<KeyValuePair<long, TSource>> CreatePartition(int startIndex, int endIndex)
1616 return new StaticIndexRangePartitionForArray<TSource>(m_array, startIndex, endIndex);
1621 /// Inherits from StaticIndexRangePartition
1622 /// Provides customized implementation of Current property for Array
1624 private class StaticIndexRangePartitionForArray<TSource> : StaticIndexRangePartition<TSource>
1626 //the source collection shared by all partitions
1627 private volatile TSource[] m_array;
1629 internal StaticIndexRangePartitionForArray(TSource[] array, int startIndex, int endIndex)
1630 : base(startIndex, endIndex)
1632 Contract.Assert(startIndex >= 0 && endIndex <= array.Length - 1);
// Returns the pair (index in the array, element at that index) for the current position.
1636 override public KeyValuePair<long, TSource> Current
1640 //verify that MoveNext is at least called once before Current is called
// the constructor of the base class starts m_offset at m_startIndex - 1
1641 if (m_offset < m_startIndex)
1643 throw new InvalidOperationException(Environment.GetResourceString("PartitionerStatic_CurrentCalledBeforeMoveNext"));
1646 Contract.Assert(m_offset >= m_startIndex && m_offset <= m_endIndex);
1647 return (new KeyValuePair<long, TSource>(m_offset, m_array[m_offset]));
1654 #region Utility functions
1656 /// A very simple primitive that allows us to share a value across multiple threads.
1658 /// <remarks>Reference-type wrapper around a single volatile int field.</remarks>
1659 private class SharedInt
1661 internal volatile int Value;
1663 internal SharedInt(int value)
1671 /// A very simple primitive that allows us to share a bool value across multiple threads.
1673 private class SharedBool
// volatile: reads and writes of the flag are not cached or reordered by the compiler/runtime
1675 internal volatile bool Value;
1677 internal SharedBool(bool value)
1685 /// A very simple primitive that allows us to share a long value across multiple threads.
1687 private class SharedLong
// not declared volatile (C# does not allow volatile long); callers in this file access it through
// Volatile.Read and Interlocked.CompareExchange instead
1689 internal long Value;
1690 internal SharedLong(long value)
1697 //--------------------
1698 // The following part calculates the default chunk size. It is copied from System.Linq.Parallel.Scheduling,
1699 // because mscorlib.dll cannot access System.Linq.Parallel.Scheduling
1700 //--------------------
1702 // The number of bytes we want "chunks" to be, when partitioning, etc. We choose 4 cache
1703 // lines worth, assuming a 128-byte cache line. Most (popular) architectures use 64-byte cache lines,
1704 // but choosing 128 bytes works for 64-byte lines too, whereas a multiple of 64 bytes isn't necessarily
1705 // sufficient for 128-byte cache systems. So 128 bytes it is.
1706 private const int DEFAULT_BYTES_PER_CHUNK = 128 * 4;
1708 private static int GetDefaultChunkSize<TSource>()
1712 if (typeof(TSource).IsValueType)
1714 #if !FEATURE_CORECLR // Marshal.SizeOf is not supported in CoreCLR
1717 if (typeof(TSource).StructLayoutAttribute.Value == LayoutKind.Explicit)
1719 chunkSize = Math.Max(1, DEFAULT_BYTES_PER_CHUNK / Marshal.SizeOf(typeof(TSource)));
1723 // We choose '128' because this ensures, no matter the actual size of the value type,
1724 // the total bytes used will be a multiple of 128. This ensures it's cache aligned.
1733 Contract.Assert((DEFAULT_BYTES_PER_CHUNK % IntPtr.Size) == 0, "bytes per chunk should be a multiple of pointer size");
1734 chunkSize = (DEFAULT_BYTES_PER_CHUNK / IntPtr.Size);