-#if NET_4_0
// Parallel.cs
//
// Copyright (c) 2008 Jérémie "Garuma" Laval
//
//
+#if NET_4_0
using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
{
return GetBestWorkerNumber (TaskScheduler.Current);
}
-
+
internal static int GetBestWorkerNumber (TaskScheduler scheduler)
- {
+ {
return scheduler.MaximumConcurrencyLevel;
}
-
+
static int GetBestWorkerNumber (int from, int to, ParallelOptions options, out int step)
{
int num = Math.Min (GetBestWorkerNumber (),
options != null && options.MaxDegreeOfParallelism != -1 ? options.MaxDegreeOfParallelism : int.MaxValue);
// Integer range that each task process
- step = Math.Min (5, (to - from) / num);
- if (step <= 0)
- step = 1;
-
+ if ((step = (to - from) / num) < 5) {
+ step = 5;
+ num = (to - from) / 5;
+ if (num < 1)
+ num = 1;
+ }
+
return num;
}
-
+
static void HandleExceptions (IEnumerable<Task> tasks)
{
HandleExceptions (tasks, null);
}
-
+
static void HandleExceptions (IEnumerable<Task> tasks, ParallelLoopState.ExternalInfos infos)
{
List<Exception> exs = new List<Exception> ();
if (t.Exception != null)
exs.Add (t.Exception);
}
-
+
if (exs.Count > 0) {
if (infos != null)
infos.IsExceptional = true;
-
+
throw new AggregateException (exs);
}
}
-
+
static void InitTasks (Task[] tasks, int count, Action action, ParallelOptions options)
{
- TaskCreationOptions creation = TaskCreationOptions.LongRunning;
-
+ TaskCreationOptions creation = TaskCreationOptions.LongRunning | TaskCreationOptions.AttachedToParent;
+
for (int i = 0; i < count; i++) {
if (options == null)
tasks [i] = Task.Factory.StartNew (action, creation);
tasks [i] = Task.Factory.StartNew (action, options.CancellationToken, creation, options.TaskScheduler);
}
}
- #region For
-
- public static ParallelLoopResult For (int from, int to, Action<int> action)
+
+#region For
+
+ public static ParallelLoopResult For (int fromInclusive, int toExclusive, Action<int> body)
{
- return For (from, to, null, action);
+ return For (fromInclusive, toExclusive, ParallelOptions.Default, body);
}
-
- public static ParallelLoopResult For (int from, int to, Action<int, ParallelLoopState> action)
+
+ public static ParallelLoopResult For (int fromInclusive, int toExclusive, Action<int, ParallelLoopState> body)
{
- return For (from, to, null, action);
+ return For (fromInclusive, toExclusive, ParallelOptions.Default, body);
}
-
- public static ParallelLoopResult For (int from, int to, ParallelOptions options, Action<int> action)
+
+ public static ParallelLoopResult For (int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action<int> body)
{
- return For (from, to, options, (index, state) => action (index));
+ return For (fromInclusive, toExclusive, parallelOptions, (index, state) => body (index));
}
-
- public static ParallelLoopResult For (int from, int to, ParallelOptions options, Action<int, ParallelLoopState> action)
+
+ public static ParallelLoopResult For (int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action<int, ParallelLoopState> body)
{
- return For<object> (from, to, options, null, (i, s, l) => { action (i, s); return null; }, null);
+ return For<object> (fromInclusive, toExclusive, parallelOptions, () => null, (i, s, l) => { body (i, s); return null; }, _ => {});
}
-
- public static ParallelLoopResult For<TLocal> (int from, int to, Func<TLocal> init,
- Func<int, ParallelLoopState, TLocal, TLocal> action, Action<TLocal> destruct)
+
+ public static ParallelLoopResult For<TLocal> (int fromInclusive,
+ int toExclusive,
+ Func<TLocal> localInit,
+ Func<int, ParallelLoopState, TLocal, TLocal> body,
+ Action<TLocal> localFinally)
{
- return For<TLocal> (from, to, null, init, action, destruct);
+ return For<TLocal> (fromInclusive, toExclusive, ParallelOptions.Default, localInit, body, localFinally);
}
-
-
- public static ParallelLoopResult For<TLocal> (int from, int to, ParallelOptions options,
- Func<TLocal> init,
- Func<int, ParallelLoopState, TLocal, TLocal> action,
- Action<TLocal> destruct)
- {
- if (action == null)
- throw new ArgumentNullException ("action");
-
- // Number of task to be launched (normally == Env.ProcessorCount)
+
+ public static ParallelLoopResult For<TLocal> (int fromInclusive,
+ int toExclusive,
+ ParallelOptions parallelOptions,
+ Func<TLocal> localInit,
+ Func<int, ParallelLoopState, TLocal, TLocal> body,
+ Action<TLocal> localFinally)
+ {
+ if (body == null)
+ throw new ArgumentNullException ("body");
+ if (localInit == null)
+ throw new ArgumentNullException ("localInit");
+ if (localFinally == null)
+ throw new ArgumentNullException ("localFinally");
+ if (parallelOptions == null)
+ throw new ArgumentNullException ("options");
+ if (fromInclusive >= toExclusive)
+ return new ParallelLoopResult (null, true);
+
+ // Number of task toExclusive be launched (normally == Env.ProcessorCount)
int step;
- int num = GetBestWorkerNumber (from, to, options, out step);
+ int num = GetBestWorkerNumber (fromInclusive, toExclusive, parallelOptions, out step);
- // Each worker put the indexes it's responsible for here
- // so that other worker may steal if they starve.
- SimpleConcurrentBag<int> bag = new SimpleConcurrentBag<int> (num);
Task[] tasks = new Task [num];
+
+ StealRange[] ranges = new StealRange[num];
+ for (int i = 0; i < num; i++)
+ ranges[i] = new StealRange (fromInclusive, i, step);
+
ParallelLoopState.ExternalInfos infos = new ParallelLoopState.ExternalInfos ();
-
- Func<ParallelLoopState, bool> cancellationTokenTest = (s) => {
- if (options != null && options.CancellationToken.IsCancellationRequested) {
- s.Stop ();
- return true;
- }
- return false;
- };
-
- Func<int, bool> breakTest = (i) => infos.LowestBreakIteration != null && infos.LowestBreakIteration > i;
-
- int currentIndex = from;
-
+
+ int currentIndex = -1;
+
Action workerMethod = delegate {
- int index, actual;
- TLocal local = (init == null) ? default (TLocal) : init ();
-
+ int localWorker = Interlocked.Increment (ref currentIndex);
+ StealRange range = ranges[localWorker];
+ int index = range.Actual;
+ int stopIndex = localWorker + 1 == num ? toExclusive : Math.Min (toExclusive, index + step);
+ TLocal local = localInit ();
+
ParallelLoopState state = new ParallelLoopState (infos);
- int workIndex = bag.GetNextIndex ();
-
+ CancellationToken token = parallelOptions.CancellationToken;
+
try {
- while (currentIndex < to && (index = Interlocked.Add (ref currentIndex, step) - step) < to) {
- if (infos.IsStopped.Value)
+ for (int i = index; i < stopIndex; range.Actual = ++i) {
+ if (infos.IsStopped)
return;
-
- if (cancellationTokenTest (state))
+
+ token.ThrowIfCancellationRequested ();
+
+ if (infos.LowestBreakIteration != null && infos.LowestBreakIteration > i)
return;
-
- for (int i = index; i < to && i < index + step; i++)
- bag.Add (workIndex, i);
-
- for (int i = index; i < to && i < index + step && bag.TryTake (workIndex, out actual); i++) {
- if (infos.IsStopped.Value)
- return;
-
- if (cancellationTokenTest (state))
- return;
-
- if (breakTest (actual))
- return;
-
- state.CurrentIteration = actual;
- local = action (actual, state, local);
- }
+
+ state.CurrentIteration = i;
+ local = body (i, state, local);
+ if (i >= stopIndex - range.Stolen)
+ break;
}
-
- while (bag.TrySteal (workIndex, out actual)) {
- if (infos.IsStopped.Value)
- return;
-
- if (cancellationTokenTest (state))
- return;
-
- if (breakTest (actual))
- continue;
-
- state.CurrentIteration = actual;
- local = action (actual, state, local);
+
+ // Try toExclusive steal fromInclusive our right neighbor (cyclic)
+ int len = num + localWorker;
+ for (int sIndex = localWorker + 1; sIndex < len; ++sIndex) {
+ int extWorker = sIndex % num;
+ range = ranges[extWorker];
+
+ stopIndex = extWorker + 1 == num ? toExclusive : Math.Min (toExclusive, fromInclusive + (extWorker + 1) * step);
+
+ int stolen;
+ do {
+ stolen = range.Stolen;
+ if (stopIndex - stolen > range.Actual)
+ goto next;
+ } while (Interlocked.CompareExchange (ref range.Stolen, stolen + 1, stolen) != stolen);
+
+ stolen = stopIndex - stolen - 1;
+
+ if (stolen > range.Actual)
+ local = body (stolen, state, local);
+
+ next:
+ continue;
}
} finally {
- if (destruct != null)
- destruct (local);
+ localFinally (local);
}
};
- InitTasks (tasks, num, workerMethod, options);
-
+ InitTasks (tasks, num, workerMethod, parallelOptions);
+
try {
Task.WaitAll (tasks);
} catch {
HandleExceptions (tasks, infos);
}
-
- return new ParallelLoopResult (infos.LowestBreakIteration, !(infos.IsStopped.Value || infos.IsExceptional));
+
+ return new ParallelLoopResult (infos.LowestBreakIteration, !(infos.IsStopped || infos.IsExceptional));
}
- #endregion
-
- #region Foreach
+ class StealRange
+ {
+ public int Stolen;
+ public int Actual;
+
+ public StealRange (int fromInclusive, int i, int step)
+ {
+ Actual = fromInclusive + i * step;
+ }
+ }
+
+#endregion
+
+#region For (long)
+
+ [MonoTODO]
+ public static ParallelLoopResult For (long fromInclusive, long toExclusive, Action<long> body)
+ {
+ return For (fromInclusive, toExclusive, ParallelOptions.Default, body);
+ }
+
+ [MonoTODO]
+ public static ParallelLoopResult For (long fromInclusive, long toExclusive, Action<long, ParallelLoopState> body)
+ {
+ return For (fromInclusive, toExclusive, ParallelOptions.Default, body);
+ }
+
+ [MonoTODO]
+ public static ParallelLoopResult For (long fromInclusive, long toExclusive, ParallelOptions parallelOptions, Action<long> body)
+ {
+ return For (fromInclusive, toExclusive, parallelOptions, (index, state) => body (index));
+ }
+
+ [MonoTODO]
+ public static ParallelLoopResult For (long fromInclusive, long toExclusive, ParallelOptions parallelOptions, Action<long, ParallelLoopState> body)
+ {
+ return For<object> (fromInclusive, toExclusive, parallelOptions, () => null, (i, s, l) => { body (i, s); return null; }, _ => {});
+ }
+
+ [MonoTODO]
+ public static ParallelLoopResult For<TLocal> (long fromInclusive,
+ long toExclusive,
+ Func<TLocal> localInit,
+ Func<long, ParallelLoopState, TLocal, TLocal> body,
+ Action<TLocal> localFinally)
+ {
+ return For<TLocal> (fromInclusive, toExclusive, ParallelOptions.Default, localInit, body, localFinally);
+ }
+
+ [MonoTODO ("See how this can be refactored with the above For implementation")]
+ public static ParallelLoopResult For<TLocal> (long fromInclusive,
+ long toExclusive,
+ ParallelOptions parallelOptions,
+ Func<TLocal> localInit,
+ Func<long, ParallelLoopState, TLocal, TLocal> body,
+ Action<TLocal> localFinally)
+ {
+ if (body == null)
+ throw new ArgumentNullException ("body");
+ if (localInit == null)
+ throw new ArgumentNullException ("localInit");
+ if (localFinally == null)
+ throw new ArgumentNullException ("localFinally");
+ if (parallelOptions == null)
+ throw new ArgumentNullException ("options");
+ if (fromInclusive >= toExclusive)
+ return new ParallelLoopResult (null, true);
+
+ throw new NotImplementedException ();
+ }
+
+#endregion
+
+#region Foreach
static ParallelLoopResult ForEach<TSource, TLocal> (Func<int, IList<IEnumerator<TSource>>> enumerable, ParallelOptions options,
Func<TLocal> init, Func<TSource, ParallelLoopState, TLocal, TLocal> action,
Action<TLocal> destruct)
- {
+ {
+ if (enumerable == null)
+ throw new ArgumentNullException ("source");
+ if (options == null)
+ throw new ArgumentNullException ("options");
+ if (action == null)
+ throw new ArgumentNullException ("action");
+ if (init == null)
+ throw new ArgumentNullException ("init");
+ if (destruct == null)
+ throw new ArgumentNullException ("destruct");
+
int num = Math.Min (GetBestWorkerNumber (),
options != null && options.MaxDegreeOfParallelism != -1 ? options.MaxDegreeOfParallelism : int.MaxValue);
-
+
Task[] tasks = new Task[num];
ParallelLoopState.ExternalInfos infos = new ParallelLoopState.ExternalInfos ();
-
+
SimpleConcurrentBag<TSource> bag = new SimpleConcurrentBag<TSource> (num);
const int bagCount = 5;
-
+
IList<IEnumerator<TSource>> slices = enumerable (num);
-
- int sliceIndex = 0;
-
- Func<ParallelLoopState, bool> cancellationTokenTest = (s) => {
- if (options != null && options.CancellationToken.IsCancellationRequested) {
- s.Stop ();
- return true;
- }
- return false;
- };
+
+ int sliceIndex = -1;
Action workerMethod = delegate {
- IEnumerator<TSource> slice = slices[Interlocked.Increment (ref sliceIndex) - 1];
-
- TLocal local = (init != null) ? init () : default (TLocal);
+ IEnumerator<TSource> slice = slices[Interlocked.Increment (ref sliceIndex)];
+
+ TLocal local = init ();
ParallelLoopState state = new ParallelLoopState (infos);
int workIndex = bag.GetNextIndex ();
-
+ CancellationToken token = options.CancellationToken;
+
try {
bool cont = true;
TSource element;
-
+
while (cont) {
- if (infos.IsStopped.Value)
- return;
-
- if (cancellationTokenTest (state))
+ if (infos.IsStopped || infos.IsBroken.Value)
return;
-
+
+ token.ThrowIfCancellationRequested ();
+
for (int i = 0; i < bagCount && (cont = slice.MoveNext ()); i++) {
bag.Add (workIndex, slice.Current);
}
-
+
for (int i = 0; i < bagCount && bag.TryTake (workIndex, out element); i++) {
- if (infos.IsStopped.Value)
+ if (infos.IsStopped)
return;
-
- if (cancellationTokenTest (state))
- return;
-
+
+ token.ThrowIfCancellationRequested ();
+
local = action (element, state, local);
}
}
-
+
while (bag.TrySteal (workIndex, out element)) {
- if (infos.IsStopped.Value)
- return;
-
- if (cancellationTokenTest (state))
- return;
-
+ token.ThrowIfCancellationRequested ();
+
local = action (element, state, local);
+
+ if (infos.IsStopped || infos.IsBroken.Value)
+ return;
}
} finally {
- if (destruct != null)
- destruct (local);
+ destruct (local);
}
};
-
+
InitTasks (tasks, num, workerMethod, options);
-
+
try {
Task.WaitAll (tasks);
} catch {
HandleExceptions (tasks, infos);
}
-
- return new ParallelLoopResult (infos.LowestBreakIteration, !(infos.IsStopped.Value || infos.IsExceptional));
-
+
+ return new ParallelLoopResult (infos.LowestBreakIteration, !(infos.IsStopped || infos.IsExceptional));
}
-
- public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> enumerable, Action<TSource> action)
+
+ public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, Action<TSource> body)
{
- return ForEach<TSource, object> (Partitioner.Create (enumerable), ParallelOptions.Default, null,
- (e, s, l) => { action (e); return null; }, null);
+ if (source == null)
+ throw new ArgumentNullException ("source");
+ if (body == null)
+ throw new ArgumentNullException ("body");
+
+ return ForEach<TSource, object> (Partitioner.Create (source),
+ ParallelOptions.Default,
+ () => null,
+ (e, s, l) => { body (e); return null; },
+ _ => {});
}
-
- public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> enumerable, Action<TSource, ParallelLoopState> action)
+
+ public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, Action<TSource, ParallelLoopState> body)
{
- return ForEach<TSource, object> (Partitioner.Create (enumerable), ParallelOptions.Default, null,
- (e, s, l) => { action (e, s); return null; }, null);
+ if (source == null)
+ throw new ArgumentNullException ("source");
+ if (body == null)
+ throw new ArgumentNullException ("body");
+
+ return ForEach<TSource, object> (Partitioner.Create (source),
+ ParallelOptions.Default,
+ () => null,
+ (e, s, l) => { body (e, s); return null; },
+ _ => {});
}
-
- public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> enumerable,
- Action<TSource, ParallelLoopState, long> action)
+
+ public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source,
+ Action<TSource, ParallelLoopState, long> body)
{
- return ForEach<TSource, object> (Partitioner.Create (enumerable), ParallelOptions.Default, null,
- (e, s, l) => { action (e, s, -1); return null; }, null);
+ if (source == null)
+ throw new ArgumentNullException ("source");
+ if (body == null)
+ throw new ArgumentNullException ("body");
+
+
+ return ForEach<TSource, object> (Partitioner.Create (source),
+ ParallelOptions.Default,
+ () => null,
+ (e, s, l) => { body (e, s, -1); return null; },
+ _ => {});
}
-
+
public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source,
Action<TSource, ParallelLoopState> body)
{
- return ForEach<TSource, object> (source, ParallelOptions.Default, null, (e, s, l) => { body (e, s); return null; }, null);
+ if (body == null)
+ throw new ArgumentNullException ("body");
+
+ return ForEach<TSource, object> (source,
+ ParallelOptions.Default,
+ () => null,
+ (e, s, l) => { body (e, s); return null; },
+ _ => {});
}
-
- public static ParallelLoopResult ForEach<TSource> (OrderablePartitioner<TSource> source,
+
+ public static ParallelLoopResult ForEach<TSource> (OrderablePartitioner<TSource> source,
Action<TSource, ParallelLoopState, long> body)
{
- return ForEach<TSource, object> (source, ParallelOptions.Default, null, (e, s, i, l) => { body (e, s, i); return null; }, null);
+ if (body == null)
+ throw new ArgumentNullException ("body");
+
+ return ForEach<TSource, object> (source,
+ ParallelOptions.Default,
+ () => null,
+ (e, s, i, l) => { body (e, s, i); return null; },
+ _ => {});
}
-
+
public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source,
Action<TSource> body)
{
- return ForEach<TSource, object> (source, ParallelOptions.Default, null, (e, s, l) => { body (e); return null; }, null);
+ if (body == null)
+ throw new ArgumentNullException ("body");
+
+ return ForEach<TSource, object> (source,
+ ParallelOptions.Default,
+ () => null,
+ (e, s, l) => { body (e); return null; },
+ _ => {});
}
-
- public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
+
+ public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source,
+ ParallelOptions parallelOptions,
Action<TSource> body)
{
- return ForEach<TSource, object> (Partitioner.Create (source), parallelOptions, null,
- (e, s, l) => { body (e); return null; }, null);
+ if (source == null)
+ throw new ArgumentNullException ("source");
+ if (body == null)
+ throw new ArgumentNullException ("body");
+
+ return ForEach<TSource, object> (Partitioner.Create (source),
+ parallelOptions,
+ () => null,
+ (e, s, l) => { body (e); return null; },
+ _ => {});
}
-
+
public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
Action<TSource, ParallelLoopState> body)
{
- return ForEach<TSource, object> (Partitioner.Create (source), parallelOptions, null,
- (e, s, l) => { body (e, s); return null; }, null);
+ if (source == null)
+ throw new ArgumentNullException ("source");
+ if (body == null)
+ throw new ArgumentNullException ("body");
+
+ return ForEach<TSource, object> (Partitioner.Create (source),
+ parallelOptions,
+ () => null,
+ (e, s, l) => { body (e, s); return null; },
+ _ => {});
}
-
+
public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
Action<TSource, ParallelLoopState, long> body)
{
- return ForEach<TSource, object> (Partitioner.Create (source), parallelOptions,
- null, (e, s, i, l) => { body (e, s, i); return null; }, null);
+ if (source == null)
+ throw new ArgumentNullException ("source");
+ if (body == null)
+ throw new ArgumentNullException ("body");
+
+ return ForEach<TSource, object> (Partitioner.Create (source),
+ parallelOptions,
+ () => null,
+ (e, s, i, l) => { body (e, s, i); return null; },
+ _ => {});
}
-
+
public static ParallelLoopResult ForEach<TSource> (OrderablePartitioner<TSource> source, ParallelOptions parallelOptions,
Action<TSource, ParallelLoopState, long> body)
{
- return ForEach<TSource, object> (source, parallelOptions, null, (e, s, i, l) => { body (e, s, i); return null; }, null);
+ if (body == null)
+ throw new ArgumentNullException ("body");
+
+ return ForEach<TSource, object> (source,
+ parallelOptions,
+ () => null,
+ (e, s, i, l) => { body (e, s, i); return null; },
+ _ => {});
}
-
+
public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source, ParallelOptions parallelOptions,
Action<TSource> body)
{
- return ForEach<TSource, object> (source, parallelOptions, null, (e, s, l) => {body (e); return null; }, null);
+ if (body == null)
+ throw new ArgumentNullException ("body");
+
+ return ForEach<TSource, object> (source,
+ parallelOptions,
+ () => null,
+ (e, s, l) => { body (e); return null; },
+ _ => {});
}
-
- public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source, ParallelOptions parallelOptions,
+
+ public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source, ParallelOptions parallelOptions,
Action<TSource, ParallelLoopState> body)
{
- return ForEach<TSource, object> (source, parallelOptions, null, (e, s, l) => { body (e, s); return null; }, null);
+ return ForEach<TSource, object> (source,
+ parallelOptions,
+ () => null,
+ (e, s, l) => { body (e, s); return null; },
+ _ => {});
}
-
+
public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, Func<TLocal> localInit,
Func<TSource, ParallelLoopState, TLocal, TLocal> body,
Action<TLocal> localFinally)
{
- return ForEach<TSource, TLocal> ((Partitioner<TSource>)Partitioner.Create (source), null, localInit, body, localFinally);
+ if (source == null)
+ throw new ArgumentNullException ("source");
+
+ return ForEach<TSource, TLocal> ((Partitioner<TSource>)Partitioner.Create (source),
+ ParallelOptions.Default,
+ localInit,
+ body,
+ localFinally);
}
-
+
public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, Func<TLocal> localInit,
Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
Action<TLocal> localFinally)
{
- return ForEach<TSource, TLocal> (Partitioner.Create (source), null, localInit, body, localFinally);
+ return ForEach<TSource, TLocal> (Partitioner.Create (source),
+ ParallelOptions.Default,
+ localInit,
+ body,
+ localFinally);
}
-
+
public static ParallelLoopResult ForEach<TSource, TLocal> (OrderablePartitioner<TSource> source, Func<TLocal> localInit,
Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
Action<TLocal> localFinally)
{
return ForEach<TSource, TLocal> (source, ParallelOptions.Default, localInit, body, localFinally);
}
-
+
public static ParallelLoopResult ForEach<TSource, TLocal> (Partitioner<TSource> source, Func<TLocal> localInit,
Func<TSource, ParallelLoopState, TLocal, TLocal> body,
Action<TLocal> localFinally)
{
return ForEach<TSource, TLocal> (source, ParallelOptions.Default, localInit, body, localFinally);
}
-
+
public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
Func<TLocal> localInit,
Func<TSource, ParallelLoopState, TLocal, TLocal> body,
Action<TLocal> localFinally)
{
+ if (source == null)
+ throw new ArgumentNullException ("source");
+
return ForEach<TSource, TLocal> (Partitioner.Create (source), parallelOptions, localInit, body, localFinally);
}
-
+
public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
- Func<TLocal> localInit,
+ Func<TLocal> localInit,
Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
Action<TLocal> localFinally)
{
+ if (source == null)
+ throw new ArgumentNullException ("source");
+
return ForEach<TSource, TLocal> (Partitioner.Create (source), parallelOptions, localInit, body, localFinally);
}
-
- public static ParallelLoopResult ForEach<TSource, TLocal> (Partitioner<TSource> enumerable, ParallelOptions options,
- Func<TLocal> init,
- Func<TSource, ParallelLoopState, TLocal, TLocal> action,
- Action<TLocal> destruct)
+
+ public static ParallelLoopResult ForEach<TSource, TLocal> (Partitioner<TSource> source, ParallelOptions parallelOptions,
+ Func<TLocal> localInit,
+ Func<TSource, ParallelLoopState, TLocal, TLocal> body,
+ Action<TLocal> localFinally)
{
- return ForEach<TSource, TLocal> (enumerable.GetPartitions, options, init, action, destruct);
+ if (source == null)
+ throw new ArgumentNullException ("source");
+ if (body == null)
+ throw new ArgumentNullException ("body");
+
+ return ForEach<TSource, TLocal> (source.GetPartitions, parallelOptions, localInit, body, localFinally);
}
-
- public static ParallelLoopResult ForEach<TSource, TLocal> (OrderablePartitioner<TSource> enumerable, ParallelOptions options,
- Func<TLocal> init,
- Func<TSource, ParallelLoopState, long, TLocal, TLocal> action,
- Action<TLocal> destruct)
+
+ public static ParallelLoopResult ForEach<TSource, TLocal> (OrderablePartitioner<TSource> source, ParallelOptions parallelOptions,
+ Func<TLocal> localInit,
+ Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
+ Action<TLocal> localFinally)
{
- return ForEach<KeyValuePair<long, TSource>, TLocal> (enumerable.GetOrderablePartitions, options,
- init, (e, s, l) => action (e.Value, s, e.Key, l), destruct);
+ if (source == null)
+ throw new ArgumentNullException ("source");
+ if (body == null)
+ throw new ArgumentNullException ("body");
+
+ return ForEach<KeyValuePair<long, TSource>, TLocal> (source.GetOrderablePartitions,
+ parallelOptions,
+ localInit,
+ (e, s, l) => body (e.Value, s, e.Key, l),
+ localFinally);
}
#endregion
{
if (actions == null)
throw new ArgumentNullException ("actions");
-
+
Invoke (actions, (Action a) => Task.Factory.StartNew (a));
}
-
+
public static void Invoke (ParallelOptions parallelOptions, params Action[] actions)
{
if (parallelOptions == null)
throw new ArgumentNullException ("parallelOptions");
if (actions == null)
throw new ArgumentNullException ("actions");
-
- Invoke (actions, (Action a) => Task.Factory.StartNew (a, CancellationToken.None, TaskCreationOptions.None, parallelOptions.TaskScheduler));
+
+ Invoke (actions, (Action a) => Task.Factory.StartNew (a, parallelOptions.CancellationToken, TaskCreationOptions.None, parallelOptions.TaskScheduler));
}
-
+
static void Invoke (Action[] actions, Func<Action, Task> taskCreator)
{
if (actions.Length == 0)
throw new ArgumentException ("actions is empty");
-
+
// Execute it directly
if (actions.Length == 1 && actions[0] != null)
actions[0] ();
-
+
bool shouldThrow = false;
Task[] ts = Array.ConvertAll (actions, delegate (Action a) {
if (a == null) {
shouldThrow = true;
return null;
}
-
+
return taskCreator (a);
});
-
+
if (shouldThrow)
throw new ArgumentException ("One action in actions is null", "actions");
-
+
try {
Task.WaitAll (ts);
} catch {
{
return SpawnBestNumber (action, -1, callback);
}
-
+
internal static Task[] SpawnBestNumber (Action action, int dop, Action callback)
{
return SpawnBestNumber (action, dop, false, callback);
}
-
+
internal static Task[] SpawnBestNumber (Action action, int dop, bool wait, Action callback)
{
// Get the optimum amount of worker to create
int num = dop == -1 ? (wait ? GetBestWorkerNumber () + 1 : GetBestWorkerNumber ()) : dop;
-
+
// Initialize worker
CountdownEvent evt = new CountdownEvent (num);
Task[] tasks = new Task [num];
for (int i = 0; i < num; i++) {
- tasks [i] = Task.Factory.StartNew (() => {
+ tasks [i] = Task.Factory.StartNew (() => {
action ();
evt.Signal ();
if (callback != null && evt.IsSet)
});
}
- // If explicitely told, wait for all workers to complete
+ // If explicitely told, wait for all workers to complete
// and thus let main thread participate in the processing
if (wait)
Task.WaitAll (tasks);
-
+
return tasks;
}
- #endregion
+#endregion
}
}
#endif