2009-12-08 Rodrigo Kumpera <rkumpera@novell.com>
[mono.git] / mcs / class / corlib / System.Threading.Tasks / Parallel.cs
1 #if NET_4_0
2 // Parallel.cs
3 //
4 // Copyright (c) 2008 Jérémie "Garuma" Laval
5 //
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
7 // of this software and associated documentation files (the "Software"), to deal
8 // in the Software without restriction, including without limitation the rights
9 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 // copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
12 //
13 // The above copyright notice and this permission notice shall be included in
14 // all copies or substantial portions of the Software.
15 //
16 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22 // THE SOFTWARE.
23 //
24 //
25
26 using System;
27 using System.Collections.Generic;
28 using System.Collections.Concurrent;
29 using System.Threading;
30
31 namespace System.Threading.Tasks
32 {
33         public static class Parallel
34         {
35                 internal static int GetBestWorkerNumber ()
36                 {
37                         return GetBestWorkerNumber (TaskScheduler.Current);
38                 }
39                 
40                 internal static int GetBestWorkerNumber (TaskScheduler scheduler)
41                 {       
42                         return scheduler.MaximumConcurrencyLevel;
43                 }
44                 
45                 static void HandleExceptions (IEnumerable<Task> tasks)
46                 {
47                         HandleExceptions (tasks, null);
48                 }
49                 
50                 static void HandleExceptions (IEnumerable<Task> tasks, ParallelLoopState.ExternalInfos infos)
51                 {
52                         List<Exception> exs = new List<Exception> ();
53                         foreach (Task t in tasks) {
54                                 if (t.Exception != null && !(t.Exception is TaskCanceledException))
55                                         exs.Add (t.Exception);
56                         }
57                         
58                         if (exs.Count > 0) {
59                                 if (infos != null)
60                                         infos.IsExceptional = true;
61                                 
62                                 throw new AggregateException (exs);
63                         }
64                 }
65                 
66                 static void InitTasks (Task[] tasks, Action action, int count)
67                 {
68                         InitTasks (tasks, count, () => Task.Factory.StartNew (action, TaskCreationOptions.DetachedFromParent));
69                 }
70                 
71                 static void InitTasks (Task[] tasks, Action action, int count, TaskScheduler scheduler)
72                 {
73                         InitTasks (tasks, count, () => Task.Factory.StartNew (action, TaskCreationOptions.DetachedFromParent, scheduler));
74                 }
75                 
76                 static void InitTasks (Task[] tasks, int count, Func<Task> taskCreator)
77                 {
78                         for (int i = 0; i < count; i++) {
79                                 tasks [i] = taskCreator ();
80                         }
81                 }
82                 #region For
83                 
84                 public static ParallelLoopResult For (int from, int to, Action<int> action)
85                 {
86                         return For (from, to, null, action);
87                 }
88                 
89                 public static ParallelLoopResult For (int from, int to, Action<int, ParallelLoopState> action)
90                 {
91                         return For (from, to, null, action);
92                 }
93                 
94                 public static ParallelLoopResult For (int from, int to, ParallelOptions options, Action<int> action)
95                 {
96                         return For (from, to, options, (index, state) => action (index));
97                 }
98                 
99                 public static ParallelLoopResult For (int from, int to, ParallelOptions options, Action<int, ParallelLoopState> action)
100                 {
101                         return For<object> (from, to, options, null, (i, s, l) => { action (i, s); return null; }, null);
102                 }
103                 
104                 public static ParallelLoopResult For<TLocal> (int from, int to, Func<TLocal> init,
105                                                               Func<int, ParallelLoopState, TLocal, TLocal> action, Action<TLocal> destruct)
106                 {
107                         return For<TLocal> (from, to, null, init, action, destruct);
108                 }
109                 
110                 [MonoTODO]
111                 public static ParallelLoopResult For<TLocal> (int from, int to, ParallelOptions options, 
112                                                               Func<TLocal> init, 
113                                                               Func<int, ParallelLoopState, TLocal, TLocal> action,
114                                                               Action<TLocal> destruct)
115                 {                       
116                         if (action == null)
117                                 throw new ArgumentNullException ("action");
118                         
119                         // Number of task to be launched (normally == Env.ProcessorCount)
120                         int num = Math.Min (GetBestWorkerNumber (), 
121                                             options != null && options.MaxDegreeOfParallelism != -1 ? options.MaxDegreeOfParallelism : int.MaxValue);
122                         // Integer range that each task process
123                         int step = Math.Min (5, (to - from) / num);
124                         if (step <= 0)
125                                 step = 1;
126                         
127                         throw new NotImplementedException ();
128 /*
129                         // Each worker put the indexes it's responsible for here
130                         // so that other worker may steal if they starve.
131                         ConcurrentBag<int> bag = new ConcurrentBag<int> ();
132                         Task[] tasks = new Task [num];
133                         ParallelLoopState.ExternalInfos infos = new ParallelLoopState.ExternalInfos ();
134                         
135                         int currentIndex = from;
136                 
137                         Action workerMethod = delegate {
138                                 int index, actual;
139                                 TLocal local = (init == null) ? default (TLocal) : init ();
140                                 
141                                 ParallelLoopState state = new ParallelLoopState (tasks, infos);
142                                 
143                                 try {
144                                         while ((index = Interlocked.Add (ref currentIndex, step) - step) < to) {
145                                                 if (infos.IsStopped.Value)
146                                                         return;
147                                                 
148                                                 if (options != null && options.CancellationToken.IsCancellationRequested) {
149                                                         state.Stop ();
150                                                         return;
151                                                 }
152                                                 
153                                                 for (int i = index; i < to && i < index + step; i++)
154                                                         bag.Add (i);
155                                                 
156                                                 for (int i = index; i < to && i < index + step && bag.TryTake (out actual); i++) {
157                                                         if (infos.IsStopped.Value)
158                                                                 return;
159                                                         
160                                                         if (options != null && options.CancellationToken.IsCancellationRequested) {
161                                                                 state.Stop ();
162                                                                 return;
163                                                         }
164                                                         
165                                                         if (infos.LowestBreakIteration != null && infos.LowestBreakIteration > actual)
166                                                                 return;
167                                                         
168                                                         state.CurrentIteration = actual;
169                                                         local = action (actual, state, local);
170                                                 }
171                                         }
172                                         
173                                         while (bag.TryTake (out actual)) {
174                                                 if (infos.IsStopped.Value)
175                                                         return;
176                                                 
177                                                 if (options != null && options.CancellationToken.IsCancellationRequested) {
178                                                         state.Stop ();
179                                                         return;
180                                                 }
181                                                 
182                                                 if (infos.LowestBreakIteration != null && infos.LowestBreakIteration > actual)
183                                                         continue;
184                                                 
185                                                 state.CurrentIteration = actual;
186                                                 local = action (actual, state, local);
187                                         }
188                                 } finally {
189                                         if (destruct != null)
190                                                 destruct (local);
191                                 }
192                         };
193                 
194                         if (options != null && options.TaskScheduler != null)
195                                 InitTasks (tasks, workerMethod, num, options.TaskScheduler);
196                         else
197                                 InitTasks (tasks, workerMethod, num);
198                         
199                         try {
200                                 Task.WaitAll (tasks);
201                         } catch {
202                                 HandleExceptions (tasks, infos);
203                         }
204                         
205                         return new ParallelLoopResult (infos.LowestBreakIteration, !(infos.IsStopped.Value || infos.IsExceptional));
206 */                      
207                 }
208
209                 #endregion
210                 
211                 #region Foreach
212                 [MonoTODO]
213                 static ParallelLoopResult ForEach<TSource, TLocal> (Func<int, IList<IEnumerator<TSource>>> enumerable, ParallelOptions options,
214                                                                     Func<TLocal> init, Func<TSource, ParallelLoopState, TLocal, TLocal> action,
215                                                                     Action<TLocal> destruct)
216                 {               
217                         int num = Math.Min (GetBestWorkerNumber (),
218                                             options != null && options.MaxDegreeOfParallelism != -1 ? options.MaxDegreeOfParallelism : int.MaxValue);
219                         
220                         Task[] tasks = new Task[num];
221                         ParallelLoopState.ExternalInfos infos = new ParallelLoopState.ExternalInfos ();
222                         
223                         throw new NotImplementedException ();
224 /*                      
225                         ConcurrentBag<TSource> bag = new ConcurrentBag<TSource> ();
226                         const int bagCount = 5;
227                         
228                         IList<IEnumerator<TSource>> slices = enumerable (num);
229                         int sliceIndex = 0;
230
231                         Action workerMethod = delegate {
232                                 IEnumerator<TSource> slice = slices[Interlocked.Increment (ref sliceIndex) - 1];
233                                 
234                                 TLocal local = (init != null) ? init () : default (TLocal);
235                                 ParallelLoopState state = new ParallelLoopState (tasks, infos);
236                                 
237                                 try {
238                                         bool cont = true;
239                                         TSource element;
240                                         
241                                         while (cont) {
242                                                 if (infos.IsStopped.Value)
243                                                         return;
244                                                 
245                                                 if (options != null && options.CancellationToken.IsCancellationRequested) {
246                                                         state.Stop ();
247                                                         return;
248                                                 }
249                                                 
250                                                 for (int i = 0; i < bagCount && (cont = slice.MoveNext ()); i++) {
251                                                         bag.Add (slice.Current);
252                                                 }
253                                                 
254                                                 for (int i = 0; i < bagCount && bag.TryTake (out element); i++) {
255                                                         if (infos.IsStopped.Value)
256                                                                 return;
257                                                         
258                                                         if (options != null && options.CancellationToken.IsCancellationRequested) {
259                                                                 state.Stop ();
260                                                                 return;
261                                                         }
262                                                         
263                                                         local = action (element, state, local);
264                                                 }
265                                         }
266                                         
267                                         while (bag.TryTake (out element)) {
268                                                 if (infos.IsStopped.Value)
269                                                         return;
270                                                 
271                                                 if (options != null && options.CancellationToken.IsCancellationRequested) {
272                                                         state.Stop ();
273                                                         return;
274                                                 }
275                                                 
276                                                 local = action (element, state, local);
277                                         }
278                                 } finally {
279                                         if (destruct != null)
280                                                 destruct (local);
281                                 }
282                         };
283                         
284                         if (options != null && options.TaskScheduler != null)
285                                 InitTasks (tasks, workerMethod, num, options.TaskScheduler);
286                         else
287                                 InitTasks (tasks, workerMethod, num);
288                         
289                         try {
290                                 Task.WaitAll (tasks);
291                         } catch {
292                                 HandleExceptions (tasks, infos);
293                         }
294                         
295                         return new ParallelLoopResult (infos.LowestBreakIteration, !(infos.IsStopped.Value || infos.IsExceptional));
296                         */
297                 }
298                 
299                 public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> enumerable, Action<TSource> action)
300                 {
301                         return ForEach<TSource, object> (Partitioner.Create (enumerable), ParallelOptions.Default, null, 
302                                                          (e, s, l) => { action (e); return null; }, null);
303                 }
304                 
305                 public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> enumerable, Action<TSource, ParallelLoopState> action)
306                 {
307                         return ForEach<TSource, object> (Partitioner.Create (enumerable), ParallelOptions.Default, null,
308                                                          (e, s, l) => { action (e, s); return null; }, null);
309                 }
310                 
311                 public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> enumerable,
312                                                                    Action<TSource, ParallelLoopState, long> action)
313                 {
314                         return ForEach<TSource, object> (Partitioner.Create (enumerable), ParallelOptions.Default, null,
315                                                          (e, s, l) => { action (e, s, -1); return null; }, null);
316                 }
317                 
318                 public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source,
319                                                                    Action<TSource, ParallelLoopState> body)
320                 {
321                         return ForEach<TSource, object> (source, ParallelOptions.Default, null, (e, s, l) => { body (e, s); return null; }, null);
322                 }
323                 
324                 public static ParallelLoopResult ForEach<TSource> (OrderablePartitioner<TSource> source, 
325                                                                    Action<TSource, ParallelLoopState, long> body)
326
327                 {
328                         return ForEach<TSource, object> (source, ParallelOptions.Default, null, (e, s, i, l) => { body (e, s, i); return null; }, null);
329                 }
330                 
331                 public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source,
332                                                                    Action<TSource> body)
333
334                 {
335                         return ForEach<TSource, object> (source, ParallelOptions.Default, null, (e, s, l) => { body (e); return null; }, null);
336                 }
337                 
338                 public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
339                                                                    Action<TSource> body)
340                 {
341                         return ForEach<TSource, object> (Partitioner.Create (source), parallelOptions, null,
342                                                          (e, s, l) => { body (e); return null; }, null);
343                 }
344                 
345                 public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
346                                                                    Action<TSource, ParallelLoopState> body)
347                 {
348                         return ForEach<TSource, object> (Partitioner.Create (source), parallelOptions, null, 
349                                                          (e, s, l) => { body (e, s); return null; }, null);
350                 }
351                 
352                 public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
353                                                                    Action<TSource, ParallelLoopState, long> body)
354                 {
355                         return ForEach<TSource, object> (Partitioner.Create (source), parallelOptions,
356                                                          null, (e, s, i, l) => { body (e, s, i); return null; }, null);
357                 }
358                 
359                 public static ParallelLoopResult ForEach<TSource> (OrderablePartitioner<TSource> source, ParallelOptions parallelOptions,
360                                                                    Action<TSource, ParallelLoopState, long> body)
361
362                 {
363                         return ForEach<TSource, object> (source, parallelOptions, null, (e, s, i, l) => { body (e, s, i); return null; }, null);
364                 }
365                 
366                 public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source, ParallelOptions parallelOptions,
367                                                                    Action<TSource> body)
368                 {
369                         return ForEach<TSource, object> (source, parallelOptions, null, (e, s, l) => {body (e); return null; }, null);
370                 }
371                 
372                 public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source, ParallelOptions parallelOptions, 
373                                                                    Action<TSource, ParallelLoopState> body)
374                 {
375                         return ForEach<TSource, object> (source, parallelOptions, null, (e, s, l) => { body (e, s); return null; }, null);
376                 }
377                 
378                 public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, Func<TLocal> localInit,
379                                                                            Func<TSource, ParallelLoopState, TLocal, TLocal> body,
380                                                                            Action<TLocal> localFinally)
381                 {
382                         return ForEach<TSource, TLocal> ((Partitioner<TSource>)Partitioner.Create (source), null, localInit, body, localFinally);
383                 }
384                 
385                 public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, Func<TLocal> localInit,
386                                                                            Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
387                                                                            Action<TLocal> localFinally)
388                 {
389                         return ForEach<TSource, TLocal> (Partitioner.Create (source), null, localInit, body, localFinally);
390                 }
391                 
392                 public static ParallelLoopResult ForEach<TSource, TLocal> (OrderablePartitioner<TSource> source, Func<TLocal> localInit,
393                                                                            Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
394                                                                            Action<TLocal> localFinally)
395                 {
396                         return ForEach<TSource, TLocal> (source, ParallelOptions.Default, localInit, body, localFinally);
397                 }
398                 
399                 public static ParallelLoopResult ForEach<TSource, TLocal> (Partitioner<TSource> source, Func<TLocal> localInit,
400                                                                            Func<TSource, ParallelLoopState, TLocal, TLocal> body,
401                                                                            Action<TLocal> localFinally)
402                 {
403                         return ForEach<TSource, TLocal> (source, ParallelOptions.Default, localInit, body, localFinally);
404                 }
405                 
406                 public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
407                                                                            Func<TLocal> localInit,
408                                                                            Func<TSource, ParallelLoopState, TLocal, TLocal> body,
409                                                                            Action<TLocal> localFinally)
410                 {
411                         return ForEach<TSource, TLocal> (Partitioner.Create (source), parallelOptions, localInit, body, localFinally);
412                 }
413                 
414                 public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
415                                                                            Func<TLocal> localInit, 
416                                                                            Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
417                                                                            Action<TLocal> localFinally)
418                 {
419                         return ForEach<TSource, TLocal> (Partitioner.Create (source), parallelOptions, localInit, body, localFinally);
420                 }
421                 
422                 public static ParallelLoopResult ForEach<TSource, TLocal> (Partitioner<TSource> enumerable, ParallelOptions options,
423                                                                            Func<TLocal> init,
424                                                                            Func<TSource, ParallelLoopState, TLocal, TLocal> action,
425                                                                            Action<TLocal> destruct)
426                 {
427                         return ForEach<TSource, TLocal> (enumerable.GetPartitions, options, init, action, destruct);
428                 }
429                         
430                 public static ParallelLoopResult ForEach<TSource, TLocal> (OrderablePartitioner<TSource> enumerable, ParallelOptions options,
431                                                                            Func<TLocal> init,
432                                                                            Func<TSource, ParallelLoopState, long, TLocal, TLocal> action,
433                                                                            Action<TLocal> destruct)
434                 {
435                         return ForEach<KeyValuePair<long, TSource>, TLocal> (enumerable.GetOrderablePartitions, options,
436                                                                             init, (e, s, l) => action (e.Value, s, e.Key, l), destruct);
437                 }
438                 #endregion
439                 
440                 /* Disabled as this is an API addition
441                 #region While           
442                 public static void While (Func<bool> predicate, Action body)
443                 {
444                         if (body == null)
445                                 throw new ArgumentNullException ("body");
446                         if (predicate == null)
447                                 throw new ArgumentNullException ("predicate");
448                         
449                         int num = GetBestWorkerNumber ();
450                         
451                         Task[] tasks = new Task [num];
452                         
453                         Action action = delegate {
454                                 while (predicate ())
455                                 body ();
456                         };
457                         
458                         InitTasks (tasks, action, num);
459                         Task.WaitAll (tasks);
460                         HandleExceptions (tasks);
461                 }
462                 
463                 #endregion
464                 */
465
466                 #region Invoke
467                 public static void Invoke (params Action[] actions)
468                 {
469                         if (actions == null)
470                                 throw new ArgumentNullException ("actions");
471                         
472                         Invoke (actions, (Action a) => Task.Factory.StartNew (a));
473                 }
474                 
475                 public static void Invoke (ParallelOptions parallelOptions, params Action[] actions)
476                 {
477                         if (parallelOptions == null)
478                                 throw new ArgumentNullException ("parallelOptions");
479                         if (actions == null)
480                                 throw new ArgumentNullException ("actions");
481                         
482                         Invoke (actions, (Action a) => Task.Factory.StartNew (a, TaskCreationOptions.None, parallelOptions.TaskScheduler));
483                 }
484                 
485                 static void Invoke (Action[] actions, Func<Action, Task> taskCreator)
486                 {
487                         if (actions.Length == 0)
488                                 throw new ArgumentException ("actions is empty");
489                         
490                         // Execute it directly
491                         if (actions.Length == 1 && actions[0] != null)
492                                 actions[0] ();
493                         
494                         bool shouldThrow = false;
495                         Task[] ts = Array.ConvertAll (actions, delegate (Action a) {
496                                 if (a == null) {
497                                         shouldThrow = true;
498                                         return null;
499                                 }
500                                 
501                                 return taskCreator (a);
502                         });
503                         
504                         if (shouldThrow)
505                                 throw new ArgumentException ("One action in actions is null", "actions");
506                         
507                         try {
508                                 Task.WaitAll (ts);
509                         } catch {
510                                 HandleExceptions (ts);
511                         }
512                 }
513                 #endregion
514
515                 #region SpawnBestNumber, used by PLinq
516                 internal static Task[] SpawnBestNumber (Action action, Action callback)
517                 {
518                         return SpawnBestNumber (action, -1, callback);
519                 }
520                 
521                 internal static Task[] SpawnBestNumber (Action action, int dop, Action callback)
522                 {
523                         return SpawnBestNumber (action, dop, false, callback);
524                 }
525                 
526                 internal static Task[] SpawnBestNumber (Action action, int dop, bool wait, Action callback)
527                 {
528                         // Get the optimum amount of worker to create
529                         int num = dop == -1 ? (wait ? GetBestWorkerNumber () + 1 : GetBestWorkerNumber ()) : dop;
530                         
531                         // Initialize worker
532                         CountdownEvent evt = new CountdownEvent (num);
533                         Task[] tasks = new Task [num];
534                         for (int i = 0; i < num; i++) {
535                                 tasks [i] = Task.Factory.StartNew (() => { 
536                                         action ();
537                                         evt.Signal ();
538                                         if (callback != null && evt.IsSet)
539                                                 callback ();
540                                 });
541                         }
542
543                         // If explicitely told, wait for all workers to complete 
544                         // and thus let main thread participate in the processing
545                         if (wait)
546                                 Task.WaitAll (tasks);
547                         
548                         return tasks;
549                 }
550                 #endregion
551         }
552 }
553 #endif