Fix continuation not being scheduled because of too early and too greedy disposing.
[mono.git] / mcs / class / corlib / System.Threading.Tasks / Parallel.cs
index 0a989e8ba1c0a0f830542d08e00f01cf82b44fca..5448dbc3457151b2cb1f0ff4b4d7744a54dd547c 100644 (file)
@@ -1,4 +1,3 @@
-#if NET_4_0
 // Parallel.cs
 //
 // Copyright (c) 2008 Jérémie "Garuma" Laval
@@ -23,6 +22,7 @@
 //
 //
 
+#if NET_4_0
 using System;
 using System.Collections.Generic;
 using System.Collections.Concurrent;
@@ -36,29 +36,32 @@ namespace System.Threading.Tasks
                {
                        return GetBestWorkerNumber (TaskScheduler.Current);
                }
-               
+
                internal static int GetBestWorkerNumber (TaskScheduler scheduler)
-               {       
+               {
                        return scheduler.MaximumConcurrencyLevel;
                }
-               
+
                static int GetBestWorkerNumber (int from, int to, ParallelOptions options, out int step)
                {
                        int num = Math.Min (GetBestWorkerNumber (),
                                            options != null && options.MaxDegreeOfParallelism != -1 ? options.MaxDegreeOfParallelism : int.MaxValue);
                        // Integer range that each task process
-                       step = Math.Min (5, (to - from) / num);
-                       if (step <= 0)
-                               step = 1;
-                       
+                       if ((step = (to - from) / num) < 5) {
+                               step = 5;
+                               num = (to - from) / 5;
+                               if (num < 1)
+                                       num = 1;
+                       }
+
                        return num;
                }
-               
+
                static void HandleExceptions (IEnumerable<Task> tasks)
                {
                        HandleExceptions (tasks, null);
                }
-               
+
                static void HandleExceptions (IEnumerable<Task> tasks, ParallelLoopState.ExternalInfos infos)
                {
                        List<Exception> exs = new List<Exception> ();
@@ -66,19 +69,19 @@ namespace System.Threading.Tasks
                                if (t.Exception != null)
                                        exs.Add (t.Exception);
                        }
-                       
+
                        if (exs.Count > 0) {
                                if (infos != null)
                                        infos.IsExceptional = true;
-                               
+
                                throw new AggregateException (exs);
                        }
                }
-               
+
                static void InitTasks (Task[] tasks, int count, Action action, ParallelOptions options)
                {
-                       TaskCreationOptions creation = TaskCreationOptions.LongRunning;
-                       
+                       TaskCreationOptions creation = TaskCreationOptions.LongRunning | TaskCreationOptions.AttachedToParent;
+
                        for (int i = 0; i < count; i++) {
                                if (options == null)
                                        tasks [i] = Task.Factory.StartNew (action, creation);
@@ -86,354 +89,550 @@ namespace System.Threading.Tasks
                                        tasks [i] = Task.Factory.StartNew (action, options.CancellationToken, creation, options.TaskScheduler);
                        }
                }
-               #region For
-               
-               public static ParallelLoopResult For (int from, int to, Action<int> action)
+
+#region For
+
+               public static ParallelLoopResult For (int fromInclusive, int toExclusive, Action<int> body)
                {
-                       return For (from, to, null, action);
+                       return For (fromInclusive, toExclusive, ParallelOptions.Default, body);
                }
-               
-               public static ParallelLoopResult For (int from, int to, Action<int, ParallelLoopState> action)
+
+               public static ParallelLoopResult For (int fromInclusive, int toExclusive, Action<int, ParallelLoopState> body)
                {
-                       return For (from, to, null, action);
+                       return For (fromInclusive, toExclusive, ParallelOptions.Default, body);
                }
-               
-               public static ParallelLoopResult For (int from, int to, ParallelOptions options, Action<int> action)
+
+               public static ParallelLoopResult For (int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action<int> body)
                {
-                       return For (from, to, options, (index, state) => action (index));
+                       return For (fromInclusive, toExclusive, parallelOptions, (index, state) => body (index));
                }
-               
-               public static ParallelLoopResult For (int from, int to, ParallelOptions options, Action<int, ParallelLoopState> action)
+
+               public static ParallelLoopResult For (int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action<int, ParallelLoopState> body)
                {
-                       return For<object> (from, to, options, null, (i, s, l) => { action (i, s); return null; }, null);
+                       return For<object> (fromInclusive, toExclusive, parallelOptions, () => null, (i, s, l) => { body (i, s); return null; }, _ => {});
                }
