3 // Copyright (c) Microsoft Corporation. All rights reserved.
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
8 // ParallelRangeManager.cs
10 // <OWNER>Microsoft</OWNER>
12 // Implements the algorithm for distributing loop indices to parallel loop workers
14 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
17 using System.Threading;
18 using System.Diagnostics.Contracts;
20 #pragma warning disable 0420
22 namespace System.Threading.Tasks
/// <summary>
/// One contiguous slice [m_nFromInclusive, m_nToExclusive) of a parallel loop's index space.
/// Workers claim chunks of the slice by atomically advancing a shared offset.
/// </summary>
internal struct IndexRange
{
    // Bounds of the slice. They are fixed once the ranges are created and never change afterwards.
    internal long m_nFromInclusive, m_nToExclusive;

    // How far into the slice workers have claimed, measured from m_nFromInclusive rather than as an
    // absolute index. Many workers may race to bump it past the end, and keeping it relative avoids
    // overflowing in that case. May be null until a worker first claims from this slice. Every update
    // must be interlocked.
    internal volatile Shared<long> m_nSharedCurrentIndexOffset;

    // 0 while the slice may still have work; set to 1 by the worker that finds it exhausted, so others
    // can skip it quickly. The only transition is 0 -> 1, so the write does not need to be interlocked.
    internal int m_bRangeFinished;
}
/// <summary>
/// Per-worker state for a task servicing a parallel loop. The worker starts on one index range and
/// claims ever-larger chunks from it. When that range runs dry, it moves on to the next range in
/// circular order.
/// </summary>
internal struct RangeWorker
{
    // The IndexRange array allocated by the RangeManager and shared by all of its workers.
    internal readonly IndexRange[] m_indexRanges;

    // Position in m_indexRanges of the range this worker is currently claiming chunks from.
    internal int m_nCurrentIndexRange;

    // The loop step. It is kept here so the worker does not need to reach back into the RangeManager.
    internal long m_nStep;

    // Size of the next chunk this worker will claim by advancing a range's shared offset.
    internal long m_nIncrementValue;

    // Ceiling for m_nIncrementValue, which doubles after every successful claim.
    internal readonly long m_nMaxIncrementValue;

    /// <summary>
    /// Creates a worker over <paramref name="ranges"/> that begins at index <paramref name="nInitialRange"/>.
    /// </summary>
    internal RangeWorker(IndexRange[] ranges, int nInitialRange, long nStep)
    {
        m_nStep = nStep;
        m_indexRanges = ranges;
        m_nCurrentIndexRange = nInitialRange;

        // The first chunk is one step. Chunks grow until they reach DEFAULT_LOOP_STRIDE steps.
        m_nMaxIncrementValue = Parallel.DEFAULT_LOOP_STRIDE * nStep;
        m_nIncrementValue = nStep;
    }

    /// <summary>
    /// Core work-search routine for this worker.
    /// </summary>
    /// <remarks>
    /// The thread that owns this worker calls FindNewWork repeatedly:
    ///   - true: execute the sequential loop over [nFromInclusiveLocal, nToExclusiveLocal), then call again.
    ///   - false: no work is left anywhere and the worker should quit; both outputs are 0.
    /// The current range is always tried first. After that, each remaining range is tried once, in
    /// circular order.
    /// </remarks>
    internal bool FindNewWork(out long nFromInclusiveLocal, out long nToExclusiveLocal)
    {
        // Exit condition: the number of ranges still to be tried in this call.
        int visitsRemaining = m_indexRanges.Length;

        while (true)
        {
            if (TryClaimChunkFromCurrentRange(out nFromInclusiveLocal, out nToExclusiveLocal))
            {
                return true;
            }

            // Move on to the next range, wrapping around the end of the array.
            m_nCurrentIndexRange = (m_nCurrentIndexRange + 1) % m_indexRanges.Length;

            if (--visitsRemaining <= 0)
            {
                break;
            }
        }

        // Every range has been tried without success, so there is no work remaining.
        nFromInclusiveLocal = 0;
        nToExclusiveLocal = 0;
        return false;
    }

