Merge pull request #121 from LogosBible/processfixes
[mono.git] / mcs / class / corlib / System.Threading.Tasks / Parallel.cs
1 // Parallel.cs
2 //
3 // Copyright (c) 2008 Jérémie "Garuma" Laval
4 //
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
11 //
12 // The above copyright notice and this permission notice shall be included in
13 // all copies or substantial portions of the Software.
14 //
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21 // THE SOFTWARE.
22 //
23 //
24
25 #if NET_4_0 || MOBILE
26 using System;
27 using System.Collections.Generic;
28 using System.Collections.Concurrent;
29 using System.Threading;
30 using System.Runtime.InteropServices;
31
32 namespace System.Threading.Tasks
33 {
34         public static class Parallel
35         {
#if MOONLIGHT || MOBILE
                // True when pointers are 8 bytes wide. Used by the work-stealing loop in For
                // to decide whether a plain read of a 64-bit field can replace an interlocked read.
                static readonly bool sixtyfour = IntPtr.Size == 8;
#else
                // Same flag, taken from Environment.Is64BitProcess on profiles that provide it.
                static readonly bool sixtyfour = Environment.Is64BitProcess;
#endif
41
42                 internal static int GetBestWorkerNumber ()
43                 {
44                         return GetBestWorkerNumber (TaskScheduler.Current);
45                 }
46
47                 internal static int GetBestWorkerNumber (TaskScheduler scheduler)
48                 {
49                         return scheduler.MaximumConcurrencyLevel;
50                 }
51
52                 static int GetBestWorkerNumber (int from, int to, ParallelOptions options, out int step)
53                 {
54                         int num = Math.Min (GetBestWorkerNumber (options.TaskScheduler),
55                                             options != null && options.MaxDegreeOfParallelism != -1 ? options.MaxDegreeOfParallelism : int.MaxValue);
56                         // Integer range that each task process
57                         if ((step = (to - from) / num) < 5) {
58                                 step = 5;
59                                 num = (to - from) / 5;
60                                 if (num < 1)
61                                         num = 1;
62                         }
63
64                         return num;
65                 }
66
67                 static void HandleExceptions (IEnumerable<Task> tasks)
68                 {
69                         HandleExceptions (tasks, null);
70                 }
71
72                 static void HandleExceptions (IEnumerable<Task> tasks, ParallelLoopState.ExternalInfos infos)
73                 {
74                         List<Exception> exs = new List<Exception> ();
75                         foreach (Task t in tasks) {
76                                 if (t.Exception != null)
77                                         exs.Add (t.Exception);
78                         }
79
80                         if (exs.Count > 0) {
81                                 if (infos != null)
82                                         infos.IsExceptional = true;
83
84                                 throw new AggregateException (exs);
85                         }
86                 }
87
88                 static void InitTasks (Task[] tasks, int count, Action action, ParallelOptions options)
89                 {
90                         TaskCreationOptions creation = TaskCreationOptions.LongRunning | TaskCreationOptions.AttachedToParent;
91
92                         for (int i = 0; i < count; i++) {
93                                 if (options == null)
94                                         tasks [i] = Task.Factory.StartNew (action, creation);
95                                 else
96                                         tasks [i] = Task.Factory.StartNew (action, options.CancellationToken, creation, options.TaskScheduler);
97                         }
98                 }
99
100 #region For
101
102                 public static ParallelLoopResult For (int fromInclusive, int toExclusive, Action<int> body)
103                 {
104                         return For (fromInclusive, toExclusive, ParallelOptions.Default, body);
105                 }
106
107                 public static ParallelLoopResult For (int fromInclusive, int toExclusive, Action<int, ParallelLoopState> body)
108                 {
109                         return For (fromInclusive, toExclusive, ParallelOptions.Default, body);
110                 }
111
112                 public static ParallelLoopResult For (int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action<int> body)
113                 {
114                         return For (fromInclusive, toExclusive, parallelOptions, (index, state) => body (index));
115                 }
116
117                 public static ParallelLoopResult For (int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action<int, ParallelLoopState> body)
118                 {
119                         return For<object> (fromInclusive, toExclusive, parallelOptions, () => null, (i, s, l) => { body (i, s); return null; }, _ => {});
120                 }
121
122                 public static ParallelLoopResult For<TLocal> (int fromInclusive,
123                                                               int toExclusive,
124                                                               Func<TLocal> localInit,
125                                                               Func<int, ParallelLoopState, TLocal, TLocal> body,
126                                                               Action<TLocal> localFinally)
127                 {
128                         return For<TLocal> (fromInclusive, toExclusive, ParallelOptions.Default, localInit, body, localFinally);
129                 }
130
131                 public static ParallelLoopResult For<TLocal> (int fromInclusive,
132                                                               int toExclusive,
133                                                               ParallelOptions parallelOptions,
134                                                               Func<TLocal> localInit,
135                                                               Func<int, ParallelLoopState, TLocal, TLocal> body,
136                                                               Action<TLocal> localFinally)
137                 {
138                         if (body == null)
139                                 throw new ArgumentNullException ("body");
140                         if (localInit == null)
141                                 throw new ArgumentNullException ("localInit");
142                         if (localFinally == null)
143                                 throw new ArgumentNullException ("localFinally");
144                         if (parallelOptions == null)
145                                 throw new ArgumentNullException ("options");
146                         if (fromInclusive >= toExclusive)
147                                 return new ParallelLoopResult (null, true);
148
149                         // Number of task toExclusive be launched (normally == Env.ProcessorCount)
150                         int step;
151                         int num = GetBestWorkerNumber (fromInclusive, toExclusive, parallelOptions, out step);
152
153                         Task[] tasks = new Task [num];
154
155                         StealRange[] ranges = new StealRange[num];
156                         for (int i = 0; i < num; i++)
157                                 ranges[i] = new StealRange (fromInclusive, i, step);
158
159                         ParallelLoopState.ExternalInfos infos = new ParallelLoopState.ExternalInfos ();
160
161                         int currentIndex = -1;
162
163                         Action workerMethod = delegate {
164                                 int localWorker = Interlocked.Increment (ref currentIndex);
165                                 StealRange range = ranges[localWorker];
166                                 int index = range.V64.Actual;
167                                 int stopIndex = localWorker + 1 == num ? toExclusive : Math.Min (toExclusive, index + step);
168                                 TLocal local = localInit ();
169
170                                 ParallelLoopState state = new ParallelLoopState (infos);
171                                 CancellationToken token = parallelOptions.CancellationToken;
172
173                                 try {
174                                         for (int i = index; i < stopIndex;) {
175                                                 if (infos.IsStopped)
176                                                         return;
177
178                                                 token.ThrowIfCancellationRequested ();
179
180                                                 if (i >= stopIndex - range.V64.Stolen)
181                                                         break;
182
183                                                 if (infos.LowestBreakIteration != null && infos.LowestBreakIteration > i)
184                                                         return;
185
186                                                 state.CurrentIteration = i;
187                                                 local = body (i, state, local);
188
189                                                 if (i + 1 >= stopIndex - range.V64.Stolen)
190                                                         break;
191
192                                                 range.V64.Actual = ++i;
193                                         }
194
195                                         // Try toExclusive steal fromInclusive our right neighbor (cyclic)
196                                         int len = num + localWorker;
197                                         for (int sIndex = localWorker + 1; sIndex < len; ++sIndex) {
198                                                 int extWorker = sIndex % num;
199                                                 range = ranges[extWorker];
200
201                                                 stopIndex = extWorker + 1 == num ? toExclusive : Math.Min (toExclusive, fromInclusive + (extWorker + 1) * step);
202                                                 int stolen = -1;
203
204                                                 do {
205                                                         do {
206                                                                 long old;
207                                                                 StealValue64 val = new StealValue64 ();
208
209                                                                 old = sixtyfour ? range.V64.Value : Interlocked.CompareExchange (ref range.V64.Value, 0, 0);
210                                                                 val.Value = old;
211
212                                                                 if (val.Actual >= stopIndex - val.Stolen - 2)
213                                                                         goto next;
214                                                                 stolen = (val.Stolen += 1);
215
216                                                                 if (Interlocked.CompareExchange (ref range.V64.Value, val.Value, old) == old)
217                                                                         break;
218                                                         } while (true);
219
220                                                         stolen = stopIndex - stolen;
221
222                                                         if (stolen > range.V64.Actual)
223                                                                 local = body (stolen, state, local);
224                                                         else
225                                                                 break;
226                                                 } while (true);
227
228                                                 next:
229                                                 continue;
230                                         }
231                                 } finally {
232                                         localFinally (local);
233                                 }
234                         };
235
236                         InitTasks (tasks, num, workerMethod, parallelOptions);
237
238                         try {
239                                 Task.WaitAll (tasks);
240                         } catch {
241                                 HandleExceptions (tasks, infos);
242                         }
243
244                         return new ParallelLoopResult (infos.LowestBreakIteration, !(infos.IsStopped || infos.IsExceptional));
245                 }
246
                // Two 32-bit counters overlaid on one 64-bit field, so that For can read and
                // compare-and-swap both of them in a single operation through Value.
                [StructLayout(LayoutKind.Explicit)]
                struct StealValue64 {
                        // The combined 64-bit view covering both Actual and Stolen.
                        [FieldOffset(0)]
                        public long Value;
                        // Index the owning worker has progressed to in its chunk.
                        [FieldOffset(0)]
                        public int Actual;
                        // How many iterations other workers have claimed from the end of the chunk.
                        [FieldOffset(4)]
                        public int Stolen;
                }
256
257                 class StealRange
258                 {
259                         public StealValue64 V64 = new StealValue64 ();
260
261                         public StealRange (int fromInclusive, int i, int step)
262                         {
263                                 V64.Actual = fromInclusive + i * step;
264                         }
265                 }
266
267 #endregion
268
269 #region For (long)
270
271                 [MonoTODO]
272                 public static ParallelLoopResult For (long fromInclusive, long toExclusive, Action<long> body)
273                 {
274                         return For (fromInclusive, toExclusive, ParallelOptions.Default, body);
275                 }
276
277                 [MonoTODO]
278                 public static ParallelLoopResult For (long fromInclusive, long toExclusive, Action<long, ParallelLoopState> body)
279                 {
280                         return For (fromInclusive, toExclusive, ParallelOptions.Default, body);
281                 }
282
283                 [MonoTODO]
284                 public static ParallelLoopResult For (long fromInclusive, long toExclusive, ParallelOptions parallelOptions, Action<long> body)
285                 {
286                         return For (fromInclusive, toExclusive, parallelOptions, (index, state) => body (index));
287                 }
288
289                 [MonoTODO]
290                 public static ParallelLoopResult For (long fromInclusive, long toExclusive, ParallelOptions parallelOptions, Action<long, ParallelLoopState> body)
291                 {
292                         return For<object> (fromInclusive, toExclusive, parallelOptions, () => null, (i, s, l) => { body (i, s); return null; }, _ => {});
293                 }
294
295                 [MonoTODO]
296                 public static ParallelLoopResult For<TLocal> (long fromInclusive,
297                                                               long toExclusive,
298                                                               Func<TLocal> localInit,
299                                                               Func<long, ParallelLoopState, TLocal, TLocal> body,
300                                                               Action<TLocal> localFinally)
301                 {
302                         return For<TLocal> (fromInclusive, toExclusive, ParallelOptions.Default, localInit, body, localFinally);
303                 }
304
305                 [MonoTODO ("See how this can be refactored with the above For implementation")]
306                 public static ParallelLoopResult For<TLocal> (long fromInclusive,
307                                                               long toExclusive,
308                                                               ParallelOptions parallelOptions,
309                                                               Func<TLocal> localInit,
310                                                               Func<long, ParallelLoopState, TLocal, TLocal> body,
311                                                               Action<TLocal> localFinally)
312                 {
313                         if (body == null)
314                                 throw new ArgumentNullException ("body");
315                         if (localInit == null)
316                                 throw new ArgumentNullException ("localInit");
317                         if (localFinally == null)
318                                 throw new ArgumentNullException ("localFinally");
319                         if (parallelOptions == null)
320                                 throw new ArgumentNullException ("options");
321                         if (fromInclusive >= toExclusive)
322                                 return new ParallelLoopResult (null, true);
323
324                         throw new NotImplementedException ();
325                 }
326
327 #endregion
328
329 #region Foreach
330                 static ParallelLoopResult ForEach<TSource, TLocal> (Func<int, IList<IEnumerator<TSource>>> enumerable, ParallelOptions options,
331                                                                     Func<TLocal> init, Func<TSource, ParallelLoopState, TLocal, TLocal> action,
332                                                                     Action<TLocal> destruct)
333                 {
334                         if (enumerable == null)
335                                 throw new ArgumentNullException ("source");
336                         if (options == null)
337                                 throw new ArgumentNullException ("options");
338                         if (action == null)
339                                 throw new ArgumentNullException ("action");
340                         if (init == null)
341                                 throw new ArgumentNullException ("init");
342                         if (destruct == null)
343                                 throw new ArgumentNullException ("destruct");
344
345                         int num = Math.Min (GetBestWorkerNumber (),
346                                             options != null && options.MaxDegreeOfParallelism != -1 ? options.MaxDegreeOfParallelism : int.MaxValue);
347
348                         Task[] tasks = new Task[num];
349                         ParallelLoopState.ExternalInfos infos = new ParallelLoopState.ExternalInfos ();
350
351                         SimpleConcurrentBag<TSource> bag = new SimpleConcurrentBag<TSource> (num);
352                         const int bagCount = 5;
353
354                         IList<IEnumerator<TSource>> slices = enumerable (num);
355
356                         int sliceIndex = -1;
357
358                         Action workerMethod = delegate {
359                                 IEnumerator<TSource> slice = slices[Interlocked.Increment (ref sliceIndex)];
360
361                                 TLocal local = init ();
362                                 ParallelLoopState state = new ParallelLoopState (infos);
363                                 int workIndex = bag.GetNextIndex ();
364                                 CancellationToken token = options.CancellationToken;
365
366                                 try {
367                                         bool cont = true;
368                                         TSource element;
369
370                                         while (cont) {
371                                                 if (infos.IsStopped || infos.IsBroken.Value)
372                                                         return;
373
374                                                 token.ThrowIfCancellationRequested ();
375
376                                                 for (int i = 0; i < bagCount && (cont = slice.MoveNext ()); i++) {
377                                                         bag.Add (workIndex, slice.Current);
378                                                 }
379
380                                                 for (int i = 0; i < bagCount && bag.TryTake (workIndex, out element); i++) {
381                                                         if (infos.IsStopped)
382                                                                 return;
383
384                                                         token.ThrowIfCancellationRequested ();
385
386                                                         local = action (element, state, local);
387                                                 }
388                                         }
389
390                                         while (bag.TrySteal (workIndex, out element)) {
391                                                 token.ThrowIfCancellationRequested ();
392
393                                                 local = action (element, state, local);
394
395                                                 if (infos.IsStopped || infos.IsBroken.Value)
396                                                         return;
397                                         }
398                                 } finally {
399                                         destruct (local);
400                                 }
401                         };
402
403                         InitTasks (tasks, num, workerMethod, options);
404
405                         try {
406                                 Task.WaitAll (tasks);
407                         } catch {
408                                 HandleExceptions (tasks, infos);
409                         }
410
411                         return new ParallelLoopResult (infos.LowestBreakIteration, !(infos.IsStopped || infos.IsExceptional));
412                 }
413
414                 public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, Action<TSource> body)
415                 {
416                         if (source == null)
417                                 throw new ArgumentNullException ("source");
418                         if (body == null)
419                                 throw new ArgumentNullException ("body");
420
421                         return ForEach<TSource, object> (Partitioner.Create (source),
422                                                          ParallelOptions.Default,
423                                                          () => null,
424                                                          (e, s, l) => { body (e); return null; },
425                                                          _ => {});
426                 }
427
428                 public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, Action<TSource, ParallelLoopState> body)
429                 {
430                         if (source == null)
431                                 throw new ArgumentNullException ("source");
432                         if (body == null)
433                                 throw new ArgumentNullException ("body");
434
435                         return ForEach<TSource, object> (Partitioner.Create (source),
436                                                          ParallelOptions.Default,
437                                                          () => null,
438                                                          (e, s, l) => { body (e, s); return null; },
439                                                          _ => {});
440                 }
441
442                 public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source,
443                                                                    Action<TSource, ParallelLoopState, long> body)
444                 {
445                         if (source == null)
446                                 throw new ArgumentNullException ("source");
447                         if (body == null)
448                                 throw new ArgumentNullException ("body");
449
450
451                         return ForEach<TSource, object> (Partitioner.Create (source),
452                                                          ParallelOptions.Default,
453                                                          () => null,
454                                                          (e, s, l) => { body (e, s, -1); return null; },
455                                                          _ => {});
456                 }
457
458                 public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source,
459                                                                    Action<TSource, ParallelLoopState> body)
460                 {
461                         if (body == null)
462                                 throw new ArgumentNullException ("body");
463
464                         return ForEach<TSource, object> (source,
465                                                          ParallelOptions.Default,
466                                                          () => null,
467                                                          (e, s, l) => { body (e, s); return null; },
468                                                          _ => {});
469                 }
470
471                 public static ParallelLoopResult ForEach<TSource> (OrderablePartitioner<TSource> source,
472                                                                    Action<TSource, ParallelLoopState, long> body)
473
474                 {
475                         if (body == null)
476                                 throw new ArgumentNullException ("body");
477
478                         return ForEach<TSource, object> (source,
479                                                          ParallelOptions.Default,
480                                                          () => null,
481                                                          (e, s, i, l) => { body (e, s, i); return null; },
482                                                          _ => {});
483                 }
484
485                 public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source,
486                                                                    Action<TSource> body)
487
488                 {
489                         if (body == null)
490                                 throw new ArgumentNullException ("body");
491
492                         return ForEach<TSource, object> (source,
493                                                          ParallelOptions.Default,
494                                                          () => null,
495                                                          (e, s, l) => { body (e); return null; },
496                                                          _ => {});
497                 }
498
499                 public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source,
500                                                                    ParallelOptions parallelOptions,
501                                                                    Action<TSource> body)
502                 {
503                         if (source == null)
504                                 throw new ArgumentNullException ("source");
505                         if (body == null)
506                                 throw new ArgumentNullException ("body");
507
508                         return ForEach<TSource, object> (Partitioner.Create (source),
509                                                          parallelOptions,
510                                                          () => null,
511                                                          (e, s, l) => { body (e); return null; },
512                                                          _ => {});
513                 }
514
515                 public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
516                                                                    Action<TSource, ParallelLoopState> body)
517                 {
518                         if (source == null)
519                                 throw new ArgumentNullException ("source");
520                         if (body == null)
521                                 throw new ArgumentNullException ("body");
522
523                         return ForEach<TSource, object> (Partitioner.Create (source),
524                                                          parallelOptions,
525                                                          () => null,
526                                                          (e, s, l) => { body (e, s); return null; },
527                                                          _ => {});
528                 }
529
530                 public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
531                                                                    Action<TSource, ParallelLoopState, long> body)
532                 {
533                         if (source == null)
534                                 throw new ArgumentNullException ("source");
535                         if (body == null)
536                                 throw new ArgumentNullException ("body");
537
538                         return ForEach<TSource, object> (Partitioner.Create (source),
539                                                          parallelOptions,
540                                                          () => null,
541                                                          (e, s, i, l) => { body (e, s, i); return null; },
542                                                          _ => {});
543                 }
544
545                 public static ParallelLoopResult ForEach<TSource> (OrderablePartitioner<TSource> source, ParallelOptions parallelOptions,
546                                                                    Action<TSource, ParallelLoopState, long> body)
547
548                 {
549                         if (body == null)
550                                 throw new ArgumentNullException ("body");
551
552                         return ForEach<TSource, object> (source,
553                                                          parallelOptions,
554                                                          () => null,
555                                                          (e, s, i, l) => { body (e, s, i); return null; },
556                                                          _ => {});
557                 }
558
559                 public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source, ParallelOptions parallelOptions,
560                                                                    Action<TSource> body)
561                 {
562                         if (body == null)
563                                 throw new ArgumentNullException ("body");
564
565                         return ForEach<TSource, object> (source,
566                                                          parallelOptions,
567                                                          () => null,
568                                                          (e, s, l) => { body (e); return null; },
569                                                          _ => {});
570                 }
571
572                 public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source, ParallelOptions parallelOptions,
573                                                                    Action<TSource, ParallelLoopState> body)
574                 {
575                         return ForEach<TSource, object> (source,
576                                                          parallelOptions,
577                                                          () => null,
578                                                          (e, s, l) => { body (e, s); return null; },
579                                                          _ => {});
580                 }
581
582                 public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, Func<TLocal> localInit,
583                                                                            Func<TSource, ParallelLoopState, TLocal, TLocal> body,
584                                                                            Action<TLocal> localFinally)
585                 {
586                         if (source == null)
587                                 throw new ArgumentNullException ("source");
588
589                         return ForEach<TSource, TLocal> ((Partitioner<TSource>)Partitioner.Create (source),
590                                                          ParallelOptions.Default,
591                                                          localInit,
592                                                          body,
593                                                          localFinally);
594                 }
595
596                 public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, Func<TLocal> localInit,
597                                                                            Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
598                                                                            Action<TLocal> localFinally)
599                 {
600                         return ForEach<TSource, TLocal> (Partitioner.Create (source),
601                                                          ParallelOptions.Default,
602                                                          localInit,
603                                                          body,
604                                                          localFinally);
605                 }
606
607                 public static ParallelLoopResult ForEach<TSource, TLocal> (OrderablePartitioner<TSource> source, Func<TLocal> localInit,
608                                                                            Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
609                                                                            Action<TLocal> localFinally)
610                 {
611                         return ForEach<TSource, TLocal> (source, ParallelOptions.Default, localInit, body, localFinally);
612                 }
613
614                 public static ParallelLoopResult ForEach<TSource, TLocal> (Partitioner<TSource> source, Func<TLocal> localInit,
615                                                                            Func<TSource, ParallelLoopState, TLocal, TLocal> body,
616                                                                            Action<TLocal> localFinally)
617                 {
618                         return ForEach<TSource, TLocal> (source, ParallelOptions.Default, localInit, body, localFinally);
619                 }
620
621                 public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
622                                                                            Func<TLocal> localInit,
623                                                                            Func<TSource, ParallelLoopState, TLocal, TLocal> body,
624                                                                            Action<TLocal> localFinally)
625                 {
626                         if (source == null)
627                                 throw new ArgumentNullException ("source");
628
629                         return ForEach<TSource, TLocal> (Partitioner.Create (source), parallelOptions, localInit, body, localFinally);
630                 }
631
632                 public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
633                                                                            Func<TLocal> localInit,
634                                                                            Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
635                                                                            Action<TLocal> localFinally)
636                 {
637                         if (source == null)
638                                 throw new ArgumentNullException ("source");
639
640                         return ForEach<TSource, TLocal> (Partitioner.Create (source), parallelOptions, localInit, body, localFinally);
641                 }
642
643                 public static ParallelLoopResult ForEach<TSource, TLocal> (Partitioner<TSource> source, ParallelOptions parallelOptions,
644                                                                            Func<TLocal> localInit,
645                                                                            Func<TSource, ParallelLoopState, TLocal, TLocal> body,
646                                                                            Action<TLocal> localFinally)
647                 {
648                         if (source == null)
649                                 throw new ArgumentNullException ("source");
650                         if (body == null)
651                                 throw new ArgumentNullException ("body");
652
653                         return ForEach<TSource, TLocal> (source.GetPartitions, parallelOptions, localInit, body, localFinally);
654                 }
655
656                 public static ParallelLoopResult ForEach<TSource, TLocal> (OrderablePartitioner<TSource> source, ParallelOptions parallelOptions,
657                                                                            Func<TLocal> localInit,
658                                                                            Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
659                                                                            Action<TLocal> localFinally)
660                 {
661                         if (source == null)
662                                 throw new ArgumentNullException ("source");
663                         if (body == null)
664                                 throw new ArgumentNullException ("body");
665
666                         return ForEach<KeyValuePair<long, TSource>, TLocal> (source.GetOrderablePartitions,
667                                                                              parallelOptions,
668                                                                              localInit,
669                                                                              (e, s, l) => body (e.Value, s, e.Key, l),
670                                                                              localFinally);
671                 }
672                 #endregion
673
674                 #region Invoke
675                 public static void Invoke (params Action[] actions)
676                 {
677                         if (actions == null)
678                                 throw new ArgumentNullException ("actions");
679
680                         Invoke (actions, (Action a) => Task.Factory.StartNew (a));
681                 }
682
683                 public static void Invoke (ParallelOptions parallelOptions, params Action[] actions)
684                 {
685                         if (parallelOptions == null)
686                                 throw new ArgumentNullException ("parallelOptions");
687                         if (actions == null)
688                                 throw new ArgumentNullException ("actions");
689
690                         Invoke (actions, (Action a) => Task.Factory.StartNew (a, parallelOptions.CancellationToken, TaskCreationOptions.None, parallelOptions.TaskScheduler));
691                 }
692
693                 static void Invoke (Action[] actions, Func<Action, Task> taskCreator)
694                 {
695                         if (actions.Length == 0)
696                                 throw new ArgumentException ("actions is empty");
697
698                         // Execute it directly
699                         if (actions.Length == 1 && actions[0] != null)
700                                 actions[0] ();
701
702                         bool shouldThrow = false;
703                         Task[] ts = Array.ConvertAll (actions, delegate (Action a) {
704                                 if (a == null) {
705                                         shouldThrow = true;
706                                         return null;
707                                 }
708
709                                 return taskCreator (a);
710                         });
711
712                         if (shouldThrow)
713                                 throw new ArgumentException ("One action in actions is null", "actions");
714
715                         try {
716                                 Task.WaitAll (ts);
717                         } catch {
718                                 HandleExceptions (ts);
719                         }
720                 }
721                 #endregion
722
723                 #region SpawnBestNumber, used by PLinq
724                 internal static Task[] SpawnBestNumber (Action action, Action callback)
725                 {
726                         return SpawnBestNumber (action, -1, callback);
727                 }
728
729                 internal static Task[] SpawnBestNumber (Action action, int dop, Action callback)
730                 {
731                         return SpawnBestNumber (action, dop, false, callback);
732                 }
733
734                 internal static Task[] SpawnBestNumber (Action action, int dop, bool wait, Action callback)
735                 {
736                         // Get the optimum amount of worker to create
737                         int num = dop == -1 ? (wait ? GetBestWorkerNumber () + 1 : GetBestWorkerNumber ()) : dop;
738
739                         // Initialize worker
740                         CountdownEvent evt = new CountdownEvent (num);
741                         Task[] tasks = new Task [num];
742                         for (int i = 0; i < num; i++) {
743                                 tasks [i] = Task.Factory.StartNew (() => {
744                                         action ();
745                                         evt.Signal ();
746                                         if (callback != null && evt.IsSet)
747                                                 callback ();
748                                 });
749                         }
750
751                         // If explicitely told, wait for all workers to complete
752                         // and thus let main thread participate in the processing
753                         if (wait)
754                                 Task.WaitAll (tasks);
755
756                         return tasks;
757                 }
758 #endregion
759         }
760 }
761 #endif