-               
-               public static ParallelLoopResult For<TLocal> (int from, int to, Func<TLocal> init,
-                                                             Func<int, ParallelLoopState, TLocal, TLocal> action, Action<TLocal> destruct)
+
+               public static ParallelLoopResult For<TLocal> (int fromInclusive,
+                                                             int toExclusive,
+                                                             Func<TLocal> localInit,
+                                                             Func<int, ParallelLoopState, TLocal, TLocal> body,
+                                                             Action<TLocal> localFinally)
                {
-                       return For<TLocal> (from, to, null, init, action, destruct);
+                       return For<TLocal> (fromInclusive, toExclusive, ParallelOptions.Default, localInit, body, localFinally);
                }
-       
-               
-               public static ParallelLoopResult For<TLocal> (int from, int to, ParallelOptions options, 
-                                                             Func<TLocal> init, 
-                                                             Func<int, ParallelLoopState, TLocal, TLocal> action,
-                                                             Action<TLocal> destruct)
-               {                       
-                       if (action == null)
-                               throw new ArgumentNullException ("action");
-                       
-                       // Number of task to be launched (normally == Env.ProcessorCount)
+
+               public static ParallelLoopResult For<TLocal> (int fromInclusive,
+                                                             int toExclusive,
+                                                             ParallelOptions parallelOptions,
+                                                             Func<TLocal> localInit,
+                                                             Func<int, ParallelLoopState, TLocal, TLocal> body,
+                                                             Action<TLocal> localFinally)
+               {
+                       if (body == null)
+                               throw new ArgumentNullException ("body");
+                       if (localInit == null)
+                               throw new ArgumentNullException ("localInit");
+                       if (localFinally == null)
+                               throw new ArgumentNullException ("localFinally");
+                       if (parallelOptions == null)
+                               throw new ArgumentNullException ("options");
+                       if (fromInclusive >= toExclusive)
+                               return new ParallelLoopResult (null, true);
+
+                       // Number of task toExclusive be launched (normally == Env.ProcessorCount)
                        int step;
-                       int num = GetBestWorkerNumber (from, to, options, out step);
+                       int num = GetBestWorkerNumber (fromInclusive, toExclusive, parallelOptions, out step);
 
-                       // Each worker put the indexes it's responsible for here
-                       // so that other worker may steal if they starve.
-                       SimpleConcurrentBag<int> bag = new SimpleConcurrentBag<int> (num);
                        Task[] tasks = new Task [num];
+
+                       StealRange[] ranges = new StealRange[num];
+                       for (int i = 0; i < num; i++)
+                               ranges[i] = new StealRange (fromInclusive, i, step);
+
                        ParallelLoopState.ExternalInfos infos = new ParallelLoopState.ExternalInfos ();
-                       
-                       Func<ParallelLoopState, bool> cancellationTokenTest = (s) => {
-                               if (options != null && options.CancellationToken.IsCancellationRequested) {
-                                       s.Stop ();
-                                       return true;
-                               }
-                               return false;
-                       };
-                       
-                       Func<int, bool> breakTest = (i) => infos.LowestBreakIteration != null && infos.LowestBreakIteration > i;
-                       
-                       int currentIndex = from;
-                       
+
+                       int currentIndex = -1;
+
                        Action workerMethod = delegate {
-                               int index, actual;
-                               TLocal local = (init == null) ? default (TLocal) : init ();
-                               
+                               int localWorker = Interlocked.Increment (ref currentIndex);
+                               StealRange range = ranges[localWorker];
+                               int index = range.Actual;
+                               int stopIndex = localWorker + 1 == num ? toExclusive : Math.Min (toExclusive, index + step);
+                               TLocal local = localInit ();
+
                                ParallelLoopState state = new ParallelLoopState (infos);
-                               int workIndex = bag.GetNextIndex ();
-                               
+                               CancellationToken token = parallelOptions.CancellationToken;
+
                                try {
-                                       while (currentIndex < to && (index = Interlocked.Add (ref currentIndex, step) - step) < to) {
-                                               if (infos.IsStopped.Value)
+                                       for (int i = index; i < stopIndex; range.Actual = ++i) {
+                                               if (infos.IsStopped)
                                                        return;
-                                               
-                                               if (cancellationTokenTest (state))
+
+                                               token.ThrowIfCancellationRequested ();
+
+                                               if (infos.LowestBreakIteration != null && infos.LowestBreakIteration > i)
                                                        return;
-                                               
-                                               for (int i = index; i < to && i < index + step; i++)
-                                                       bag.Add (workIndex, i);
-                                               
-                                               for (int i = index; i < to && i < index + step && bag.TryTake (workIndex, out actual); i++) {
-                                                       if (infos.IsStopped.Value)
-                                                               return;
-                                                       
-                                                       if (cancellationTokenTest (state))
-                                                               return;
-                                                       
-                                                       if (breakTest (actual))
-                                                               return;
-                                                       
-                                                       state.CurrentIteration = actual;
-                                                       local = action (actual, state, local);
-                                               }
+
+                                               state.CurrentIteration = i;
+                                               local = body (i, state, local);
+                                               if (i >= stopIndex - range.Stolen)
+                                                       break;
                                        }
-                                       
-                                       while (bag.TrySteal (workIndex, out actual)) {
-                                               if (infos.IsStopped.Value)
-                                                       return;
-                                               
-                                               if (cancellationTokenTest (state))
-                                                       return;
-                                               
-                                               if (breakTest (actual))
-                                                       continue;
-                                               
-                                               state.CurrentIteration = actual;
-                                               local = action (actual, state, local);
+
+                                       // Try toExclusive steal fromInclusive our right neighbor (cyclic)
+                                       int len = num + localWorker;
+                                       for (int sIndex = localWorker + 1; sIndex < len; ++sIndex) {
+                                               int extWorker = sIndex % num;
+                                               range = ranges[extWorker];
+
+                                               stopIndex = extWorker + 1 == num ? toExclusive : Math.Min (toExclusive, fromInclusive + (extWorker + 1) * step);
+
+                                               int stolen;
+                                               do {
+                                                       stolen = range.Stolen;
+                                                       if (stopIndex - stolen > range.Actual)
+                                                               goto next;
+                                               } while (Interlocked.CompareExchange (ref range.Stolen, stolen + 1, stolen) != stolen);
+
+                                               stolen = stopIndex - stolen - 1;
+
+                                               if (stolen > range.Actual)
+                                                       local = body (stolen, state, local);
+
+                                       next:
+                                               continue;
                                        }
                                } finally {
-                                       if (destruct != null)
-                                               destruct (local);
+                                       localFinally (local);
                                }
                        };
 
-                       InitTasks (tasks, num, workerMethod, options);
-                       
+                       InitTasks (tasks, num, workerMethod, parallelOptions);
+
                        try {
                                Task.WaitAll (tasks);
                        } catch {
                                HandleExceptions (tasks, infos);
                        }
-                       
-                       return new ParallelLoopResult (infos.LowestBreakIteration, !(infos.IsStopped.Value || infos.IsExceptional));    
+
+                       return new ParallelLoopResult (infos.LowestBreakIteration, !(infos.IsStopped || infos.IsExceptional));
                }
 
-               #endregion
-               
-               #region Foreach
+               class StealRange
+               {
+                       public int Stolen;
+                       public int Actual;
+
+                       public StealRange (int fromInclusive, int i, int step)
+                       {
+                               Actual = fromInclusive + i * step;
+                       }
+               }
+
+#endregion
+
+#region For (long)
+
+               [MonoTODO]
+               public static ParallelLoopResult For (long fromInclusive, long toExclusive, Action<long> body)
+               {
+                       return For (fromInclusive, toExclusive, ParallelOptions.Default, body);
+               }
+
+               [MonoTODO]
+               public static ParallelLoopResult For (long fromInclusive, long toExclusive, Action<long, ParallelLoopState> body)
+               {
+                       return For (fromInclusive, toExclusive, ParallelOptions.Default, body);
+               }
+
+               [MonoTODO]
+               public static ParallelLoopResult For (long fromInclusive, long toExclusive, ParallelOptions parallelOptions, Action<long> body)
+               {
+                       return For (fromInclusive, toExclusive, parallelOptions, (index, state) => body (index));
+               }
+
+               [MonoTODO]
+               public static ParallelLoopResult For (long fromInclusive, long toExclusive, ParallelOptions parallelOptions, Action<long, ParallelLoopState> body)
+               {
+                       return For<object> (fromInclusive, toExclusive, parallelOptions, () => null, (i, s, l) => { body (i, s); return null; }, _ => {});
+               }
+
+               [MonoTODO]
+               public static ParallelLoopResult For<TLocal> (long fromInclusive,
+                                                             long toExclusive,
+                                                             Func<TLocal> localInit,
+                                                             Func<long, ParallelLoopState, TLocal, TLocal> body,
+                                                             Action<TLocal> localFinally)
+               {
+                       return For<TLocal> (fromInclusive, toExclusive, ParallelOptions.Default, localInit, body, localFinally);
+               }
+
+               [MonoTODO ("See how this can be refactored with the above For implementation")]
+               public static ParallelLoopResult For<TLocal> (long fromInclusive,
+                                                             long toExclusive,
+                                                             ParallelOptions parallelOptions,
+                                                             Func<TLocal> localInit,
+                                                             Func<long, ParallelLoopState, TLocal, TLocal> body,
+                                                             Action<TLocal> localFinally)
+               {
+                       if (body == null)
+                               throw new ArgumentNullException ("body");
+                       if (localInit == null)
+                               throw new ArgumentNullException ("localInit");
+                       if (localFinally == null)
+                               throw new ArgumentNullException ("localFinally");
+                       if (parallelOptions == null)
+                               throw new ArgumentNullException ("options");
+                       if (fromInclusive >= toExclusive)
+                               return new ParallelLoopResult (null, true);
+
+                       throw new NotImplementedException ();
+               }
+
+#endregion
+
+#region Foreach
                static ParallelLoopResult ForEach<TSource, TLocal> (Func<int, IList<IEnumerator<TSource>>> enumerable, ParallelOptions options,
                                                                    Func<TLocal> init, Func<TSource, ParallelLoopState, TLocal, TLocal> action,
                                                                    Action<TLocal> destruct)
-               {               
+               {
+                       if (enumerable == null)
+                               throw new ArgumentNullException ("source");
+                       if (options == null)
+                               throw new ArgumentNullException ("options");
+                       if (action == null)
+                               throw new ArgumentNullException ("action");
+                       if (init == null)
+                               throw new ArgumentNullException ("init");
+                       if (destruct == null)
+                               throw new ArgumentNullException ("destruct");
+
                        int num = Math.Min (GetBestWorkerNumber (),
                                            options != null && options.MaxDegreeOfParallelism != -1 ? options.MaxDegreeOfParallelism : int.MaxValue);
-                       
+
                        Task[] tasks = new Task[num];
                        ParallelLoopState.ExternalInfos infos = new ParallelLoopState.ExternalInfos ();
-                       
+
                        SimpleConcurrentBag<TSource> bag = new SimpleConcurrentBag<TSource> (num);
                        const int bagCount = 5;
-                       
+
                        IList<IEnumerator<TSource>> slices = enumerable (num);
-                       
-                       int sliceIndex = 0;
-                       
-                       Func<ParallelLoopState, bool> cancellationTokenTest = (s) => {
-                               if (options != null && options.CancellationToken.IsCancellationRequested) {
-                                       s.Stop ();
-                                       return true;
-                               }
-                               return false;
-                       };
+
+                       int sliceIndex = -1;
 
                        Action workerMethod = delegate {
-                               IEnumerator<TSource> slice = slices[Interlocked.Increment (ref sliceIndex) - 1];
-                               
-                               TLocal local = (init != null) ? init () : default (TLocal);
+                               IEnumerator<TSource> slice = slices[Interlocked.Increment (ref sliceIndex)];
+
+                               TLocal local = init ();
                                ParallelLoopState state = new ParallelLoopState (infos);
                                int workIndex = bag.GetNextIndex ();
-                               
+                               CancellationToken token = options.CancellationToken;
+
                                try {
                                        bool cont = true;
                                        TSource element;
-                                       
+
                                        while (cont) {
-                                               if (infos.IsStopped.Value)
-                                                       return;
-                                               
-                                               if (cancellationTokenTest (state))
+                                               if (infos.IsStopped || infos.IsBroken.Value)
                                                        return;
-                                               
+
+                                               token.ThrowIfCancellationRequested ();
+
                                                for (int i = 0; i < bagCount && (cont = slice.MoveNext ()); i++) {
                                                        bag.Add (workIndex, slice.Current);
                                                }
-                                               
+
                                                for (int i = 0; i < bagCount && bag.TryTake (workIndex, out element); i++) {
-                                                       if (infos.IsStopped.Value)
+                                                       if (infos.IsStopped)
                                                                return;
-                                                       
-                                                       if (cancellationTokenTest (state))
-                                                               return;
-                                                       
+
+                                                       token.ThrowIfCancellationRequested ();
+
                                                        local = action (element, state, local);
                                                }
                                        }
-                                       
+
                                        while (bag.TrySteal (workIndex, out element)) {
-                                               if (infos.IsStopped.Value)
-                                                       return;
-                                               
-                                               if (cancellationTokenTest (state))
-                                                       return;
-                                               
+                                               token.ThrowIfCancellationRequested ();
+
                                                local = action (element, state, local);
+
+                                               if (infos.IsStopped || infos.IsBroken.Value)
+                                                       return;
                                        }
                                } finally {
-                                       if (destruct != null)
-                                               destruct (local);
+                                       destruct (local);
                                }
                        };
-                       
+
                        InitTasks (tasks, num, workerMethod, options);
-                       
+
                        try {
                                Task.WaitAll (tasks);
                        } catch {
                                HandleExceptions (tasks, infos);
                        }
-                       
-                       return new ParallelLoopResult (infos.LowestBreakIteration, !(infos.IsStopped.Value || infos.IsExceptional));
-                       
+
+                       return new ParallelLoopResult (infos.LowestBreakIteration, !(infos.IsStopped || infos.IsExceptional));
                }
-               
-               public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> enumerable, Action<TSource> action)
+
+               public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, Action<TSource> body)
                {
-                       return ForEach<TSource, object> (Partitioner.Create (enumerable), ParallelOptions.Default, null, 
-                                                        (e, s, l) => { action (e); return null; }, null);
+                       if (source == null)
+                               throw new ArgumentNullException ("source");
+                       if (body == null)
+                               throw new ArgumentNullException ("body");
+
+                       return ForEach<TSource, object> (Partitioner.Create (source),
+                                                        ParallelOptions.Default,
+                                                        () => null,
+                                                        (e, s, l) => { body (e); return null; },
+                                                        _ => {});
                }
-               
-               public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> enumerable, Action<TSource, ParallelLoopState> action)
+
+               public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, Action<TSource, ParallelLoopState> body)
                {
-                       return ForEach<TSource, object> (Partitioner.Create (enumerable), ParallelOptions.Default, null,
-                                                        (e, s, l) => { action (e, s); return null; }, null);
+                       if (source == null)
+                               throw new ArgumentNullException ("source");
+                       if (body == null)
+                               throw new ArgumentNullException ("body");
+
+                       return ForEach<TSource, object> (Partitioner.Create (source),
+                                                        ParallelOptions.Default,
+                                                        () => null,
+                                                        (e, s, l) => { body (e, s); return null; },
+                                                        _ => {});
                }