    // Tries to claim the next chunk of the range at m_nCurrentIndexRange. On failure both outputs are 0,
    // and the range is flagged as finished if this call found it exhausted.
    private bool TryClaimChunkFromCurrentRange(out long nFrom, out long nTo)
    {
        nFrom = 0;
        nTo = 0;

        // Struct copy. It is used only for reading fields, which avoids repeated array bounds checks.
        IndexRange snapshot = m_indexRanges[m_nCurrentIndexRange];
        if (snapshot.m_bRangeFinished != 0)
        {
            return false;
        }

        // Lazily publish the shared offset. CompareExchange ensures only one instance wins.
        if (m_indexRanges[m_nCurrentIndexRange].m_nSharedCurrentIndexOffset == null)
        {
            Interlocked.CompareExchange(ref m_indexRanges[m_nCurrentIndexRange].m_nSharedCurrentIndexOffset, new Shared<long>(0), null);
        }

        // This must go through the array slot (not the snapshot) so all workers bump the same counter.
        long chunk = m_nIncrementValue;
        long offset = Interlocked.Add(ref m_indexRanges[m_nCurrentIndexRange].m_nSharedCurrentIndexOffset.Value, chunk) - chunk;

        if (offset >= snapshot.m_nToExclusive - snapshot.m_nFromInclusive)
        {
            // Nothing left here. Flag the range so other workers skip it quickly.
            Interlocked.Exchange(ref m_indexRanges[m_nCurrentIndexRange].m_bRangeFinished, 1);
            return false;
        }

        nFrom = snapshot.m_nFromInclusive + offset;
        nTo = nFrom + chunk;

        // Clamp to the range end if the chunk runs past it or the addition wrapped around.
        if (nTo > snapshot.m_nToExclusive || nTo < snapshot.m_nFromInclusive)
        {
            nTo = snapshot.m_nToExclusive;
        }

        // Grow the next chunk geometrically, capped at m_nMaxIncrementValue.
        if (m_nIncrementValue < m_nMaxIncrementValue)
        {
            long doubled = m_nIncrementValue * 2;
            m_nIncrementValue = doubled > m_nMaxIncrementValue ? m_nMaxIncrementValue : doubled;
        }

        return true;
    }

    /// <summary>
    /// 32-bit flavour of FindNewWork. It assumes the ranges were built from 32-bit bounds, so every
    /// index it reports fits in an int.
    /// </summary>
    internal bool FindNewWork32(out int nFromInclusiveLocal32, out int nToExclusiveLocal32)
    {
        long from64;
        long to64;
        bool found = FindNewWork(out from64, out to64);

        Contract.Assert(from64 >= Int32.MinValue && from64 <= Int32.MaxValue &&
                        to64 >= Int32.MinValue && to64 <= Int32.MaxValue);

        // Narrow to 32 bits before handing the bounds back.
        nFromInclusiveLocal32 = (int)from64;
        nToExclusiveLocal32 = (int)to64;
        return found;
    }
}
/// <summary>
/// Represents the entire loop operation, keeping track of workers and ranges.
/// </summary>
/// <remarks>
/// The usage pattern is:
///    1) The Parallel loop entry function (ForWorker) creates an instance of this class.
///    2) Every thread joining to service the parallel loop calls RegisterNewWorker to grab a
///       RangeWorker struct wrapping the state it needs to find and execute work. It keeps
///       interacting with that struct until the end of the loop.
/// </remarks>
internal class RangeManager
{
    // The index ranges the loop is split into. This array is shared with every RangeWorker.
    internal readonly IndexRange[] m_indexRanges;

    // Round-robin counter that picks the starting range of each newly registered worker.
    internal int m_nCurrentIndexRangeToAssign;

    // The loop step, handed to every RangeWorker.
    internal long m_nStep;

