Merge pull request #980 from StephenMcConnel/bug-18638
[mono.git] / mcs / class / corlib / System.Threading.Tasks / Parallel.cs
1 // Parallel.cs
2 //
3 // Copyright (c) 2008 Jérémie "Garuma" Laval
4 //
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
11 //
12 // The above copyright notice and this permission notice shall be included in
13 // all copies or substantial portions of the Software.
14 //
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21 // THE SOFTWARE.
22 //
23 //
24
25 #if NET_4_0
26 using System;
27 using System.Collections.Generic;
28 using System.Collections.Concurrent;
29 using System.Threading;
30 using System.Runtime.InteropServices;
31
32 namespace System.Threading.Tasks
33 {
34         public static class Parallel
35         {
// Worker count for the scheduler that is current on the calling thread.
internal static int GetBestWorkerNumber ()
{
        TaskScheduler current = TaskScheduler.Current;
        return GetBestWorkerNumber (current);
}
40
// Returns the smaller of the processor count and the scheduler's
// MaximumConcurrencyLevel. A null scheduler stands for TaskScheduler.Current.
internal static int GetBestWorkerNumber (TaskScheduler scheduler)
{
        TaskScheduler effective = scheduler;
        if (effective == null)
                effective = TaskScheduler.Current;

        int processors = Environment.ProcessorCount;
        int concurrency = effective.MaximumConcurrencyLevel;

        return Math.Min (processors, concurrency);
}
45
// Decides how many workers share the range [from, to) and reports, through
// `step`, how many consecutive indices each worker is initially handed.
//
// The worker count starts from GetBestWorkerNumber (scheduler) and is capped by
// options.MaxDegreeOfParallelism unless that is -1. Each worker gets at least
// 5 indices: when the even split would be smaller, step is forced to 5 and the
// worker count is recomputed from the range (never dropping below 1).
//
// `options` may be null; the scheduler fallback is then left to
// GetBestWorkerNumber (TaskScheduler).
static int GetBestWorkerNumber (int from, int to, ParallelOptions options, out int step)
{
        // Only touch options once we know it is there; the null test used to
        // come after the first dereference.
        TaskScheduler scheduler = options != null ? options.TaskScheduler : null;

        int num = GetBestWorkerNumber (scheduler);
        if (options != null && options.MaxDegreeOfParallelism != -1)
                num = Math.Min (options.MaxDegreeOfParallelism, num);

        // Integer range that each task processes
        int range = to - from;
        step = range / num;
        if (step < 5) {
                step = 5;
                num = range / 5;
                if (num < 1)
                        num = 1;
        }

        return num;
}
61
// Rethrows the failures of the given tasks without recording anything on a
// loop-state object.
static void HandleExceptions (IEnumerable<Task> tasks)
{
        ParallelLoopState.ExternalInfos noInfos = null;
        HandleExceptions (tasks, noInfos);
}
66
// Gathers the Exception of every faulted task. When at least one is found,
// flags `infos` (if supplied) as exceptional and throws a single flattened
// AggregateException wrapping all of them. Does nothing when no task failed.
static void HandleExceptions (IEnumerable<Task> tasks, ParallelLoopState.ExternalInfos infos)
{
        var failures = new List<Exception> ();

        foreach (Task task in tasks) {
                AggregateException failure = task.Exception;
                if (failure == null)
                        continue;

                failures.Add (failure);
        }

        if (failures.Count == 0)
                return;

        if (infos != null)
                infos.IsExceptional = true;

        AggregateException combined = new AggregateException (failures);
        throw combined.Flatten ();
}
82
// Starts `count` tasks that all run `action` and stores them in tasks [0..count).
// Every task is created LongRunning and AttachedToParent.
//
// With null `options` the default factory overload is used. Otherwise the
// tasks observe options.CancellationToken and run on options.TaskScheduler;
// a null scheduler is replaced by TaskScheduler.Current (the same fallback
// GetBestWorkerNumber applies), because StartNew rejects a null scheduler.
static void InitTasks (Task[] tasks, int count, Action action, ParallelOptions options)
{
        TaskCreationOptions creation = TaskCreationOptions.LongRunning | TaskCreationOptions.AttachedToParent;

        for (int i = 0; i < count; i++) {
                if (options == null) {
                        tasks [i] = Task.Factory.StartNew (action, creation);
                        continue;
                }

                TaskScheduler scheduler = options.TaskScheduler ?? TaskScheduler.Current;
                tasks [i] = Task.Factory.StartNew (action, options.CancellationToken, creation, scheduler);
        }
}
94
95 #region For
96
// Index-only loop over [fromInclusive, toExclusive) using ParallelOptions.Default.
public static ParallelLoopResult For (int fromInclusive, int toExclusive, Action<int> body)
{
        ParallelOptions defaults = ParallelOptions.Default;
        return For (fromInclusive, toExclusive, defaults, body);
}
101
// Loop over [fromInclusive, toExclusive) whose body also receives the loop
// state; uses ParallelOptions.Default.
public static ParallelLoopResult For (int fromInclusive, int toExclusive, Action<int, ParallelLoopState> body)
{
        ParallelOptions defaults = ParallelOptions.Default;
        return For (fromInclusive, toExclusive, defaults, body);
}
106
// Index-only loop over [fromInclusive, toExclusive) with explicit options.
// The body is adapted to the (index, state) form and forwarded.
//
// Throws ArgumentNullException ("body") when body is null. The check has to
// happen here: once body is wrapped in a lambda the callee only ever sees a
// non-null delegate.
public static ParallelLoopResult For (int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action<int> body)
{
        if (body == null)
                throw new ArgumentNullException ("body");

        return For (fromInclusive, toExclusive, parallelOptions, (index, state) => body (index));
}
111
// Loop over [fromInclusive, toExclusive) with explicit options and loop state.
// Implemented on top of For<object> with a null thread-local value and a
// no-op finalizer.
//
// Throws ArgumentNullException ("body") when body is null. The check has to
// happen here: once body is wrapped in a lambda the callee only ever sees a
// non-null delegate.
public static ParallelLoopResult For (int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action<int, ParallelLoopState> body)
{
        if (body == null)
                throw new ArgumentNullException ("body");

        return For<object> (fromInclusive, toExclusive, parallelOptions, () => null, (i, s, l) => { body (i, s); return null; }, _ => {});
}
116
// Thread-local flavour of For over [fromInclusive, toExclusive) that runs
// with ParallelOptions.Default.
public static ParallelLoopResult For<TLocal> (int fromInclusive,
                                              int toExclusive,
                                              Func<TLocal> localInit,
                                              Func<int, ParallelLoopState, TLocal, TLocal> body,
                                              Action<TLocal> localFinally)
{
        ParallelOptions defaults = ParallelOptions.Default;
        return For<TLocal> (fromInclusive, toExclusive, defaults, localInit, body, localFinally);
}
125
// Runs `body` for the indices in [fromInclusive, toExclusive), split into
// contiguous chunks that are processed by separate tasks.
//
// Each task calls localInit once, threads the returned value through its
// body invocations and hands the last value to localFinally (always, via
// finally). A task that finishes its own chunk tries to steal indices from
// the tail of the other tasks' chunks.
//
// Throws ArgumentNullException for a null body, localInit, localFinally or
// parallelOptions. An empty range returns a completed result immediately.
// Task failures are rethrown as one flattened AggregateException.
public static ParallelLoopResult For<TLocal> (int fromInclusive,
                                              int toExclusive,
                                              ParallelOptions parallelOptions,
                                              Func<TLocal> localInit,
                                              Func<int, ParallelLoopState, TLocal, TLocal> body,
                                              Action<TLocal> localFinally)
{
        if (body == null)
                throw new ArgumentNullException ("body");
        if (localInit == null)
                throw new ArgumentNullException ("localInit");
        if (localFinally == null)
                throw new ArgumentNullException ("localFinally");
        if (parallelOptions == null)
                throw new ArgumentNullException ("parallelOptions");
        if (fromInclusive >= toExclusive)
                return new ParallelLoopResult (null, true);

        // Number of tasks to be launched and the size of the chunk given to each
        int step;
        int num = GetBestWorkerNumber (fromInclusive, toExclusive, parallelOptions, out step);

        Task[] tasks = new Task [num];

        // ranges [i] holds worker i's progress: Actual is the index it is
        // working on, Stolen counts indices taken from the end of its chunk.
        StealRange[] ranges = new StealRange[num];
        for (int i = 0; i < num; i++)
                ranges[i] = new StealRange (fromInclusive, i, step);

        ParallelLoopState.ExternalInfos infos = new ParallelLoopState.ExternalInfos ();

        // Each task claims its worker slot by incrementing this counter
        int currentIndex = -1;

        Action workerMethod = delegate {
                int localWorker = Interlocked.Increment (ref currentIndex);
                StealRange range = ranges[localWorker];
                int index = range.V64.Actual;
                // The last worker also absorbs the remainder of the division
                int stopIndex = localWorker + 1 == num ? toExclusive : Math.Min (toExclusive, index + step);
                TLocal local = localInit ();

                ParallelLoopState state = new ParallelLoopState (infos);
                CancellationToken token = parallelOptions.CancellationToken;

                try {
                        for (int i = index; i < stopIndex;) {
                                if (infos.IsStopped)
                                        return;

                                token.ThrowIfCancellationRequested ();

                                // The tail of the chunk may have been stolen meanwhile
                                if (i >= stopIndex - range.V64.Stolen)
                                        break;

                                // After a Break only the iterations above the break index
                                // are skipped; lower ones must still run.
                                if (infos.LowestBreakIteration != null && infos.LowestBreakIteration < i)
                                        return;

                                state.CurrentIteration = i;
                                local = body (i, state, local);

                                if (i + 1 >= stopIndex - range.V64.Stolen)
                                        break;

                                range.V64.Actual = ++i;
                        }

                        // Try to steal from our right neighbors (cyclic)
                        int len = num + localWorker;
                        for (int sIndex = localWorker + 1; sIndex < len; ++sIndex) {
                                int extWorker = sIndex % num;
                                range = ranges[extWorker];

                                stopIndex = extWorker + 1 == num ? toExclusive : Math.Min (toExclusive, fromInclusive + (extWorker + 1) * step);
                                int stolen = -1;

                                do {
                                        // Reserve one more index at the tail by bumping Stolen;
                                        // Actual and Stolen are swapped together as one 64-bit value.
                                        do {
                                                long old;
                                                StealValue64 val = new StealValue64 ();

                                                old = Volatile.Read (ref range.V64.Value);
                                                val.Value = old;

                                                // Too little left ahead of the owner: move to the next victim
                                                if (val.Actual >= stopIndex - val.Stolen - 2)
                                                        goto next;
                                                stolen = (val.Stolen += 1);

                                                if (Interlocked.CompareExchange (ref range.V64.Value, val.Value, old) == old)
                                                        break;
                                        } while (true);

                                        // Convert the reservation count into the reserved index
                                        stolen = stopIndex - stolen;

                                        if (stolen > range.V64.Actual)
                                                local = body (stolen, state, local);
                                        else
                                                break;
                                } while (true);

                                next:
                                continue;
                        }
                } finally {
                        localFinally (local);
                }
        };

        InitTasks (tasks, num, workerMethod, parallelOptions);

        try {
                Task.WaitAll (tasks);
        } catch {
                HandleExceptions (tasks, infos);
        }

        return new ParallelLoopResult (infos.LowestBreakIteration, !(infos.IsStopped || infos.IsExceptional));
}
243
// Overlays two 32-bit counters on one 64-bit word so that both can be read
// and replaced together through Value: Actual occupies bytes 0-3 and
// Stolen bytes 4-7.
[StructLayout (LayoutKind.Explicit)]
struct StealValue64 {
        [FieldOffset (0)] public long Value;
        [FieldOffset (0)] public int Actual;
        [FieldOffset (4)] public int Stolen;
}
253
// Heap-allocated holder for one worker's StealValue64. Actual starts at the
// first index of chunk number `i`, i.e. fromInclusive + i * step.
class StealRange
{
        public StealValue64 V64;

        public StealRange (int fromInclusive, int i, int step)
        {
                V64 = new StealValue64 ();
                V64.Actual = fromInclusive + i * step;
        }
}
263
264 #endregion
265
266 #region For (long)
267
// 64-bit index-only loop using ParallelOptions.Default.
[MonoTODO]
public static ParallelLoopResult For (long fromInclusive, long toExclusive, Action<long> body)
{
        ParallelOptions defaults = ParallelOptions.Default;
        return For (fromInclusive, toExclusive, defaults, body);
}
273
// 64-bit loop whose body also receives the loop state; uses
// ParallelOptions.Default.
[MonoTODO]
public static ParallelLoopResult For (long fromInclusive, long toExclusive, Action<long, ParallelLoopState> body)
{
        ParallelOptions defaults = ParallelOptions.Default;
        return For (fromInclusive, toExclusive, defaults, body);
}
279
// 64-bit index-only loop with explicit options. The body is adapted to the
// (index, state) form and forwarded.
//
// Throws ArgumentNullException ("body") when body is null. The check has to
// happen here: once body is wrapped in a lambda the callee only ever sees a
// non-null delegate.
[MonoTODO]
public static ParallelLoopResult For (long fromInclusive, long toExclusive, ParallelOptions parallelOptions, Action<long> body)
{
        if (body == null)
                throw new ArgumentNullException ("body");

        return For (fromInclusive, toExclusive, parallelOptions, (index, state) => body (index));
}
285
// 64-bit loop with explicit options and loop state. Implemented on top of
// For<object> with a null thread-local value and a no-op finalizer.
//
// Throws ArgumentNullException ("body") when body is null. The check has to
// happen here: once body is wrapped in a lambda the callee only ever sees a
// non-null delegate.
[MonoTODO]
public static ParallelLoopResult For (long fromInclusive, long toExclusive, ParallelOptions parallelOptions, Action<long, ParallelLoopState> body)
{
        if (body == null)
                throw new ArgumentNullException ("body");

        return For<object> (fromInclusive, toExclusive, parallelOptions, () => null, (i, s, l) => { body (i, s); return null; }, _ => {});
}
291
// Thread-local flavour of the 64-bit For that runs with ParallelOptions.Default.
[MonoTODO]
public static ParallelLoopResult For<TLocal> (long fromInclusive,
                                              long toExclusive,
                                              Func<TLocal> localInit,
                                              Func<long, ParallelLoopState, TLocal, TLocal> body,
                                              Action<TLocal> localFinally)
{
        ParallelOptions defaults = ParallelOptions.Default;
        return For<TLocal> (fromInclusive, toExclusive, defaults, localInit, body, localFinally);
}
301
// 64-bit thread-local For. Only argument validation and the empty-range
// shortcut exist so far: any non-empty range throws NotImplementedException.
//
// Throws ArgumentNullException for a null body, localInit, localFinally or
// parallelOptions (each reported under its actual parameter name).
[MonoTODO ("See how this can be refactored with the above For implementation")]
public static ParallelLoopResult For<TLocal> (long fromInclusive,
                                              long toExclusive,
                                              ParallelOptions parallelOptions,
                                              Func<TLocal> localInit,
                                              Func<long, ParallelLoopState, TLocal, TLocal> body,
                                              Action<TLocal> localFinally)
{
        if (body == null)
                throw new ArgumentNullException ("body");
        if (localInit == null)
                throw new ArgumentNullException ("localInit");
        if (localFinally == null)
                throw new ArgumentNullException ("localFinally");
        if (parallelOptions == null)
                throw new ArgumentNullException ("parallelOptions");
        if (fromInclusive >= toExclusive)
                return new ParallelLoopResult (null, true);

        throw new NotImplementedException ();
}
323
324 #endregion
325
326 #region Foreach
// Shared implementation behind the ForEach overloads.
//
// `enumerable` is asked for one enumerator per worker; each task picks the
// next unclaimed enumerator, calls init once, threads the local value through
// its action invocations and passes the last value to destruct (always, via
// finally). Elements are moved from the enumerator into a per-worker slot of
// a SimpleConcurrentBag in batches of bagCount and consumed from there; a
// worker whose enumerator is exhausted goes on to steal from the bag.
//
// Throws ArgumentNullException for any null argument. Task failures are
// rethrown as one flattened AggregateException.
static ParallelLoopResult ForEach<TSource, TLocal> (Func<int, IList<IEnumerator<TSource>>> enumerable, ParallelOptions options,
                                                    Func<TLocal> init, Func<TSource, ParallelLoopState, TLocal, TLocal> action,
                                                    Action<TLocal> destruct)
{
        if (enumerable == null)
                throw new ArgumentNullException ("source");
        if (options == null)
                throw new ArgumentNullException ("options");
        if (action == null)
                throw new ArgumentNullException ("action");
        if (init == null)
                throw new ArgumentNullException ("init");
        if (destruct == null)
                throw new ArgumentNullException ("destruct");

        // options is known to be non-null here, so no further null test is needed
        int num = GetBestWorkerNumber (options.TaskScheduler);
        if (options.MaxDegreeOfParallelism != -1)
                num = Math.Min (num, options.MaxDegreeOfParallelism);

        Task[] tasks = new Task[num];
        ParallelLoopState.ExternalInfos infos = new ParallelLoopState.ExternalInfos ();

        SimpleConcurrentBag<TSource> bag = new SimpleConcurrentBag<TSource> (num);
        // Number of elements moved into / taken out of the bag per round
        const int bagCount = 5;

        IList<IEnumerator<TSource>> slices = enumerable (num);

        // Each task claims its enumerator by incrementing this counter
        int sliceIndex = -1;

        Action workerMethod = delegate {
                IEnumerator<TSource> slice = slices[Interlocked.Increment (ref sliceIndex)];

                TLocal local = init ();
                ParallelLoopState state = new ParallelLoopState (infos);
                int workIndex = bag.GetNextIndex ();
                CancellationToken token = options.CancellationToken;

                try {
                        // Becomes false once the enumerator reports no more elements
                        bool cont = true;
                        TSource element;

                        while (cont) {
                                if (infos.IsStopped || infos.IsBroken.Value)
                                        return;

                                token.ThrowIfCancellationRequested ();

                                // Refill: move up to bagCount elements into our slot
                                for (int i = 0; i < bagCount && (cont = slice.MoveNext ()); i++) {
                                        bag.Add (workIndex, slice.Current);
                                }

                                // Drain: process up to bagCount elements from our slot
                                for (int i = 0; i < bagCount && bag.TryTake (workIndex, out element); i++) {
                                        if (infos.IsStopped)
                                                return;

                                        token.ThrowIfCancellationRequested ();

                                        local = action (element, state, local);
                                }
                        }

                        // Own enumerator exhausted: keep going for as long as TrySteal yields elements
                        while (bag.TrySteal (workIndex, out element)) {
                                token.ThrowIfCancellationRequested ();

                                local = action (element, state, local);

                                if (infos.IsStopped || infos.IsBroken.Value)
                                        return;
                        }
                } finally {
                        destruct (local);
                }
        };

        InitTasks (tasks, num, workerMethod, options);

        try {
                Task.WaitAll (tasks);
        } catch {
                HandleExceptions (tasks, infos);
        }

        return new ParallelLoopResult (infos.LowestBreakIteration, !(infos.IsStopped || infos.IsExceptional));
}
410
// Element-only ForEach over a sequence, run with ParallelOptions.Default.
// Throws ArgumentNullException for a null source or body.
public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, Action<TSource> body)
{
        if (source == null) {
                throw new ArgumentNullException ("source");
        }
        if (body == null) {
                throw new ArgumentNullException ("body");
        }

        var partitioner = Partitioner.Create (source);

        return ForEach<TSource, object> (partitioner,
                                         ParallelOptions.Default,
                                         () => null,
                                         (item, loopState, unused) => { body (item); return null; },
                                         ignored => {});
}
424
// ForEach over a sequence whose body also receives the loop state; run with
// ParallelOptions.Default. Throws ArgumentNullException for a null source or body.
public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, Action<TSource, ParallelLoopState> body)
{
        if (source == null) {
                throw new ArgumentNullException ("source");
        }
        if (body == null) {
                throw new ArgumentNullException ("body");
        }

        var partitioner = Partitioner.Create (source);

        return ForEach<TSource, object> (partitioner,
                                         ParallelOptions.Default,
                                         () => null,
                                         (item, loopState, unused) => { body (item, loopState); return null; },
                                         ignored => {});
}
438
// ForEach over a sequence whose body receives the element, the loop state and
// a long index; run with ParallelOptions.Default.
// Throws ArgumentNullException for a null source or body.
public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source,
                                                   Action<TSource, ParallelLoopState, long> body)
{
        if (source == null) {
                throw new ArgumentNullException ("source");
        }
        if (body == null) {
                throw new ArgumentNullException ("body");
        }

        var partitioner = Partitioner.Create (source);

        return ForEach<TSource, object> (partitioner,
                                         ParallelOptions.Default,
                                         () => null,
                                         (item, loopState, position, unused) => { body (item, loopState, position); return null; },
                                         ignored => {});
}
454
// ForEach over a partitioner whose body also receives the loop state; run
// with ParallelOptions.Default. Throws ArgumentNullException for a null body.
public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source,
                                                   Action<TSource, ParallelLoopState> body)
{
        if (body == null) {
                throw new ArgumentNullException ("body");
        }

        ParallelOptions defaults = ParallelOptions.Default;

        return ForEach<TSource, object> (source,
                                         defaults,
                                         () => null,
                                         (item, loopState, unused) => { body (item, loopState); return null; },
                                         ignored => {});
}
467
// ForEach over an orderable partitioner whose body receives the element, the
// loop state and a long index; run with ParallelOptions.Default.
// Throws ArgumentNullException for a null body.
public static ParallelLoopResult ForEach<TSource> (OrderablePartitioner<TSource> source,
                                                   Action<TSource, ParallelLoopState, long> body)
{
        if (body == null) {
                throw new ArgumentNullException ("body");
        }

        ParallelOptions defaults = ParallelOptions.Default;

        return ForEach<TSource, object> (source,
                                         defaults,
                                         () => null,
                                         (item, loopState, position, unused) => { body (item, loopState, position); return null; },
                                         ignored => {});
}
481
// Element-only ForEach over a partitioner, run with ParallelOptions.Default.
// Throws ArgumentNullException for a null body.
public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source,
                                                   Action<TSource> body)
{
        if (body == null) {
                throw new ArgumentNullException ("body");
        }

        ParallelOptions defaults = ParallelOptions.Default;

        return ForEach<TSource, object> (source,
                                         defaults,
                                         () => null,
                                         (item, loopState, unused) => { body (item); return null; },
                                         ignored => {});
}
495
// Element-only ForEach over a sequence with explicit options.
// Throws ArgumentNullException for a null source or body.
public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source,
                                                   ParallelOptions parallelOptions,
                                                   Action<TSource> body)
{
        if (source == null) {
                throw new ArgumentNullException ("source");
        }
        if (body == null) {
                throw new ArgumentNullException ("body");
        }

        var partitioner = Partitioner.Create (source);

        return ForEach<TSource, object> (partitioner,
                                         parallelOptions,
                                         () => null,
                                         (item, loopState, unused) => { body (item); return null; },
                                         ignored => {});
}
511
// ForEach over a sequence with explicit options; the body also receives the
// loop state. Throws ArgumentNullException for a null source or body.
public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
                                                   Action<TSource, ParallelLoopState> body)
{
        if (source == null) {
                throw new ArgumentNullException ("source");
        }
        if (body == null) {
                throw new ArgumentNullException ("body");
        }

        var partitioner = Partitioner.Create (source);

        return ForEach<TSource, object> (partitioner,
                                         parallelOptions,
                                         () => null,
                                         (item, loopState, unused) => { body (item, loopState); return null; },
                                         ignored => {});
}
526
// ForEach over a sequence with explicit options; the body receives the
// element, the loop state and a long index.
// Throws ArgumentNullException for a null source or body.
public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
                                                   Action<TSource, ParallelLoopState, long> body)
{
        if (source == null) {
                throw new ArgumentNullException ("source");
        }
        if (body == null) {
                throw new ArgumentNullException ("body");
        }

        var partitioner = Partitioner.Create (source);

        return ForEach<TSource, object> (partitioner,
                                         parallelOptions,
                                         () => null,
                                         (item, loopState, position, unused) => { body (item, loopState, position); return null; },
                                         ignored => {});
}
541
// ForEach over an orderable partitioner with explicit options; the body
// receives the element, the loop state and a long index.
// Throws ArgumentNullException for a null body.
public static ParallelLoopResult ForEach<TSource> (OrderablePartitioner<TSource> source, ParallelOptions parallelOptions,
                                                   Action<TSource, ParallelLoopState, long> body)
{
        if (body == null) {
                throw new ArgumentNullException ("body");
        }

        return ForEach<TSource, object> (source,
                                         parallelOptions,
                                         () => null,
                                         (item, loopState, position, unused) => { body (item, loopState, position); return null; },
                                         ignored => {});
}
555
// Element-only ForEach over a partitioner with explicit options.
// Throws ArgumentNullException for a null body.
public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source, ParallelOptions parallelOptions,
                                                   Action<TSource> body)
{
        if (body == null) {
                throw new ArgumentNullException ("body");
        }

        return ForEach<TSource, object> (source,
                                         parallelOptions,
                                         () => null,
                                         (item, loopState, unused) => { body (item); return null; },
                                         ignored => {});
}
568
// Runs body for every element handed out by a partitioner, giving the body
// the element and the loop state.
//
// This is an adapter over the ForEach<TSource, TLocal> overload that threads a
// local value through the loop: the local is a dummy object that is always null.
//
// Throws ArgumentNullException when body is null.
public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source, ParallelOptions parallelOptions,
                                                   Action<TSource, ParallelLoopState> body)
{
        // body is wrapped in a lambda below, so the callee can never see that it
        // is null; it has to be validated here, as the sibling overloads do.
        if (body == null)
                throw new ArgumentNullException ("body");

        return ForEach<TSource, object> (source,
                                         parallelOptions,
                                         () => null,
                                         (e, s, l) => { body (e, s); return null; },
                                         _ => {});
}
578
// ForEach over an enumerable with a per-worker local value, using
// ParallelOptions.Default.
//
// The enumerable is wrapped with Partitioner.Create and viewed as a plain
// Partitioner<TSource> before being forwarded to the partitioner-based overload.
//
// Throws ArgumentNullException when source is null.
public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, Func<TLocal> localInit,
                                                           Func<TSource, ParallelLoopState, TLocal, TLocal> body,
                                                           Action<TLocal> localFinally)
{
        if (source == null) {
                throw new ArgumentNullException ("source");
        }

        Partitioner<TSource> partitioner = Partitioner.Create (source);
        ParallelOptions defaultOptions = ParallelOptions.Default;

        return ForEach<TSource, TLocal> (partitioner, defaultOptions, localInit, body, localFinally);
}
592
// ForEach over an enumerable with a per-worker local value and a long index
// passed to body, using ParallelOptions.Default.
//
// The enumerable is wrapped with Partitioner.Create and forwarded to the
// overload that takes explicit options.
//
// Throws ArgumentNullException when source is null.
public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, Func<TLocal> localInit,
                                                           Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
                                                           Action<TLocal> localFinally)
{
        // Validate up front, like every other IEnumerable-based overload, instead
        // of handing a null sequence to Partitioner.Create.
        if (source == null)
                throw new ArgumentNullException ("source");

        return ForEach<TSource, TLocal> (Partitioner.Create (source),
                                         ParallelOptions.Default,
                                         localInit,
                                         body,
                                         localFinally);
}
603
// ForEach over an orderable partitioner with a per-worker local value.
// Simply forwards to the overload taking explicit options, supplying
// ParallelOptions.Default.
public static ParallelLoopResult ForEach<TSource, TLocal> (OrderablePartitioner<TSource> source, Func<TLocal> localInit,
                                                           Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
                                                           Action<TLocal> localFinally)
{
        ParallelOptions defaultOptions = ParallelOptions.Default;

        return ForEach<TSource, TLocal> (source,
                                         defaultOptions,
                                         localInit,
                                         body,
                                         localFinally);
}
610
// ForEach over a partitioner with a per-worker local value.
// Simply forwards to the overload taking explicit options, supplying
// ParallelOptions.Default.
public static ParallelLoopResult ForEach<TSource, TLocal> (Partitioner<TSource> source, Func<TLocal> localInit,
                                                           Func<TSource, ParallelLoopState, TLocal, TLocal> body,
                                                           Action<TLocal> localFinally)
{
        ParallelOptions defaultOptions = ParallelOptions.Default;

        return ForEach<TSource, TLocal> (source,
                                         defaultOptions,
                                         localInit,
                                         body,
                                         localFinally);
}
617
// ForEach over an enumerable with explicit options and a per-worker local value.
// The enumerable is wrapped with Partitioner.Create and the call is forwarded
// to a partitioner-based overload.
//
// Throws ArgumentNullException when source is null.
public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
                                                           Func<TLocal> localInit,
                                                           Func<TSource, ParallelLoopState, TLocal, TLocal> body,
                                                           Action<TLocal> localFinally)
{
        if (source == null) {
                throw new ArgumentNullException ("source");
        }

        var partitioner = Partitioner.Create (source);

        return ForEach<TSource, TLocal> (partitioner,
                                         parallelOptions,
                                         localInit,
                                         body,
                                         localFinally);
}
628
// ForEach over an enumerable with explicit options, a per-worker local value
// and a long index passed to body.
// The enumerable is wrapped with Partitioner.Create and the call is forwarded
// to a partitioner-based overload.
//
// Throws ArgumentNullException when source is null.
public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
                                                           Func<TLocal> localInit,
                                                           Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
                                                           Action<TLocal> localFinally)
{
        if (source == null) {
                throw new ArgumentNullException ("source");
        }

        var partitioner = Partitioner.Create (source);

        return ForEach<TSource, TLocal> (partitioner,
                                         parallelOptions,
                                         localInit,
                                         body,
                                         localFinally);
}
639
// ForEach over a partitioner with explicit options and a per-worker local value.
//
// Throws ArgumentNullException when source or body is null (source is
// checked first).
public static ParallelLoopResult ForEach<TSource, TLocal> (Partitioner<TSource> source, ParallelOptions parallelOptions,
                                                           Func<TLocal> localInit,
                                                           Func<TSource, ParallelLoopState, TLocal, TLocal> body,
                                                           Action<TLocal> localFinally)
{
        if (source == null) {
                throw new ArgumentNullException ("source");
        }
        if (body == null) {
                throw new ArgumentNullException ("body");
        }

        // The partitioner's GetPartitions method is passed as a delegate to
        // another ForEach overload, which is defined outside this section.
        return ForEach<TSource, TLocal> (source.GetPartitions,
                                         parallelOptions,
                                         localInit,
                                         body,
                                         localFinally);
}
652
// ForEach over an orderable partitioner with explicit options, a per-worker
// local value and a long index passed to body.
//
// The loop is run over KeyValuePair<long, TSource> items: each pair's Value is
// given to body as the element and its Key as the long index.
//
// Throws ArgumentNullException when source or body is null (source is
// checked first).
public static ParallelLoopResult ForEach<TSource, TLocal> (OrderablePartitioner<TSource> source, ParallelOptions parallelOptions,
                                                           Func<TLocal> localInit,
                                                           Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
                                                           Action<TLocal> localFinally)
{
        if (source == null) {
                throw new ArgumentNullException ("source");
        }
        if (body == null) {
                throw new ArgumentNullException ("body");
        }

        // The partitioner's GetOrderablePartitions method is passed as a delegate;
        // the lambda unpacks each key/value pair into body's argument list.
        return ForEach<KeyValuePair<long, TSource>, TLocal> (
                source.GetOrderablePartitions,
                parallelOptions,
                localInit,
                (pair, state, local) => body (pair.Value, state, pair.Key, local),
                localFinally);
}
669                 #endregion
670
671                 #region Invoke
// Runs the given actions using ParallelOptions.Default.
//
// Throws ArgumentNullException when actions is null; all further validation is
// done by the overload taking explicit options.
public static void Invoke (params Action[] actions)
{
        if (actions == null) {
                throw new ArgumentNullException ("actions");
        }

        ParallelOptions defaultOptions = ParallelOptions.Default;
        Invoke (defaultOptions, actions);
}
679
// Runs every action in actions and returns once they have all been waited on.
//
// parallelOptions supplies the cancellation token and the task scheduler used
// for the spawned tasks. A null TaskScheduler falls back to
// TaskScheduler.Current.
//
// Throws ArgumentNullException when parallelOptions or actions is null, and
// ArgumentException when actions is empty or contains a null entry.
public static void Invoke (ParallelOptions parallelOptions, params Action[] actions)
{
        if (parallelOptions == null)
                throw new ArgumentNullException ("parallelOptions");
        if (actions == null)
                throw new ArgumentNullException ("actions");
        if (actions.Length == 0)
                throw new ArgumentException ("actions is empty");
        foreach (var a in actions)
                if (a == null)
                        throw new ArgumentException ("One action in actions is null", "actions");

        // A lone action is run inline on the calling thread; no task is created,
        // so neither the scheduler nor the cancellation token is consulted here.
        if (actions.Length == 1) {
                actions[0] ();
                return;
        }

        // StartNew rejects a null scheduler, so resolve it before spawning;
        // GetBestWorkerNumber in this class applies the same fallback.
        TaskScheduler scheduler = parallelOptions.TaskScheduler ?? TaskScheduler.Current;

        Task[] ts = new Task[actions.Length];
        for (int i = 0; i < ts.Length; i++)
                ts[i] = Task.Factory.StartNew (actions[i],
                                               parallelOptions.CancellationToken,
                                               TaskCreationOptions.None,
                                               scheduler);

        try {
                Task.WaitAll (ts, parallelOptions.CancellationToken);
        } catch {
                // Any failure while waiting is handed to HandleExceptions
                // (defined elsewhere in this class) together with the tasks.
                HandleExceptions (ts);
        }
}
709                 #endregion
710
711                 #region SpawnBestNumber, used by PLinq
// Spawns workers running action without an explicit degree of parallelism:
// forwards -1 as dop, which the full overload treats as "compute the worker
// count".
internal static Task[] SpawnBestNumber (Action action, Action callback)
{
        const int unspecifiedDop = -1;

        return SpawnBestNumber (action, unspecifiedDop, callback);
}
716
// Spawns workers running action with the given degree of parallelism and
// returns without waiting for them (wait is passed as false).
internal static Task[] SpawnBestNumber (Action action, int dop, Action callback)
{
        const bool waitForWorkers = false;

        return SpawnBestNumber (action, dop, waitForWorkers, callback);
}
721
// Starts a set of tasks that each run action once, and returns them.
//
// action   - work executed by every spawned task.
// dop      - number of tasks to start; -1 means "use GetBestWorkerNumber ()".
// wait     - when true, block until all tasks have completed before returning.
// callback - optional; invoked once, by the task that finishes last.
internal static Task[] SpawnBestNumber (Action action, int dop, bool wait, Action callback)
{
        // Get the optimum amount of workers to create; when the caller is going
        // to wait, one more worker than the computed number is requested.
        int num = dop == -1 ? (wait ? GetBestWorkerNumber () + 1 : GetBestWorkerNumber ()) : dop;

        // Initialize workers
        CountdownEvent evt = new CountdownEvent (num);
        Task[] tasks = new Task [num];
        for (int i = 0; i < num; i++) {
                tasks [i] = Task.Factory.StartNew (() => {
                        action ();
                        // Signal () returns true only for the call that brings the
                        // count to zero, so exactly one worker runs the callback.
                        // Testing evt.IsSet after signalling is racy: a worker that
                        // signalled earlier can also observe the event as set and
                        // fire the callback a second time.
                        bool lastWorker = evt.Signal ();
                        if (lastWorker && callback != null)
                                callback ();
                });
        }

        // If explicitly told, wait for all workers to complete
        // (per the original note, this lets the calling thread take part
        // in the processing).
        if (wait)
                Task.WaitAll (tasks);

        return tasks;
}
746 #endregion
747         }
748 }
749 #endif