-               
-               public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> enumerable,
-                                                                  Action<TSource, ParallelLoopState, long> action)
+
+               public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source,
+                                                                  Action<TSource, ParallelLoopState, long> body)
                {
-                       return ForEach<TSource, object> (Partitioner.Create (enumerable), ParallelOptions.Default, null,
-                                                        (e, s, l) => { action (e, s, -1); return null; }, null);
+                       if (source == null)
+                               throw new ArgumentNullException ("source");
+                       if (body == null)
+                               throw new ArgumentNullException ("body");
+
+
+                       return ForEach<TSource, object> (Partitioner.Create (source),
+                                                        ParallelOptions.Default,
+                                                        () => null,
+                                                        (e, s, l) => { body (e, s, -1); return null; },
+                                                        _ => {});
                }
-               
+
                public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source,
                                                                   Action<TSource, ParallelLoopState> body)
                {
-                       return ForEach<TSource, object> (source, ParallelOptions.Default, null, (e, s, l) => { body (e, s); return null; }, null);
+                       if (body == null)
+                               throw new ArgumentNullException ("body");
+
+                       return ForEach<TSource, object> (source,
+                                                        ParallelOptions.Default,
+                                                        () => null,
+                                                        (e, s, l) => { body (e, s); return null; },
+                                                        _ => {});
                }
