Merge pull request #487 from mayerwin/patch-1
[mono.git] / mcs / class / corlib / System.Threading.Tasks / Parallel.cs
1 // Parallel.cs
2 //
3 // Copyright (c) 2008 Jérémie "Garuma" Laval
4 //
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
11 //
12 // The above copyright notice and this permission notice shall be included in
13 // all copies or substantial portions of the Software.
14 //
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21 // THE SOFTWARE.
22 //
23 //
24
25 #if NET_4_0 || MOBILE
26 using System;
27 using System.Collections.Generic;
28 using System.Collections.Concurrent;
29 using System.Threading;
30 using System.Runtime.InteropServices;
31
32 namespace System.Threading.Tasks
33 {
34         public static class Parallel
35         {
#if !(MOONLIGHT || MOBILE)
                // True when the process is 64-bit. The work-stealing loop in For uses this flag to
                // choose between a plain read of a 64-bit field and an Interlocked-based read.
                static readonly bool sixtyfour = Environment.Is64BitProcess;
#else
                // Same flag, derived from the pointer size on the MOONLIGHT / MOBILE profiles.
                static readonly bool sixtyfour = IntPtr.Size == 8;
#endif
41
42                 internal static int GetBestWorkerNumber ()
43                 {
44                         return GetBestWorkerNumber (TaskScheduler.Current);
45                 }
46
47                 internal static int GetBestWorkerNumber (TaskScheduler scheduler)
48                 {
49                         return Math.Min (Environment.ProcessorCount, (scheduler ?? TaskScheduler.Current).MaximumConcurrencyLevel);
50                 }
51
52                 static int GetBestWorkerNumber (int from, int to, ParallelOptions options, out int step)
53                 {
54                         int num = GetBestWorkerNumber(options.TaskScheduler);
55                         if (options != null && options.MaxDegreeOfParallelism != -1)
56                                 num = Math.Min (options.MaxDegreeOfParallelism, num);
57                         // Integer range that each task process
58                         if ((step = (to - from) / num) < 5) {
59                                 step = 5;
60                                 num = (to - from) / 5;
61                                 if (num < 1)
62                                         num = 1;
63                         }
64
65                         return num;
66                 }
67
68                 static void HandleExceptions (IEnumerable<Task> tasks)
69                 {
70                         HandleExceptions (tasks, null);
71                 }
72
73                 static void HandleExceptions (IEnumerable<Task> tasks, ParallelLoopState.ExternalInfos infos)
74                 {
75                         List<Exception> exs = new List<Exception> ();
76                         foreach (Task t in tasks) {
77                                 if (t.Exception != null)
78                                         exs.Add (t.Exception);
79                         }
80
81                         if (exs.Count > 0) {
82                                 if (infos != null)
83                                         infos.IsExceptional = true;
84
85                                 throw new AggregateException (exs).Flatten ();
86                         }
87                 }
88
89                 static void InitTasks (Task[] tasks, int count, Action action, ParallelOptions options)
90                 {
91                         TaskCreationOptions creation = TaskCreationOptions.LongRunning | TaskCreationOptions.AttachedToParent;
92
93                         for (int i = 0; i < count; i++) {
94                                 if (options == null)
95                                         tasks [i] = Task.Factory.StartNew (action, creation);
96                                 else
97                                         tasks [i] = Task.Factory.StartNew (action, options.CancellationToken, creation, options.TaskScheduler);
98                         }
99                 }
100
101 #region For
102
103                 public static ParallelLoopResult For (int fromInclusive, int toExclusive, Action<int> body)
104                 {
105                         return For (fromInclusive, toExclusive, ParallelOptions.Default, body);
106                 }
107
108                 public static ParallelLoopResult For (int fromInclusive, int toExclusive, Action<int, ParallelLoopState> body)
109                 {
110                         return For (fromInclusive, toExclusive, ParallelOptions.Default, body);
111                 }
112
113                 public static ParallelLoopResult For (int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action<int> body)
114                 {
115                         return For (fromInclusive, toExclusive, parallelOptions, (index, state) => body (index));
116                 }
117
118                 public static ParallelLoopResult For (int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action<int, ParallelLoopState> body)
119                 {
120                         return For<object> (fromInclusive, toExclusive, parallelOptions, () => null, (i, s, l) => { body (i, s); return null; }, _ => {});
121                 }
122
123                 public static ParallelLoopResult For<TLocal> (int fromInclusive,
124                                                               int toExclusive,
125                                                               Func<TLocal> localInit,
126                                                               Func<int, ParallelLoopState, TLocal, TLocal> body,
127                                                               Action<TLocal> localFinally)
128                 {
129                         return For<TLocal> (fromInclusive, toExclusive, ParallelOptions.Default, localInit, body, localFinally);
130                 }
131
132                 public static ParallelLoopResult For<TLocal> (int fromInclusive,
133                                                               int toExclusive,
134                                                               ParallelOptions parallelOptions,
135                                                               Func<TLocal> localInit,
136                                                               Func<int, ParallelLoopState, TLocal, TLocal> body,
137                                                               Action<TLocal> localFinally)
138                 {
139                         if (body == null)
140                                 throw new ArgumentNullException ("body");
141                         if (localInit == null)
142                                 throw new ArgumentNullException ("localInit");
143                         if (localFinally == null)
144                                 throw new ArgumentNullException ("localFinally");
145                         if (parallelOptions == null)
146                                 throw new ArgumentNullException ("options");
147                         if (fromInclusive >= toExclusive)
148                                 return new ParallelLoopResult (null, true);
149
150                         // Number of task toExclusive be launched (normally == Env.ProcessorCount)
151                         int step;
152                         int num = GetBestWorkerNumber (fromInclusive, toExclusive, parallelOptions, out step);
153
154                         Task[] tasks = new Task [num];
155
156                         StealRange[] ranges = new StealRange[num];
157                         for (int i = 0; i < num; i++)
158                                 ranges[i] = new StealRange (fromInclusive, i, step);
159
160                         ParallelLoopState.ExternalInfos infos = new ParallelLoopState.ExternalInfos ();
161
162                         int currentIndex = -1;
163
164                         Action workerMethod = delegate {
165                                 int localWorker = Interlocked.Increment (ref currentIndex);
166                                 StealRange range = ranges[localWorker];
167                                 int index = range.V64.Actual;
168                                 int stopIndex = localWorker + 1 == num ? toExclusive : Math.Min (toExclusive, index + step);
169                                 TLocal local = localInit ();
170
171                                 ParallelLoopState state = new ParallelLoopState (infos);
172                                 CancellationToken token = parallelOptions.CancellationToken;
173
174                                 try {
175                                         for (int i = index; i < stopIndex;) {
176                                                 if (infos.IsStopped)
177                                                         return;
178
179                                                 token.ThrowIfCancellationRequested ();
180
181                                                 if (i >= stopIndex - range.V64.Stolen)
182                                                         break;
183
184                                                 if (infos.LowestBreakIteration != null && infos.LowestBreakIteration > i)
185                                                         return;
186
187                                                 state.CurrentIteration = i;
188                                                 local = body (i, state, local);
189
190                                                 if (i + 1 >= stopIndex - range.V64.Stolen)
191                                                         break;
192
193                                                 range.V64.Actual = ++i;
194                                         }
195
196                                         // Try toExclusive steal fromInclusive our right neighbor (cyclic)
197                                         int len = num + localWorker;
198                                         for (int sIndex = localWorker + 1; sIndex < len; ++sIndex) {
199                                                 int extWorker = sIndex % num;
200                                                 range = ranges[extWorker];
201
202                                                 stopIndex = extWorker + 1 == num ? toExclusive : Math.Min (toExclusive, fromInclusive + (extWorker + 1) * step);
203                                                 int stolen = -1;
204
205                                                 do {
206                                                         do {
207                                                                 long old;
208                                                                 StealValue64 val = new StealValue64 ();
209
210                                                                 old = sixtyfour ? range.V64.Value : Interlocked.CompareExchange (ref range.V64.Value, 0, 0);
211                                                                 val.Value = old;
212
213                                                                 if (val.Actual >= stopIndex - val.Stolen - 2)
214                                                                         goto next;
215                                                                 stolen = (val.Stolen += 1);
216
217                                                                 if (Interlocked.CompareExchange (ref range.V64.Value, val.Value, old) == old)
218                                                                         break;
219                                                         } while (true);
220
221                                                         stolen = stopIndex - stolen;
222
223                                                         if (stolen > range.V64.Actual)
224                                                                 local = body (stolen, state, local);
225                                                         else
226                                                                 break;
227                                                 } while (true);
228
229                                                 next:
230                                                 continue;
231                                         }
232                                 } finally {
233                                         localFinally (local);
234                                 }
235                         };
236
237                         InitTasks (tasks, num, workerMethod, parallelOptions);
238
239                         try {
240                                 Task.WaitAll (tasks);
241                         } catch {
242                                 HandleExceptions (tasks, infos);
243                         }
244
245                         return new ParallelLoopResult (infos.LowestBreakIteration, !(infos.IsStopped || infos.IsExceptional));
246                 }
247
248                 [StructLayout(LayoutKind.Explicit)]
249                 struct StealValue64 {
250                         [FieldOffset(0)]
251                         public long Value;
252                         [FieldOffset(0)]
253                         public int Actual;
254                         [FieldOffset(4)]
255                         public int Stolen;
256                 }
257
258                 class StealRange
259                 {
260                         public StealValue64 V64 = new StealValue64 ();
261
262                         public StealRange (int fromInclusive, int i, int step)
263                         {
264                                 V64.Actual = fromInclusive + i * step;
265                         }
266                 }
267
268 #endregion
269
270 #region For (long)
271
272                 [MonoTODO]
273                 public static ParallelLoopResult For (long fromInclusive, long toExclusive, Action<long> body)
274                 {
275                         return For (fromInclusive, toExclusive, ParallelOptions.Default, body);
276                 }
277
278                 [MonoTODO]
279                 public static ParallelLoopResult For (long fromInclusive, long toExclusive, Action<long, ParallelLoopState> body)
280                 {
281                         return For (fromInclusive, toExclusive, ParallelOptions.Default, body);
282                 }
283
284                 [MonoTODO]
285                 public static ParallelLoopResult For (long fromInclusive, long toExclusive, ParallelOptions parallelOptions, Action<long> body)
286                 {
287                         return For (fromInclusive, toExclusive, parallelOptions, (index, state) => body (index));
288                 }
289
290                 [MonoTODO]
291                 public static ParallelLoopResult For (long fromInclusive, long toExclusive, ParallelOptions parallelOptions, Action<long, ParallelLoopState> body)
292                 {
293                         return For<object> (fromInclusive, toExclusive, parallelOptions, () => null, (i, s, l) => { body (i, s); return null; }, _ => {});
294                 }
295
296                 [MonoTODO]
297                 public static ParallelLoopResult For<TLocal> (long fromInclusive,
298                                                               long toExclusive,
299                                                               Func<TLocal> localInit,
300                                                               Func<long, ParallelLoopState, TLocal, TLocal> body,
301                                                               Action<TLocal> localFinally)
302                 {
303                         return For<TLocal> (fromInclusive, toExclusive, ParallelOptions.Default, localInit, body, localFinally);
304                 }
305
306                 [MonoTODO ("See how this can be refactored with the above For implementation")]
307                 public static ParallelLoopResult For<TLocal> (long fromInclusive,
308                                                               long toExclusive,
309                                                               ParallelOptions parallelOptions,
310                                                               Func<TLocal> localInit,
311                                                               Func<long, ParallelLoopState, TLocal, TLocal> body,
312                                                               Action<TLocal> localFinally)
313                 {
314                         if (body == null)
315                                 throw new ArgumentNullException ("body");
316                         if (localInit == null)
317                                 throw new ArgumentNullException ("localInit");
318                         if (localFinally == null)
319                                 throw new ArgumentNullException ("localFinally");
320                         if (parallelOptions == null)
321                                 throw new ArgumentNullException ("options");
322                         if (fromInclusive >= toExclusive)
323                                 return new ParallelLoopResult (null, true);
324
325                         throw new NotImplementedException ();
326                 }
327
328 #endregion
329
330 #region Foreach
331                 static ParallelLoopResult ForEach<TSource, TLocal> (Func<int, IList<IEnumerator<TSource>>> enumerable, ParallelOptions options,
332                                                                     Func<TLocal> init, Func<TSource, ParallelLoopState, TLocal, TLocal> action,
333                                                                     Action<TLocal> destruct)
334                 {
335                         if (enumerable == null)
336                                 throw new ArgumentNullException ("source");
337                         if (options == null)
338                                 throw new ArgumentNullException ("options");
339                         if (action == null)
340                                 throw new ArgumentNullException ("action");
341                         if (init == null)
342                                 throw new ArgumentNullException ("init");
343                         if (destruct == null)
344                                 throw new ArgumentNullException ("destruct");
345
346                         int num = Math.Min (GetBestWorkerNumber (options.TaskScheduler),
347                                             options != null && options.MaxDegreeOfParallelism != -1 ? options.MaxDegreeOfParallelism : int.MaxValue);
348
349                         Task[] tasks = new Task[num];
350                         ParallelLoopState.ExternalInfos infos = new ParallelLoopState.ExternalInfos ();
351
352                         SimpleConcurrentBag<TSource> bag = new SimpleConcurrentBag<TSource> (num);
353                         const int bagCount = 5;
354
355                         IList<IEnumerator<TSource>> slices = enumerable (num);
356
357                         int sliceIndex = -1;
358
359                         Action workerMethod = delegate {
360                                 IEnumerator<TSource> slice = slices[Interlocked.Increment (ref sliceIndex)];
361
362                                 TLocal local = init ();
363                                 ParallelLoopState state = new ParallelLoopState (infos);
364                                 int workIndex = bag.GetNextIndex ();
365                                 CancellationToken token = options.CancellationToken;
366
367                                 try {
368                                         bool cont = true;
369                                         TSource element;
370
371                                         while (cont) {
372                                                 if (infos.IsStopped || infos.IsBroken.Value)
373                                                         return;
374
375                                                 token.ThrowIfCancellationRequested ();
376
377                                                 for (int i = 0; i < bagCount && (cont = slice.MoveNext ()); i++) {
378                                                         bag.Add (workIndex, slice.Current);
379                                                 }
380
381                                                 for (int i = 0; i < bagCount && bag.TryTake (workIndex, out element); i++) {
382                                                         if (infos.IsStopped)
383                                                                 return;
384
385                                                         token.ThrowIfCancellationRequested ();
386
387                                                         local = action (element, state, local);
388                                                 }
389                                         }
390
391                                         while (bag.TrySteal (workIndex, out element)) {
392                                                 token.ThrowIfCancellationRequested ();
393
394                                                 local = action (element, state, local);
395
396                                                 if (infos.IsStopped || infos.IsBroken.Value)
397                                                         return;
398                                         }
399                                 } finally {
400                                         destruct (local);
401                                 }
402                         };
403
404                         InitTasks (tasks, num, workerMethod, options);
405
406                         try {
407                                 Task.WaitAll (tasks);
408                         } catch {
409                                 HandleExceptions (tasks, infos);
410                         }
411
412                         return new ParallelLoopResult (infos.LowestBreakIteration, !(infos.IsStopped || infos.IsExceptional));
413                 }
414
415                 public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, Action<TSource> body)
416                 {
417                         if (source == null)
418                                 throw new ArgumentNullException ("source");
419                         if (body == null)
420                                 throw new ArgumentNullException ("body");
421
422                         return ForEach<TSource, object> (Partitioner.Create (source),
423                                                          ParallelOptions.Default,
424                                                          () => null,
425                                                          (e, s, l) => { body (e); return null; },
426                                                          _ => {});
427                 }
428
429                 public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, Action<TSource, ParallelLoopState> body)
430                 {
431                         if (source == null)
432                                 throw new ArgumentNullException ("source");
433                         if (body == null)
434                                 throw new ArgumentNullException ("body");
435
436                         return ForEach<TSource, object> (Partitioner.Create (source),
437                                                          ParallelOptions.Default,
438                                                          () => null,
439                                                          (e, s, l) => { body (e, s); return null; },
440                                                          _ => {});
441                 }
442
443                 public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source,
444                                                                    Action<TSource, ParallelLoopState, long> body)
445                 {
446                         if (source == null)
447                                 throw new ArgumentNullException ("source");
448                         if (body == null)
449                                 throw new ArgumentNullException ("body");
450
451
452                         return ForEach<TSource, object> (Partitioner.Create (source),
453                                                          ParallelOptions.Default,
454                                                          () => null,
455                                                          (e, s, l) => { body (e, s, -1); return null; },
456                                                          _ => {});
457                 }
458
459                 public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source,
460                                                                    Action<TSource, ParallelLoopState> body)
461                 {
462                         if (body == null)
463                                 throw new ArgumentNullException ("body");
464
465                         return ForEach<TSource, object> (source,
466                                                          ParallelOptions.Default,
467                                                          () => null,
468                                                          (e, s, l) => { body (e, s); return null; },
469                                                          _ => {});
470                 }
471
472                 public static ParallelLoopResult ForEach<TSource> (OrderablePartitioner<TSource> source,
473                                                                    Action<TSource, ParallelLoopState, long> body)
474
475                 {
476                         if (body == null)
477                                 throw new ArgumentNullException ("body");
478
479                         return ForEach<TSource, object> (source,
480                                                          ParallelOptions.Default,
481                                                          () => null,
482                                                          (e, s, i, l) => { body (e, s, i); return null; },
483                                                          _ => {});
484                 }
485
486                 public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source,
487                                                                    Action<TSource> body)
488
489                 {
490                         if (body == null)
491                                 throw new ArgumentNullException ("body");
492
493                         return ForEach<TSource, object> (source,
494                                                          ParallelOptions.Default,
495                                                          () => null,
496                                                          (e, s, l) => { body (e); return null; },
497                                                          _ => {});
498                 }
499
500                 public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source,
501                                                                    ParallelOptions parallelOptions,
502                                                                    Action<TSource> body)
503                 {
504                         if (source == null)
505                                 throw new ArgumentNullException ("source");
506                         if (body == null)
507                                 throw new ArgumentNullException ("body");
508
509                         return ForEach<TSource, object> (Partitioner.Create (source),
510                                                          parallelOptions,
511                                                          () => null,
512                                                          (e, s, l) => { body (e); return null; },
513                                                          _ => {});
514                 }
515
516                 public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
517                                                                    Action<TSource, ParallelLoopState> body)
518                 {
519                         if (source == null)
520                                 throw new ArgumentNullException ("source");
521                         if (body == null)
522                                 throw new ArgumentNullException ("body");
523
524                         return ForEach<TSource, object> (Partitioner.Create (source),
525                                                          parallelOptions,
526                                                          () => null,
527                                                          (e, s, l) => { body (e, s); return null; },
528                                                          _ => {});
529                 }
530
531                 public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
532                                                                    Action<TSource, ParallelLoopState, long> body)
533                 {
534                         if (source == null)
535                                 throw new ArgumentNullException ("source");
536                         if (body == null)
537                                 throw new ArgumentNullException ("body");
538
539                         return ForEach<TSource, object> (Partitioner.Create (source),
540                                                          parallelOptions,
541                                                          () => null,
542                                                          (e, s, i, l) => { body (e, s, i); return null; },
543                                                          _ => {});
544                 }
545
546                 public static ParallelLoopResult ForEach<TSource> (OrderablePartitioner<TSource> source, ParallelOptions parallelOptions,
547                                                                    Action<TSource, ParallelLoopState, long> body)
548
549                 {
550                         if (body == null)
551                                 throw new ArgumentNullException ("body");
552
553                         return ForEach<TSource, object> (source,
554                                                          parallelOptions,
555                                                          () => null,
556                                                          (e, s, i, l) => { body (e, s, i); return null; },
557                                                          _ => {});
558                 }
559
560                 public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source, ParallelOptions parallelOptions,
561                                                                    Action<TSource> body)
562                 {
563                         if (body == null)
564                                 throw new ArgumentNullException ("body");
565
566                         return ForEach<TSource, object> (source,
567                                                          parallelOptions,
568                                                          () => null,
569                                                          (e, s, l) => { body (e); return null; },
570                                                          _ => {});
571                 }
572
573                 public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source, ParallelOptions parallelOptions,
574                                                                    Action<TSource, ParallelLoopState> body)
575                 {
576                         return ForEach<TSource, object> (source,
577                                                          parallelOptions,
578                                                          () => null,
579                                                          (e, s, l) => { body (e, s); return null; },
580                                                          _ => {});
581                 }
582
583                 public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, Func<TLocal> localInit,
584                                                                            Func<TSource, ParallelLoopState, TLocal, TLocal> body,
585                                                                            Action<TLocal> localFinally)
586                 {
587                         if (source == null)
588                                 throw new ArgumentNullException ("source");
589
590                         return ForEach<TSource, TLocal> ((Partitioner<TSource>)Partitioner.Create (source),
591                                                          ParallelOptions.Default,
592                                                          localInit,
593                                                          body,
594                                                          localFinally);
595                 }
596
597                 public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, Func<TLocal> localInit,
598                                                                            Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
599                                                                            Action<TLocal> localFinally)
600                 {
601                         return ForEach<TSource, TLocal> (Partitioner.Create (source),
602                                                          ParallelOptions.Default,
603                                                          localInit,
604                                                          body,
605                                                          localFinally);
606                 }
607
608                 public static ParallelLoopResult ForEach<TSource, TLocal> (OrderablePartitioner<TSource> source, Func<TLocal> localInit,
609                                                                            Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
610                                                                            Action<TLocal> localFinally)
611                 {
612                         return ForEach<TSource, TLocal> (source, ParallelOptions.Default, localInit, body, localFinally);
613                 }
614
615                 public static ParallelLoopResult ForEach<TSource, TLocal> (Partitioner<TSource> source, Func<TLocal> localInit,
616                                                                            Func<TSource, ParallelLoopState, TLocal, TLocal> body,
617                                                                            Action<TLocal> localFinally)
618                 {
619                         return ForEach<TSource, TLocal> (source, ParallelOptions.Default, localInit, body, localFinally);
620                 }
621
622                 public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
623                                                                            Func<TLocal> localInit,
624                                                                            Func<TSource, ParallelLoopState, TLocal, TLocal> body,
625                                                                            Action<TLocal> localFinally)
626                 {
627                         if (source == null)
628                                 throw new ArgumentNullException ("source");
629
630                         return ForEach<TSource, TLocal> (Partitioner.Create (source), parallelOptions, localInit, body, localFinally);
631                 }
632
633                 public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
634                                                                            Func<TLocal> localInit,
635                                                                            Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
636                                                                            Action<TLocal> localFinally)
637                 {
638                         if (source == null)
639                                 throw new ArgumentNullException ("source");
640
641                         return ForEach<TSource, TLocal> (Partitioner.Create (source), parallelOptions, localInit, body, localFinally);
642                 }
643
644                 public static ParallelLoopResult ForEach<TSource, TLocal> (Partitioner<TSource> source, ParallelOptions parallelOptions,
645                                                                            Func<TLocal> localInit,
646                                                                            Func<TSource, ParallelLoopState, TLocal, TLocal> body,
647                                                                            Action<TLocal> localFinally)
648                 {
649                         if (source == null)
650                                 throw new ArgumentNullException ("source");
651                         if (body == null)
652                                 throw new ArgumentNullException ("body");
653
654                         return ForEach<TSource, TLocal> (source.GetPartitions, parallelOptions, localInit, body, localFinally);
655                 }
656
657                 public static ParallelLoopResult ForEach<TSource, TLocal> (OrderablePartitioner<TSource> source, ParallelOptions parallelOptions,
658                                                                            Func<TLocal> localInit,
659                                                                            Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
660                                                                            Action<TLocal> localFinally)
661                 {
662                         if (source == null)
663                                 throw new ArgumentNullException ("source");
664                         if (body == null)
665                                 throw new ArgumentNullException ("body");
666
667                         return ForEach<KeyValuePair<long, TSource>, TLocal> (source.GetOrderablePartitions,
668                                                                              parallelOptions,
669                                                                              localInit,
670                                                                              (e, s, l) => body (e.Value, s, e.Key, l),
671                                                                              localFinally);
672                 }
673                 #endregion
674
675                 #region Invoke
676                 public static void Invoke (params Action[] actions)
677                 {
678                         if (actions == null)
679                                 throw new ArgumentNullException ("actions");
680
681                         Invoke (ParallelOptions.Default, actions);
682                 }
683
684                 public static void Invoke (ParallelOptions parallelOptions, params Action[] actions)
685                 {
686                         if (parallelOptions == null)
687                                 throw new ArgumentNullException ("parallelOptions");
688                         if (actions == null)
689                                 throw new ArgumentNullException ("actions");
690                         if (actions.Length == 0)
691                                 throw new ArgumentException ("actions is empty");
692                         foreach (var a in actions)
693                                 if (a == null)
694                                         throw new ArgumentException ("One action in actions is null", "actions");
695                         if (actions.Length == 1) {
696                                 actions[0] ();
697                                 return;
698                         }
699
700                         Task[] ts = new Task[actions.Length];
701                         for (int i = 0; i < ts.Length; i++)
702                                 ts[i] = Task.Factory.StartNew (actions[i],
703                                                                parallelOptions.CancellationToken,
704                                                                TaskCreationOptions.None,
705                                                                parallelOptions.TaskScheduler);
706
707                         try {
708                                 Task.WaitAll (ts, parallelOptions.CancellationToken);
709                         } catch {
710                                 HandleExceptions (ts);
711                         }
712                 }
713                 #endregion
714
715                 #region SpawnBestNumber, used by PLinq
716                 internal static Task[] SpawnBestNumber (Action action, Action callback)
717                 {
718                         return SpawnBestNumber (action, -1, callback);
719                 }
720
721                 internal static Task[] SpawnBestNumber (Action action, int dop, Action callback)
722                 {
723                         return SpawnBestNumber (action, dop, false, callback);
724                 }
725
726                 internal static Task[] SpawnBestNumber (Action action, int dop, bool wait, Action callback)
727                 {
728                         // Get the optimum amount of worker to create
729                         int num = dop == -1 ? (wait ? GetBestWorkerNumber () + 1 : GetBestWorkerNumber ()) : dop;
730
731                         // Initialize worker
732                         CountdownEvent evt = new CountdownEvent (num);
733                         Task[] tasks = new Task [num];
734                         for (int i = 0; i < num; i++) {
735                                 tasks [i] = Task.Factory.StartNew (() => {
736                                         action ();
737                                         evt.Signal ();
738                                         if (callback != null && evt.IsSet)
739                                                 callback ();
740                                 });
741                         }
742
743                         // If explicitely told, wait for all workers to complete
744                         // and thus let main thread participate in the processing
745                         if (wait)
746                                 Task.WaitAll (tasks);
747
748                         return tasks;
749                 }
750 #endregion
751         }
752 }
753 #endif