Fix continuation not being scheduled because of too early and too greedy disposing.
[mono.git] / mcs / class / corlib / System.Threading.Tasks / Parallel.cs
1 // Parallel.cs
2 //
3 // Copyright (c) 2008 Jérémie "Garuma" Laval
4 //
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
11 //
12 // The above copyright notice and this permission notice shall be included in
13 // all copies or substantial portions of the Software.
14 //
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21 // THE SOFTWARE.
22 //
23 //
24
25 #if NET_4_0
26 using System;
27 using System.Collections.Generic;
28 using System.Collections.Concurrent;
29 using System.Threading;
30
31 namespace System.Threading.Tasks
32 {
33         public static class Parallel
34         {
35                 internal static int GetBestWorkerNumber ()
36                 {
37                         return GetBestWorkerNumber (TaskScheduler.Current);
38                 }
39
40                 internal static int GetBestWorkerNumber (TaskScheduler scheduler)
41                 {
42                         return scheduler.MaximumConcurrencyLevel;
43                 }
44
45                 static int GetBestWorkerNumber (int from, int to, ParallelOptions options, out int step)
46                 {
47                         int num = Math.Min (GetBestWorkerNumber (),
48                                             options != null && options.MaxDegreeOfParallelism != -1 ? options.MaxDegreeOfParallelism : int.MaxValue);
49                         // Integer range that each task process
50                         if ((step = (to - from) / num) < 5) {
51                                 step = 5;
52                                 num = (to - from) / 5;
53                                 if (num < 1)
54                                         num = 1;
55                         }
56
57                         return num;
58                 }
59
60                 static void HandleExceptions (IEnumerable<Task> tasks)
61                 {
62                         HandleExceptions (tasks, null);
63                 }
64
65                 static void HandleExceptions (IEnumerable<Task> tasks, ParallelLoopState.ExternalInfos infos)
66                 {
67                         List<Exception> exs = new List<Exception> ();
68                         foreach (Task t in tasks) {
69                                 if (t.Exception != null)
70                                         exs.Add (t.Exception);
71                         }
72
73                         if (exs.Count > 0) {
74                                 if (infos != null)
75                                         infos.IsExceptional = true;
76
77                                 throw new AggregateException (exs);
78                         }
79                 }
80
81                 static void InitTasks (Task[] tasks, int count, Action action, ParallelOptions options)
82                 {
83                         TaskCreationOptions creation = TaskCreationOptions.LongRunning | TaskCreationOptions.AttachedToParent;
84
85                         for (int i = 0; i < count; i++) {
86                                 if (options == null)
87                                         tasks [i] = Task.Factory.StartNew (action, creation);
88                                 else
89                                         tasks [i] = Task.Factory.StartNew (action, options.CancellationToken, creation, options.TaskScheduler);
90                         }
91                 }
92
93 #region For
94
95                 public static ParallelLoopResult For (int fromInclusive, int toExclusive, Action<int> body)
96                 {
97                         return For (fromInclusive, toExclusive, ParallelOptions.Default, body);
98                 }
99
100                 public static ParallelLoopResult For (int fromInclusive, int toExclusive, Action<int, ParallelLoopState> body)
101                 {
102                         return For (fromInclusive, toExclusive, ParallelOptions.Default, body);
103                 }
104
105                 public static ParallelLoopResult For (int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action<int> body)
106                 {
107                         return For (fromInclusive, toExclusive, parallelOptions, (index, state) => body (index));
108                 }
109
110                 public static ParallelLoopResult For (int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action<int, ParallelLoopState> body)
111                 {
112                         return For<object> (fromInclusive, toExclusive, parallelOptions, () => null, (i, s, l) => { body (i, s); return null; }, _ => {});
113                 }
114
115                 public static ParallelLoopResult For<TLocal> (int fromInclusive,
116                                                               int toExclusive,
117                                                               Func<TLocal> localInit,
118                                                               Func<int, ParallelLoopState, TLocal, TLocal> body,
119                                                               Action<TLocal> localFinally)
120                 {
121                         return For<TLocal> (fromInclusive, toExclusive, ParallelOptions.Default, localInit, body, localFinally);
122                 }
123
124                 public static ParallelLoopResult For<TLocal> (int fromInclusive,
125                                                               int toExclusive,
126                                                               ParallelOptions parallelOptions,
127                                                               Func<TLocal> localInit,
128                                                               Func<int, ParallelLoopState, TLocal, TLocal> body,
129                                                               Action<TLocal> localFinally)
130                 {
131                         if (body == null)
132                                 throw new ArgumentNullException ("body");
133                         if (localInit == null)
134                                 throw new ArgumentNullException ("localInit");
135                         if (localFinally == null)
136                                 throw new ArgumentNullException ("localFinally");
137                         if (parallelOptions == null)
138                                 throw new ArgumentNullException ("options");
139                         if (fromInclusive >= toExclusive)
140                                 return new ParallelLoopResult (null, true);
141
142                         // Number of task toExclusive be launched (normally == Env.ProcessorCount)
143                         int step;
144                         int num = GetBestWorkerNumber (fromInclusive, toExclusive, parallelOptions, out step);
145
146                         Task[] tasks = new Task [num];
147
148                         StealRange[] ranges = new StealRange[num];
149                         for (int i = 0; i < num; i++)
150                                 ranges[i] = new StealRange (fromInclusive, i, step);
151
152                         ParallelLoopState.ExternalInfos infos = new ParallelLoopState.ExternalInfos ();
153
154                         int currentIndex = -1;
155
156                         Action workerMethod = delegate {
157                                 int localWorker = Interlocked.Increment (ref currentIndex);
158                                 StealRange range = ranges[localWorker];
159                                 int index = range.Actual;
160                                 int stopIndex = localWorker + 1 == num ? toExclusive : Math.Min (toExclusive, index + step);
161                                 TLocal local = localInit ();
162
163                                 ParallelLoopState state = new ParallelLoopState (infos);
164                                 CancellationToken token = parallelOptions.CancellationToken;
165
166                                 try {
167                                         for (int i = index; i < stopIndex; range.Actual = ++i) {
168                                                 if (infos.IsStopped)
169                                                         return;
170
171                                                 token.ThrowIfCancellationRequested ();
172
173                                                 if (infos.LowestBreakIteration != null && infos.LowestBreakIteration > i)
174                                                         return;
175
176                                                 state.CurrentIteration = i;
177                                                 local = body (i, state, local);
178                                                 if (i >= stopIndex - range.Stolen)
179                                                         break;
180                                         }
181
182                                         // Try toExclusive steal fromInclusive our right neighbor (cyclic)
183                                         int len = num + localWorker;
184                                         for (int sIndex = localWorker + 1; sIndex < len; ++sIndex) {
185                                                 int extWorker = sIndex % num;
186                                                 range = ranges[extWorker];
187
188                                                 stopIndex = extWorker + 1 == num ? toExclusive : Math.Min (toExclusive, fromInclusive + (extWorker + 1) * step);
189
190                                                 int stolen;
191                                                 do {
192                                                         stolen = range.Stolen;
193                                                         if (stopIndex - stolen > range.Actual)
194                                                                 goto next;
195                                                 } while (Interlocked.CompareExchange (ref range.Stolen, stolen + 1, stolen) != stolen);
196
197                                                 stolen = stopIndex - stolen - 1;
198
199                                                 if (stolen > range.Actual)
200                                                         local = body (stolen, state, local);
201
202                                         next:
203                                                 continue;
204                                         }
205                                 } finally {
206                                         localFinally (local);
207                                 }
208                         };
209
210                         InitTasks (tasks, num, workerMethod, parallelOptions);
211
212                         try {
213                                 Task.WaitAll (tasks);
214                         } catch {
215                                 HandleExceptions (tasks, infos);
216                         }
217
218                         return new ParallelLoopResult (infos.LowestBreakIteration, !(infos.IsStopped || infos.IsExceptional));
219                 }
220
221                 class StealRange
222                 {
223                         public int Stolen;
224                         public int Actual;
225
226                         public StealRange (int fromInclusive, int i, int step)
227                         {
228                                 Actual = fromInclusive + i * step;
229                         }
230                 }
231
232 #endregion
233
234 #region For (long)
235
236                 [MonoTODO]
237                 public static ParallelLoopResult For (long fromInclusive, long toExclusive, Action<long> body)
238                 {
239                         return For (fromInclusive, toExclusive, ParallelOptions.Default, body);
240                 }
241
242                 [MonoTODO]
243                 public static ParallelLoopResult For (long fromInclusive, long toExclusive, Action<long, ParallelLoopState> body)
244                 {
245                         return For (fromInclusive, toExclusive, ParallelOptions.Default, body);
246                 }
247
248                 [MonoTODO]
249                 public static ParallelLoopResult For (long fromInclusive, long toExclusive, ParallelOptions parallelOptions, Action<long> body)
250                 {
251                         return For (fromInclusive, toExclusive, parallelOptions, (index, state) => body (index));
252                 }
253
254                 [MonoTODO]
255                 public static ParallelLoopResult For (long fromInclusive, long toExclusive, ParallelOptions parallelOptions, Action<long, ParallelLoopState> body)
256                 {
257                         return For<object> (fromInclusive, toExclusive, parallelOptions, () => null, (i, s, l) => { body (i, s); return null; }, _ => {});
258                 }
259
260                 [MonoTODO]
261                 public static ParallelLoopResult For<TLocal> (long fromInclusive,
262                                                               long toExclusive,
263                                                               Func<TLocal> localInit,
264                                                               Func<long, ParallelLoopState, TLocal, TLocal> body,
265                                                               Action<TLocal> localFinally)
266                 {
267                         return For<TLocal> (fromInclusive, toExclusive, ParallelOptions.Default, localInit, body, localFinally);
268                 }
269
270                 [MonoTODO ("See how this can be refactored with the above For implementation")]
271                 public static ParallelLoopResult For<TLocal> (long fromInclusive,
272                                                               long toExclusive,
273                                                               ParallelOptions parallelOptions,
274                                                               Func<TLocal> localInit,
275                                                               Func<long, ParallelLoopState, TLocal, TLocal> body,
276                                                               Action<TLocal> localFinally)
277                 {
278                         if (body == null)
279                                 throw new ArgumentNullException ("body");
280                         if (localInit == null)
281                                 throw new ArgumentNullException ("localInit");
282                         if (localFinally == null)
283                                 throw new ArgumentNullException ("localFinally");
284                         if (parallelOptions == null)
285                                 throw new ArgumentNullException ("options");
286                         if (fromInclusive >= toExclusive)
287                                 return new ParallelLoopResult (null, true);
288
289                         throw new NotImplementedException ();
290                 }
291
292 #endregion
293
294 #region Foreach
295                 static ParallelLoopResult ForEach<TSource, TLocal> (Func<int, IList<IEnumerator<TSource>>> enumerable, ParallelOptions options,
296                                                                     Func<TLocal> init, Func<TSource, ParallelLoopState, TLocal, TLocal> action,
297                                                                     Action<TLocal> destruct)
298                 {
299                         if (enumerable == null)
300                                 throw new ArgumentNullException ("source");
301                         if (options == null)
302                                 throw new ArgumentNullException ("options");
303                         if (action == null)
304                                 throw new ArgumentNullException ("action");
305                         if (init == null)
306                                 throw new ArgumentNullException ("init");
307                         if (destruct == null)
308                                 throw new ArgumentNullException ("destruct");
309
310                         int num = Math.Min (GetBestWorkerNumber (),
311                                             options != null && options.MaxDegreeOfParallelism != -1 ? options.MaxDegreeOfParallelism : int.MaxValue);
312
313                         Task[] tasks = new Task[num];
314                         ParallelLoopState.ExternalInfos infos = new ParallelLoopState.ExternalInfos ();
315
316                         SimpleConcurrentBag<TSource> bag = new SimpleConcurrentBag<TSource> (num);
317                         const int bagCount = 5;
318
319                         IList<IEnumerator<TSource>> slices = enumerable (num);
320
321                         int sliceIndex = -1;
322
323                         Action workerMethod = delegate {
324                                 IEnumerator<TSource> slice = slices[Interlocked.Increment (ref sliceIndex)];
325
326                                 TLocal local = init ();
327                                 ParallelLoopState state = new ParallelLoopState (infos);
328                                 int workIndex = bag.GetNextIndex ();
329                                 CancellationToken token = options.CancellationToken;
330
331                                 try {
332                                         bool cont = true;
333                                         TSource element;
334
335                                         while (cont) {
336                                                 if (infos.IsStopped || infos.IsBroken.Value)
337                                                         return;
338
339                                                 token.ThrowIfCancellationRequested ();
340
341                                                 for (int i = 0; i < bagCount && (cont = slice.MoveNext ()); i++) {
342                                                         bag.Add (workIndex, slice.Current);
343                                                 }
344
345                                                 for (int i = 0; i < bagCount && bag.TryTake (workIndex, out element); i++) {
346                                                         if (infos.IsStopped)
347                                                                 return;
348
349                                                         token.ThrowIfCancellationRequested ();
350
351                                                         local = action (element, state, local);
352                                                 }
353                                         }
354
355                                         while (bag.TrySteal (workIndex, out element)) {
356                                                 token.ThrowIfCancellationRequested ();
357
358                                                 local = action (element, state, local);
359
360                                                 if (infos.IsStopped || infos.IsBroken.Value)
361                                                         return;
362                                         }
363                                 } finally {
364                                         destruct (local);
365                                 }
366                         };
367
368                         InitTasks (tasks, num, workerMethod, options);
369
370                         try {
371                                 Task.WaitAll (tasks);
372                         } catch {
373                                 HandleExceptions (tasks, infos);
374                         }
375
376                         return new ParallelLoopResult (infos.LowestBreakIteration, !(infos.IsStopped || infos.IsExceptional));
377                 }
378
379                 public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, Action<TSource> body)
380                 {
381                         if (source == null)
382                                 throw new ArgumentNullException ("source");
383                         if (body == null)
384                                 throw new ArgumentNullException ("body");
385
386                         return ForEach<TSource, object> (Partitioner.Create (source),
387                                                          ParallelOptions.Default,
388                                                          () => null,
389                                                          (e, s, l) => { body (e); return null; },
390                                                          _ => {});
391                 }
392
393                 public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, Action<TSource, ParallelLoopState> body)
394                 {
395                         if (source == null)
396                                 throw new ArgumentNullException ("source");
397                         if (body == null)
398                                 throw new ArgumentNullException ("body");
399
400                         return ForEach<TSource, object> (Partitioner.Create (source),
401                                                          ParallelOptions.Default,
402                                                          () => null,
403                                                          (e, s, l) => { body (e, s); return null; },
404                                                          _ => {});
405                 }
406
407                 public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source,
408                                                                    Action<TSource, ParallelLoopState, long> body)
409                 {
410                         if (source == null)
411                                 throw new ArgumentNullException ("source");
412                         if (body == null)
413                                 throw new ArgumentNullException ("body");
414
415
416                         return ForEach<TSource, object> (Partitioner.Create (source),
417                                                          ParallelOptions.Default,
418                                                          () => null,
419                                                          (e, s, l) => { body (e, s, -1); return null; },
420                                                          _ => {});
421                 }
422
423                 public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source,
424                                                                    Action<TSource, ParallelLoopState> body)
425                 {
426                         if (body == null)
427                                 throw new ArgumentNullException ("body");
428
429                         return ForEach<TSource, object> (source,
430                                                          ParallelOptions.Default,
431                                                          () => null,
432                                                          (e, s, l) => { body (e, s); return null; },
433                                                          _ => {});
434                 }
435
436                 public static ParallelLoopResult ForEach<TSource> (OrderablePartitioner<TSource> source,
437                                                                    Action<TSource, ParallelLoopState, long> body)
438
439                 {
440                         if (body == null)
441                                 throw new ArgumentNullException ("body");
442
443                         return ForEach<TSource, object> (source,
444                                                          ParallelOptions.Default,
445                                                          () => null,
446                                                          (e, s, i, l) => { body (e, s, i); return null; },
447                                                          _ => {});
448                 }
449
450                 public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source,
451                                                                    Action<TSource> body)
452
453                 {
454                         if (body == null)
455                                 throw new ArgumentNullException ("body");
456
457                         return ForEach<TSource, object> (source,
458                                                          ParallelOptions.Default,
459                                                          () => null,
460                                                          (e, s, l) => { body (e); return null; },
461                                                          _ => {});
462                 }
463
464                 public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source,
465                                                                    ParallelOptions parallelOptions,
466                                                                    Action<TSource> body)
467                 {
468                         if (source == null)
469                                 throw new ArgumentNullException ("source");
470                         if (body == null)
471                                 throw new ArgumentNullException ("body");
472
473                         return ForEach<TSource, object> (Partitioner.Create (source),
474                                                          parallelOptions,
475                                                          () => null,
476                                                          (e, s, l) => { body (e); return null; },
477                                                          _ => {});
478                 }
479
480                 public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
481                                                                    Action<TSource, ParallelLoopState> body)
482                 {
483                         if (source == null)
484                                 throw new ArgumentNullException ("source");
485                         if (body == null)
486                                 throw new ArgumentNullException ("body");
487
488                         return ForEach<TSource, object> (Partitioner.Create (source),
489                                                          parallelOptions,
490                                                          () => null,
491                                                          (e, s, l) => { body (e, s); return null; },
492                                                          _ => {});
493                 }
494
495                 public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
496                                                                    Action<TSource, ParallelLoopState, long> body)
497                 {
498                         if (source == null)
499                                 throw new ArgumentNullException ("source");
500                         if (body == null)
501                                 throw new ArgumentNullException ("body");
502
503                         return ForEach<TSource, object> (Partitioner.Create (source),
504                                                          parallelOptions,
505                                                          () => null,
506                                                          (e, s, i, l) => { body (e, s, i); return null; },
507                                                          _ => {});
508                 }
509
510                 public static ParallelLoopResult ForEach<TSource> (OrderablePartitioner<TSource> source, ParallelOptions parallelOptions,
511                                                                    Action<TSource, ParallelLoopState, long> body)
512
513                 {
514                         if (body == null)
515                                 throw new ArgumentNullException ("body");
516
517                         return ForEach<TSource, object> (source,
518                                                          parallelOptions,
519                                                          () => null,
520                                                          (e, s, i, l) => { body (e, s, i); return null; },
521                                                          _ => {});
522                 }
523
524                 public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source, ParallelOptions parallelOptions,
525                                                                    Action<TSource> body)
526                 {
527                         if (body == null)
528                                 throw new ArgumentNullException ("body");
529
530                         return ForEach<TSource, object> (source,
531                                                          parallelOptions,
532                                                          () => null,
533                                                          (e, s, l) => { body (e); return null; },
534                                                          _ => {});
535                 }
536
537                 public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source, ParallelOptions parallelOptions,
538                                                                    Action<TSource, ParallelLoopState> body)
539                 {
540                         return ForEach<TSource, object> (source,
541                                                          parallelOptions,
542                                                          () => null,
543                                                          (e, s, l) => { body (e, s); return null; },
544                                                          _ => {});
545                 }
546
547                 public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, Func<TLocal> localInit,
548                                                                            Func<TSource, ParallelLoopState, TLocal, TLocal> body,
549                                                                            Action<TLocal> localFinally)
550                 {
551                         if (source == null)
552                                 throw new ArgumentNullException ("source");
553
554                         return ForEach<TSource, TLocal> ((Partitioner<TSource>)Partitioner.Create (source),
555                                                          ParallelOptions.Default,
556                                                          localInit,
557                                                          body,
558                                                          localFinally);
559                 }
560
561                 public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, Func<TLocal> localInit,
562                                                                            Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
563                                                                            Action<TLocal> localFinally)
564                 {
565                         return ForEach<TSource, TLocal> (Partitioner.Create (source),
566                                                          ParallelOptions.Default,
567                                                          localInit,
568                                                          body,
569                                                          localFinally);
570                 }
571
572                 public static ParallelLoopResult ForEach<TSource, TLocal> (OrderablePartitioner<TSource> source, Func<TLocal> localInit,
573                                                                            Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
574                                                                            Action<TLocal> localFinally)
575                 {
576                         return ForEach<TSource, TLocal> (source, ParallelOptions.Default, localInit, body, localFinally);
577                 }
578
579                 public static ParallelLoopResult ForEach<TSource, TLocal> (Partitioner<TSource> source, Func<TLocal> localInit,
580                                                                            Func<TSource, ParallelLoopState, TLocal, TLocal> body,
581                                                                            Action<TLocal> localFinally)
582                 {
583                         return ForEach<TSource, TLocal> (source, ParallelOptions.Default, localInit, body, localFinally);
584                 }
585
586                 public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
587                                                                            Func<TLocal> localInit,
588                                                                            Func<TSource, ParallelLoopState, TLocal, TLocal> body,
589                                                                            Action<TLocal> localFinally)
590                 {
591                         if (source == null)
592                                 throw new ArgumentNullException ("source");
593
594                         return ForEach<TSource, TLocal> (Partitioner.Create (source), parallelOptions, localInit, body, localFinally);
595                 }
596
597                 public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
598                                                                            Func<TLocal> localInit,
599                                                                            Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
600                                                                            Action<TLocal> localFinally)
601                 {
602                         if (source == null)
603                                 throw new ArgumentNullException ("source");
604
605                         return ForEach<TSource, TLocal> (Partitioner.Create (source), parallelOptions, localInit, body, localFinally);
606                 }
607
608                 public static ParallelLoopResult ForEach<TSource, TLocal> (Partitioner<TSource> source, ParallelOptions parallelOptions,
609                                                                            Func<TLocal> localInit,
610                                                                            Func<TSource, ParallelLoopState, TLocal, TLocal> body,
611                                                                            Action<TLocal> localFinally)
612                 {
613                         if (source == null)
614                                 throw new ArgumentNullException ("source");
615                         if (body == null)
616                                 throw new ArgumentNullException ("body");
617
618                         return ForEach<TSource, TLocal> (source.GetPartitions, parallelOptions, localInit, body, localFinally);
619                 }
620
621                 public static ParallelLoopResult ForEach<TSource, TLocal> (OrderablePartitioner<TSource> source, ParallelOptions parallelOptions,
622                                                                            Func<TLocal> localInit,
623                                                                            Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
624                                                                            Action<TLocal> localFinally)
625                 {
626                         if (source == null)
627                                 throw new ArgumentNullException ("source");
628                         if (body == null)
629                                 throw new ArgumentNullException ("body");
630
631                         return ForEach<KeyValuePair<long, TSource>, TLocal> (source.GetOrderablePartitions,
632                                                                              parallelOptions,
633                                                                              localInit,
634                                                                              (e, s, l) => body (e.Value, s, e.Key, l),
635                                                                              localFinally);
636                 }
637                 #endregion
638
639                 #region Invoke
640                 public static void Invoke (params Action[] actions)
641                 {
642                         if (actions == null)
643                                 throw new ArgumentNullException ("actions");
644
645                         Invoke (actions, (Action a) => Task.Factory.StartNew (a));
646                 }
647
648                 public static void Invoke (ParallelOptions parallelOptions, params Action[] actions)
649                 {
650                         if (parallelOptions == null)
651                                 throw new ArgumentNullException ("parallelOptions");
652                         if (actions == null)
653                                 throw new ArgumentNullException ("actions");
654
655                         Invoke (actions, (Action a) => Task.Factory.StartNew (a, parallelOptions.CancellationToken, TaskCreationOptions.None, parallelOptions.TaskScheduler));
656                 }
657
658                 static void Invoke (Action[] actions, Func<Action, Task> taskCreator)
659                 {
660                         if (actions.Length == 0)
661                                 throw new ArgumentException ("actions is empty");
662
663                         // Execute it directly
664                         if (actions.Length == 1 && actions[0] != null)
665                                 actions[0] ();
666
667                         bool shouldThrow = false;
668                         Task[] ts = Array.ConvertAll (actions, delegate (Action a) {
669                                 if (a == null) {
670                                         shouldThrow = true;
671                                         return null;
672                                 }
673
674                                 return taskCreator (a);
675                         });
676
677                         if (shouldThrow)
678                                 throw new ArgumentException ("One action in actions is null", "actions");
679
680                         try {
681                                 Task.WaitAll (ts);
682                         } catch {
683                                 HandleExceptions (ts);
684                         }
685                 }
686                 #endregion
687
688                 #region SpawnBestNumber, used by PLinq
689                 internal static Task[] SpawnBestNumber (Action action, Action callback)
690                 {
691                         return SpawnBestNumber (action, -1, callback);
692                 }
693
694                 internal static Task[] SpawnBestNumber (Action action, int dop, Action callback)
695                 {
696                         return SpawnBestNumber (action, dop, false, callback);
697                 }
698
699                 internal static Task[] SpawnBestNumber (Action action, int dop, bool wait, Action callback)
700                 {
701                         // Get the optimum amount of worker to create
702                         int num = dop == -1 ? (wait ? GetBestWorkerNumber () + 1 : GetBestWorkerNumber ()) : dop;
703
704                         // Initialize worker
705                         CountdownEvent evt = new CountdownEvent (num);
706                         Task[] tasks = new Task [num];
707                         for (int i = 0; i < num; i++) {
708                                 tasks [i] = Task.Factory.StartNew (() => {
709                                         action ();
710                                         evt.Signal ();
711                                         if (callback != null && evt.IsSet)
712                                                 callback ();
713                                 });
714                         }
715
716                         // If explicitely told, wait for all workers to complete
717                         // and thus let main thread participate in the processing
718                         if (wait)
719                                 Task.WaitAll (tasks);
720
721                         return tasks;
722                 }
723 #endregion
724         }
725 }
726 #endif