-               
-               public static ParallelLoopResult ForEach<TSource> (OrderablePartitioner<TSource> source, 
+
+               public static ParallelLoopResult ForEach<TSource> (OrderablePartitioner<TSource> source,
                                                                   Action<TSource, ParallelLoopState, long> body)
 
                {
-                       return ForEach<TSource, object> (source, ParallelOptions.Default, null, (e, s, i, l) => { body (e, s, i); return null; }, null);
+                       if (body == null)
+                               throw new ArgumentNullException ("body");
+
+                       return ForEach<TSource, object> (source,
+                                                        ParallelOptions.Default,
+                                                        () => null,
+                                                        (e, s, i, l) => { body (e, s, i); return null; },
+                                                        _ => {});
                }
-               
+
                public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source,
                                                                   Action<TSource> body)
 
                {
-                       return ForEach<TSource, object> (source, ParallelOptions.Default, null, (e, s, l) => { body (e); return null; }, null);
+                       if (body == null)
+                               throw new ArgumentNullException ("body");
+
+                       return ForEach<TSource, object> (source,
+                                                        ParallelOptions.Default,
+                                                        () => null,
+                                                        (e, s, l) => { body (e); return null; },
+                                                        _ => {});
                }
-               
-               public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
+
+               public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source,
+                                                                  ParallelOptions parallelOptions,
                                                                   Action<TSource> body)
                {
-                       return ForEach<TSource, object> (Partitioner.Create (source), parallelOptions, null,
-                                                        (e, s, l) => { body (e); return null; }, null);
+                       if (source == null)
+                               throw new ArgumentNullException ("source");
+                       if (body == null)
+                               throw new ArgumentNullException ("body");
+
+                       return ForEach<TSource, object> (Partitioner.Create (source),
+                                                        parallelOptions,
+                                                        () => null,
+                                                        (e, s, l) => { body (e); return null; },
+                                                        _ => {});
                }