    /// <summary>
    /// Initializes a RangeManager by splitting [nFromInclusive, nToExclusive) into index ranges.
    /// The target range size is the span divided by nNumExpectedWorkers, rounded down to a multiple of
    /// nStep but never below nStep. The last range is snapped to nToExclusive.
    /// </summary>
    /// <param name="nFromInclusive">First loop index.</param>
    /// <param name="nToExclusive">
    /// Exclusive end of the loop. Presumably greater than nFromInclusive (TODO confirm with callers).
    /// An equal value produces zero ranges, which RegisterNewWorker does not support.
    /// </param>
    /// <param name="nStep">Loop step. Assumed positive; a zero step divides by zero below.</param>
    /// <param name="nNumExpectedWorkers">Desired number of outer ranges. Values below 2 are treated as 2.</param>
    internal RangeManager(long nFromInclusive, long nToExclusive, long nStep, int nNumExpectedWorkers)
    {
        m_nCurrentIndexRangeToAssign = 0;
        m_nStep = nStep;

        // Our signed math breaks down with nNumExpectedWorkers == 1: a single range could span more than
        // Int64.MaxValue indices. Zero would divide by zero, and a negative count cast to ulong would give a
        // meaningless divisor. Clamping to >= 2 keeps every range size below Int64.MaxValue.
        if (nNumExpectedWorkers < 2)
        {
            nNumExpectedWorkers = 2;
        }

        //
        // calculate the size of each index range
        //

        // The subtraction may legitimately wrap (e.g. a span wider than Int64.MaxValue). Reinterpreting
        // it as ulong then yields the true span, so it must stay unchecked even in a /checked build.
        ulong uSpan = unchecked((ulong)(nToExclusive - nFromInclusive));
        ulong uRangeSize = uSpan / (ulong)nNumExpectedWorkers; // rough estimate first

        uRangeSize -= uRangeSize % (ulong)nStep; // snap to multiples of nStep,
                                                 // otherwise index range transitions will derail us from nStep

        if (uRangeSize == 0)
        {
            uRangeSize = (ulong)nStep;
        }

        //
        // find the actual number of index ranges we will need (ceiling of uSpan / uRangeSize)
        //
        Contract.Assert((uSpan / uRangeSize) < Int32.MaxValue);

        int nNumRanges = (int)(uSpan / uRangeSize);

        if (uSpan % uRangeSize != 0)
        {
            nNumRanges++;
        }

        // Convert to signed so the rest of the logic works.
        // This is fine as long as uRangeSize < Int64.MaxValue, which the ">= 2 workers" clamp guarantees.
        long nRangeSize = (long)uRangeSize;

        // allocate the array of index ranges
        m_indexRanges = new IndexRange[nNumRanges];

        long nCurrentIndex = nFromInclusive;
        for (int i = 0; i < nNumRanges; i++)
        {
            // the fromInclusive of the new index range is always on nCurrentIndex
            m_indexRanges[i].m_nFromInclusive = nCurrentIndex;
            m_indexRanges[i].m_nSharedCurrentIndexOffset = null;
            m_indexRanges[i].m_bRangeFinished = 0;

            // Now advance to find the toExclusive value for this range. Deliberately unchecked: wrapping
            // past Int64.MaxValue is detected just below instead of throwing.
            nCurrentIndex = unchecked(nCurrentIndex + nRangeSize);

            // detect integer overflow or range overage and snap to nToExclusive
            if (nCurrentIndex < unchecked(nCurrentIndex - nRangeSize) ||
                nCurrentIndex > nToExclusive)
            {
                // this should only happen at the last index
                Contract.Assert(i == nNumRanges - 1);

                nCurrentIndex = nToExclusive;
            }

            // now that the end point of the new range is calculated, assign it.
            m_indexRanges[i].m_nToExclusive = nCurrentIndex;
        }
    }

    /// <summary>
    /// Must be called by each new worker thread servicing the parallel loop. Returns a RangeWorker that
    /// wraps the state for finding and executing indices. Successive workers start on successive ranges
    /// (round-robin).
    /// </summary>
    internal RangeWorker RegisterNewWorker()
    {
        Contract.Assert(m_indexRanges != null && m_indexRanges.Length != 0);

        // Interlocked.Increment wraps to Int32.MinValue after Int32.MaxValue, and a signed modulo would
        // then give a negative starting index. Reducing in unsigned arithmetic keeps the index in
        // [0, Length) and is identical to the signed form for all non-negative counter values.
        uint nTicket = unchecked((uint)(Interlocked.Increment(ref m_nCurrentIndexRangeToAssign) - 1));
        int nInitialRange = (int)(nTicket % (uint)m_indexRanges.Length);

        return new RangeWorker(m_indexRanges, nInitialRange, m_nStep);
    }
}
281 #pragma warning restore 0420