-               
+
                public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
                                                                   Action<TSource, ParallelLoopState> body)
                {
-                       return ForEach<TSource, object> (Partitioner.Create (source), parallelOptions, null, 
-                                                        (e, s, l) => { body (e, s); return null; }, null);
+                       if (source == null)
+                               throw new ArgumentNullException ("source");
+                       if (body == null)
+                               throw new ArgumentNullException ("body");
+
+                       return ForEach<TSource, object> (Partitioner.Create (source),
+                                                        parallelOptions,
+                                                        () => null,
+                                                        (e, s, l) => { body (e, s); return null; },
+                                                        _ => {});
                }
-               
+
                public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
                                                                   Action<TSource, ParallelLoopState, long> body)
                {
-                       return ForEach<TSource, object> (Partitioner.Create (source), parallelOptions,
-                                                        null, (e, s, i, l) => { body (e, s, i); return null; }, null);
+                       if (source == null)
+                               throw new ArgumentNullException ("source");
+                       if (body == null)
+                               throw new ArgumentNullException ("body");
+
+                       return ForEach<TSource, object> (Partitioner.Create (source),
+                                                        parallelOptions,
+                                                        () => null,
+                                                        (e, s, i, l) => { body (e, s, i); return null; },
+                                                        _ => {});
                }
-               
+
                public static ParallelLoopResult ForEach<TSource> (OrderablePartitioner<TSource> source, ParallelOptions parallelOptions,
                                                                   Action<TSource, ParallelLoopState, long> body)
 
                {
-                       return ForEach<TSource, object> (source, parallelOptions, null, (e, s, i, l) => { body (e, s, i); return null; }, null);
+                       if (body == null)
+                               throw new ArgumentNullException ("body");
+
+                       return ForEach<TSource, object> (source,
+                                                        parallelOptions,
+                                                        () => null,
+                                                        (e, s, i, l) => { body (e, s, i); return null; },
+                                                        _ => {});
                }
-               
+
                public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source, ParallelOptions parallelOptions,
                                                                   Action<TSource> body)
                {
-                       return ForEach<TSource, object> (source, parallelOptions, null, (e, s, l) => {body (e); return null; }, null);
+                       if (body == null)
+                               throw new ArgumentNullException ("body");
+
+                       return ForEach<TSource, object> (source,
+                                                        parallelOptions,
+                                                        () => null,
+                                                        (e, s, l) => { body (e); return null; },
+                                                        _ => {});
                }
-               
-               public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source, ParallelOptions parallelOptions, 
+
+               public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source, ParallelOptions parallelOptions,
                                                                   Action<TSource, ParallelLoopState> body)
                {
-                       return ForEach<TSource, object> (source, parallelOptions, null, (e, s, l) => { body (e, s); return null; }, null);
+                       return ForEach<TSource, object> (source,
+                                                        parallelOptions,
+                                                        () => null,
+                                                        (e, s, l) => { body (e, s); return null; },
+                                                        _ => {});
                }
-               
+
                public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, Func<TLocal> localInit,
                                                                           Func<TSource, ParallelLoopState, TLocal, TLocal> body,
                                                                           Action<TLocal> localFinally)
                {
-                       return ForEach<TSource, TLocal> ((Partitioner<TSource>)Partitioner.Create (source), null, localInit, body, localFinally);
+                       if (source == null)
+                               throw new ArgumentNullException ("source");
+
+                       return ForEach<TSource, TLocal> ((Partitioner<TSource>)Partitioner.Create (source),
+                                                        ParallelOptions.Default,
+                                                        localInit,
+                                                        body,
+                                                        localFinally);
                }
-               
+
                public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, Func<TLocal> localInit,
                                                                           Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
                                                                           Action<TLocal> localFinally)
                {
-                       return ForEach<TSource, TLocal> (Partitioner.Create (source), null, localInit, body, localFinally);
+                       return ForEach<TSource, TLocal> (Partitioner.Create (source),
+                                                        ParallelOptions.Default,
+                                                        localInit,
+                                                        body,
+                                                        localFinally);
                }
-               
+
                public static ParallelLoopResult ForEach<TSource, TLocal> (OrderablePartitioner<TSource> source, Func<TLocal> localInit,
                                                                           Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
                                                                           Action<TLocal> localFinally)
                {
                        return ForEach<TSource, TLocal> (source, ParallelOptions.Default, localInit, body, localFinally);
                }
-               
+
                public static ParallelLoopResult ForEach<TSource, TLocal> (Partitioner<TSource> source, Func<TLocal> localInit,
                                                                           Func<TSource, ParallelLoopState, TLocal, TLocal> body,
                                                                           Action<TLocal> localFinally)
                {
                        return ForEach<TSource, TLocal> (source, ParallelOptions.Default, localInit, body, localFinally);
                }
-               
+
                public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
                                                                           Func<TLocal> localInit,
                                                                           Func<TSource, ParallelLoopState, TLocal, TLocal> body,
                                                                           Action<TLocal> localFinally)
                {
+                       if (source == null)
+                               throw new ArgumentNullException ("source");
+
                        return ForEach<TSource, TLocal> (Partitioner.Create (source), parallelOptions, localInit, body, localFinally);
                }
-               
+
                public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
-                                                                          Func<TLocal> localInit, 
+                                                                          Func<TLocal> localInit,
                                                                           Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
                                                                           Action<TLocal> localFinally)
                {
+                       if (source == null)
+                               throw new ArgumentNullException ("source");
+
                        return ForEach<TSource, TLocal> (Partitioner.Create (source), parallelOptions, localInit, body, localFinally);
                }
-               
-               public static ParallelLoopResult ForEach<TSource, TLocal> (Partitioner<TSource> enumerable, ParallelOptions options,
-                                                                          Func<TLocal> init,
-                                                                          Func<TSource, ParallelLoopState, TLocal, TLocal> action,
-                                                                          Action<TLocal> destruct)
+
+               public static ParallelLoopResult ForEach<TSource, TLocal> (Partitioner<TSource> source, ParallelOptions parallelOptions,
+                                                                          Func<TLocal> localInit,
+                                                                          Func<TSource, ParallelLoopState, TLocal, TLocal> body,
+                                                                          Action<TLocal> localFinally)
                {
-                       return ForEach<TSource, TLocal> (enumerable.GetPartitions, options, init, action, destruct);
+                       if (source == null)
+                               throw new ArgumentNullException ("source");
+                       if (body == null)
+                               throw new ArgumentNullException ("body");
+
+                       return ForEach<TSource, TLocal> (source.GetPartitions, parallelOptions, localInit, body, localFinally);
                }
-                       
-               public static ParallelLoopResult ForEach<TSource, TLocal> (OrderablePartitioner<TSource> enumerable, ParallelOptions options,
-                                                                          Func<TLocal> init,
-                                                                          Func<TSource, ParallelLoopState, long, TLocal, TLocal> action,
-                                                                          Action<TLocal> destruct)
+
+               public static ParallelLoopResult ForEach<TSource, TLocal> (OrderablePartitioner<TSource> source, ParallelOptions parallelOptions,
+                                                                          Func<TLocal> localInit,
+                                                                          Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
+                                                                          Action<TLocal> localFinally)
                {
-                       return ForEach<KeyValuePair<long, TSource>, TLocal> (enumerable.GetOrderablePartitions, options,
-                                                                           init, (e, s, l) => action (e.Value, s, e.Key, l), destruct);
+                       if (source == null)
+                               throw new ArgumentNullException ("source");
+                       if (body == null)
+                               throw new ArgumentNullException ("body");
+
+                       return ForEach<KeyValuePair<long, TSource>, TLocal> (source.GetOrderablePartitions,
+                                                                            parallelOptions,
+                                                                            localInit,
+                                                                            (e, s, l) => body (e.Value, s, e.Key, l),
+                                                                            localFinally);
                }
                #endregion
 
@@ -442,42 +641,42 @@ namespace System.Threading.Tasks
                {
                        if (actions == null)
                                throw new ArgumentNullException ("actions");
-                       
+
                        Invoke (actions, (Action a) => Task.Factory.StartNew (a));
                }
-               
+
                public static void Invoke (ParallelOptions parallelOptions, params Action[] actions)
                {
                        if (parallelOptions == null)
                                throw new ArgumentNullException ("parallelOptions");
                        if (actions == null)
                                throw new ArgumentNullException ("actions");
-                       
-                       Invoke (actions, (Action a) => Task.Factory.StartNew (a, CancellationToken.None, TaskCreationOptions.None, parallelOptions.TaskScheduler));
+
+                       Invoke (actions, (Action a) => Task.Factory.StartNew (a, parallelOptions.CancellationToken, TaskCreationOptions.None, parallelOptions.TaskScheduler));
                }
-               
+
                static void Invoke (Action[] actions, Func<Action, Task> taskCreator)
                {
                        if (actions.Length == 0)
                                throw new ArgumentException ("actions is empty");
-                       
+
                        // Execute it directly
                        if (actions.Length == 1 && actions[0] != null)
                                actions[0] ();
-                       
+
                        bool shouldThrow = false;
                        Task[] ts = Array.ConvertAll (actions, delegate (Action a) {
                                if (a == null) {
                                        shouldThrow = true;
                                        return null;
                                }
-                               
+
                                return taskCreator (a);
                        });
-                       
+
                        if (shouldThrow)
                                throw new ArgumentException ("One action in actions is null", "actions");
-                       
+
                        try {
                                Task.WaitAll (ts);
                        } catch {
@@ -491,22 +690,22 @@ namespace System.Threading.Tasks
                {
                        return SpawnBestNumber (action, -1, callback);
                }
-               
+
                internal static Task[] SpawnBestNumber (Action action, int dop, Action callback)
                {
                        return SpawnBestNumber (action, dop, false, callback);
                }
-               
+
                internal static Task[] SpawnBestNumber (Action action, int dop, bool wait, Action callback)
                {
                        // Get the optimum amount of worker to create
                        int num = dop == -1 ? (wait ? GetBestWorkerNumber () + 1 : GetBestWorkerNumber ()) : dop;
-                       
+
                        // Initialize worker
                        CountdownEvent evt = new CountdownEvent (num);
                        Task[] tasks = new Task [num];
                        for (int i = 0; i < num; i++) {
-                               tasks [i] = Task.Factory.StartNew (() => { 
+                               tasks [i] = Task.Factory.StartNew (() => {
                                        action ();
                                        evt.Signal ();
                                        if (callback != null && evt.IsSet)
@@ -514,14 +713,14 @@ namespace System.Threading.Tasks
                                });
                        }
 
-                       // If explicitely told, wait for all workers to complete 
+                       // If explicitely told, wait for all workers to complete
                        // and thus let main thread participate in the processing
                        if (wait)
                                Task.WaitAll (tasks);
-                       
+
                        return tasks;
                }
-               #endregion
+#endregion
        }
 }
 #endif