copied mono-api-diff.cs from mono-2-2 branch so new patch can be applied and history...
[mono.git] / mcs / class / corlib / System.Threading.Tasks / Parallel.cs
1 #if NET_4_0
2 // Parallel.cs
3 //
4 // Copyright (c) 2008 Jérémie "Garuma" Laval
5 //
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
7 // of this software and associated documentation files (the "Software"), to deal
8 // in the Software without restriction, including without limitation the rights
9 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 // copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
12 //
13 // The above copyright notice and this permission notice shall be included in
14 // all copies or substantial portions of the Software.
15 //
16 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22 // THE SOFTWARE.
23 //
24 //
25
26 using System;
27 using System.Collections.Generic;
28 using System.Collections.Concurrent;
29 using System.Threading;
30
31 namespace System.Threading.Tasks
32 {
33         public static class Parallel
34         {
35                 internal static int GetBestWorkerNumber ()
36                 {
37                         return GetBestWorkerNumber (TaskScheduler.Current);
38                 }
39                 
40                 internal static int GetBestWorkerNumber (TaskScheduler scheduler)
41                 {       
42                         return scheduler.MaximumConcurrencyLevel;
43                 }
44                 
45                 static int GetBestWorkerNumber (int from, int to, ParallelOptions options, out int step)
46                 {
47                         int num = Math.Min (GetBestWorkerNumber (),
48                                             options != null && options.MaxDegreeOfParallelism != -1 ? options.MaxDegreeOfParallelism : int.MaxValue);
49                         // Integer range that each task process
50                         step = Math.Min (5, (to - from) / num);
51                         if (step <= 0)
52                                 step = 1;
53                         
54                         return num;
55                 }
56                 
57                 static void HandleExceptions (IEnumerable<Task> tasks)
58                 {
59                         HandleExceptions (tasks, null);
60                 }
61                 
62                 static void HandleExceptions (IEnumerable<Task> tasks, ParallelLoopState.ExternalInfos infos)
63                 {
64                         List<Exception> exs = new List<Exception> ();
65                         foreach (Task t in tasks) {
66                                 if (t.Exception != null)
67                                         exs.Add (t.Exception);
68                         }
69                         
70                         if (exs.Count > 0) {
71                                 if (infos != null)
72                                         infos.IsExceptional = true;
73                                 
74                                 throw new AggregateException (exs);
75                         }
76                 }
77                 
78                 static void InitTasks (Task[] tasks, int count, Action action, ParallelOptions options)
79                 {
80                         TaskCreationOptions creation = TaskCreationOptions.LongRunning;
81                         
82                         for (int i = 0; i < count; i++) {
83                                 if (options == null)
84                                         tasks [i] = Task.Factory.StartNew (action, creation);
85                                 else
86                                         tasks [i] = Task.Factory.StartNew (action, options.CancellationToken, creation, options.TaskScheduler);
87                         }
88                 }
89                 #region For
90                 
91                 public static ParallelLoopResult For (int from, int to, Action<int> action)
92                 {
93                         return For (from, to, null, action);
94                 }
95                 
96                 public static ParallelLoopResult For (int from, int to, Action<int, ParallelLoopState> action)
97                 {
98                         return For (from, to, null, action);
99                 }
100                 
101                 public static ParallelLoopResult For (int from, int to, ParallelOptions options, Action<int> action)
102                 {
103                         return For (from, to, options, (index, state) => action (index));
104                 }
105                 
106                 public static ParallelLoopResult For (int from, int to, ParallelOptions options, Action<int, ParallelLoopState> action)
107                 {
108                         return For<object> (from, to, options, null, (i, s, l) => { action (i, s); return null; }, null);
109                 }
110                 
111                 public static ParallelLoopResult For<TLocal> (int from, int to, Func<TLocal> init,
112                                                               Func<int, ParallelLoopState, TLocal, TLocal> action, Action<TLocal> destruct)
113                 {
114                         return For<TLocal> (from, to, null, init, action, destruct);
115                 }
116         
117                 
118                 public static ParallelLoopResult For<TLocal> (int from, int to, ParallelOptions options, 
119                                                               Func<TLocal> init, 
120                                                               Func<int, ParallelLoopState, TLocal, TLocal> action,
121                                                               Action<TLocal> destruct)
122                 {                       
123                         if (action == null)
124                                 throw new ArgumentNullException ("action");
125                         
126                         // Number of task to be launched (normally == Env.ProcessorCount)
127                         int step;
128                         int num = GetBestWorkerNumber (from, to, options, out step);
129
130                         // Each worker put the indexes it's responsible for here
131                         // so that other worker may steal if they starve.
132                         SimpleConcurrentBag<int> bag = new SimpleConcurrentBag<int> (num);
133                         Task[] tasks = new Task [num];
134                         ParallelLoopState.ExternalInfos infos = new ParallelLoopState.ExternalInfos ();
135                         
136                         Func<ParallelLoopState, bool> cancellationTokenTest = (s) => {
137                                 if (options != null && options.CancellationToken.IsCancellationRequested) {
138                                         s.Stop ();
139                                         return true;
140                                 }
141                                 return false;
142                         };
143                         
144                         Func<int, bool> breakTest = (i) => infos.LowestBreakIteration != null && infos.LowestBreakIteration > i;
145                         
146                         int currentIndex = from;
147                         
148                         Action workerMethod = delegate {
149                                 int index, actual;
150                                 TLocal local = (init == null) ? default (TLocal) : init ();
151                                 
152                                 ParallelLoopState state = new ParallelLoopState (infos);
153                                 int workIndex = bag.GetNextIndex ();
154                                 
155                                 try {
156                                         while (currentIndex < to && (index = Interlocked.Add (ref currentIndex, step) - step) < to) {
157                                                 if (infos.IsStopped.Value)
158                                                         return;
159                                                 
160                                                 if (cancellationTokenTest (state))
161                                                         return;
162                                                 
163                                                 for (int i = index; i < to && i < index + step; i++)
164                                                         bag.Add (workIndex, i);
165                                                 
166                                                 for (int i = index; i < to && i < index + step && bag.TryTake (workIndex, out actual); i++) {
167                                                         if (infos.IsStopped.Value)
168                                                                 return;
169                                                         
170                                                         if (cancellationTokenTest (state))
171                                                                 return;
172                                                         
173                                                         if (breakTest (actual))
174                                                                 return;
175                                                         
176                                                         state.CurrentIteration = actual;
177                                                         local = action (actual, state, local);
178                                                 }
179                                         }
180                                         
181                                         while (bag.TrySteal (workIndex, out actual)) {
182                                                 if (infos.IsStopped.Value)
183                                                         return;
184                                                 
185                                                 if (cancellationTokenTest (state))
186                                                         return;
187                                                 
188                                                 if (breakTest (actual))
189                                                         continue;
190                                                 
191                                                 state.CurrentIteration = actual;
192                                                 local = action (actual, state, local);
193                                         }
194                                 } finally {
195                                         if (destruct != null)
196                                                 destruct (local);
197                                 }
198                         };
199
200                         InitTasks (tasks, num, workerMethod, options);
201                         
202                         try {
203                                 Task.WaitAll (tasks);
204                         } catch {
205                                 HandleExceptions (tasks, infos);
206                         }
207                         
208                         return new ParallelLoopResult (infos.LowestBreakIteration, !(infos.IsStopped.Value || infos.IsExceptional));    
209                 }
210
211                 #endregion
212                 
213                 #region Foreach
214                 static ParallelLoopResult ForEach<TSource, TLocal> (Func<int, IList<IEnumerator<TSource>>> enumerable, ParallelOptions options,
215                                                                     Func<TLocal> init, Func<TSource, ParallelLoopState, TLocal, TLocal> action,
216                                                                     Action<TLocal> destruct)
217                 {               
218                         int num = Math.Min (GetBestWorkerNumber (),
219                                             options != null && options.MaxDegreeOfParallelism != -1 ? options.MaxDegreeOfParallelism : int.MaxValue);
220                         
221                         Task[] tasks = new Task[num];
222                         ParallelLoopState.ExternalInfos infos = new ParallelLoopState.ExternalInfos ();
223                         
224                         SimpleConcurrentBag<TSource> bag = new SimpleConcurrentBag<TSource> (num);
225                         const int bagCount = 5;
226                         
227                         IList<IEnumerator<TSource>> slices = enumerable (num);
228                         
229                         int sliceIndex = 0;
230                         
231                         Func<ParallelLoopState, bool> cancellationTokenTest = (s) => {
232                                 if (options != null && options.CancellationToken.IsCancellationRequested) {
233                                         s.Stop ();
234                                         return true;
235                                 }
236                                 return false;
237                         };
238
239                         Action workerMethod = delegate {
240                                 IEnumerator<TSource> slice = slices[Interlocked.Increment (ref sliceIndex) - 1];
241                                 
242                                 TLocal local = (init != null) ? init () : default (TLocal);
243                                 ParallelLoopState state = new ParallelLoopState (infos);
244                                 int workIndex = bag.GetNextIndex ();
245                                 
246                                 try {
247                                         bool cont = true;
248                                         TSource element;
249                                         
250                                         while (cont) {
251                                                 if (infos.IsStopped.Value)
252                                                         return;
253                                                 
254                                                 if (cancellationTokenTest (state))
255                                                         return;
256                                                 
257                                                 for (int i = 0; i < bagCount && (cont = slice.MoveNext ()); i++) {
258                                                         bag.Add (workIndex, slice.Current);
259                                                 }
260                                                 
261                                                 for (int i = 0; i < bagCount && bag.TryTake (workIndex, out element); i++) {
262                                                         if (infos.IsStopped.Value)
263                                                                 return;
264                                                         
265                                                         if (cancellationTokenTest (state))
266                                                                 return;
267                                                         
268                                                         local = action (element, state, local);
269                                                 }
270                                         }
271                                         
272                                         while (bag.TrySteal (workIndex, out element)) {
273                                                 if (infos.IsStopped.Value)
274                                                         return;
275                                                 
276                                                 if (cancellationTokenTest (state))
277                                                         return;
278                                                 
279                                                 local = action (element, state, local);
280                                         }
281                                 } finally {
282                                         if (destruct != null)
283                                                 destruct (local);
284                                 }
285                         };
286                         
287                         InitTasks (tasks, num, workerMethod, options);
288                         
289                         try {
290                                 Task.WaitAll (tasks);
291                         } catch {
292                                 HandleExceptions (tasks, infos);
293                         }
294                         
295                         return new ParallelLoopResult (infos.LowestBreakIteration, !(infos.IsStopped.Value || infos.IsExceptional));
296                         
297                 }
298                 
299                 public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> enumerable, Action<TSource> action)
300                 {
301                         return ForEach<TSource, object> (Partitioner.Create (enumerable), ParallelOptions.Default, null, 
302                                                          (e, s, l) => { action (e); return null; }, null);
303                 }
304                 
305                 public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> enumerable, Action<TSource, ParallelLoopState> action)
306                 {
307                         return ForEach<TSource, object> (Partitioner.Create (enumerable), ParallelOptions.Default, null,
308                                                          (e, s, l) => { action (e, s); return null; }, null);
309                 }
310                 
311                 public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> enumerable,
312                                                                    Action<TSource, ParallelLoopState, long> action)
313                 {
314                         return ForEach<TSource, object> (Partitioner.Create (enumerable), ParallelOptions.Default, null,
315                                                          (e, s, l) => { action (e, s, -1); return null; }, null);
316                 }
317                 
318                 public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source,
319                                                                    Action<TSource, ParallelLoopState> body)
320                 {
321                         return ForEach<TSource, object> (source, ParallelOptions.Default, null, (e, s, l) => { body (e, s); return null; }, null);
322                 }
323                 
324                 public static ParallelLoopResult ForEach<TSource> (OrderablePartitioner<TSource> source, 
325                                                                    Action<TSource, ParallelLoopState, long> body)
326
327                 {
328                         return ForEach<TSource, object> (source, ParallelOptions.Default, null, (e, s, i, l) => { body (e, s, i); return null; }, null);
329                 }
330                 
331                 public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source,
332                                                                    Action<TSource> body)
333
334                 {
335                         return ForEach<TSource, object> (source, ParallelOptions.Default, null, (e, s, l) => { body (e); return null; }, null);
336                 }
337                 
338                 public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
339                                                                    Action<TSource> body)
340                 {
341                         return ForEach<TSource, object> (Partitioner.Create (source), parallelOptions, null,
342                                                          (e, s, l) => { body (e); return null; }, null);
343                 }
344                 
345                 public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
346                                                                    Action<TSource, ParallelLoopState> body)
347                 {
348                         return ForEach<TSource, object> (Partitioner.Create (source), parallelOptions, null, 
349                                                          (e, s, l) => { body (e, s); return null; }, null);
350                 }
351                 
352                 public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
353                                                                    Action<TSource, ParallelLoopState, long> body)
354                 {
355                         return ForEach<TSource, object> (Partitioner.Create (source), parallelOptions,
356                                                          null, (e, s, i, l) => { body (e, s, i); return null; }, null);
357                 }
358                 
359                 public static ParallelLoopResult ForEach<TSource> (OrderablePartitioner<TSource> source, ParallelOptions parallelOptions,
360                                                                    Action<TSource, ParallelLoopState, long> body)
361
362                 {
363                         return ForEach<TSource, object> (source, parallelOptions, null, (e, s, i, l) => { body (e, s, i); return null; }, null);
364                 }
365                 
366                 public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source, ParallelOptions parallelOptions,
367                                                                    Action<TSource> body)
368                 {
369                         return ForEach<TSource, object> (source, parallelOptions, null, (e, s, l) => {body (e); return null; }, null);
370                 }
371                 
372                 public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source, ParallelOptions parallelOptions, 
373                                                                    Action<TSource, ParallelLoopState> body)
374                 {
375                         return ForEach<TSource, object> (source, parallelOptions, null, (e, s, l) => { body (e, s); return null; }, null);
376                 }
377                 
378                 public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, Func<TLocal> localInit,
379                                                                            Func<TSource, ParallelLoopState, TLocal, TLocal> body,
380                                                                            Action<TLocal> localFinally)
381                 {
382                         return ForEach<TSource, TLocal> ((Partitioner<TSource>)Partitioner.Create (source), null, localInit, body, localFinally);
383                 }
384                 
385                 public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, Func<TLocal> localInit,
386                                                                            Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
387                                                                            Action<TLocal> localFinally)
388                 {
389                         return ForEach<TSource, TLocal> (Partitioner.Create (source), null, localInit, body, localFinally);
390                 }
391                 
392                 public static ParallelLoopResult ForEach<TSource, TLocal> (OrderablePartitioner<TSource> source, Func<TLocal> localInit,
393                                                                            Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
394                                                                            Action<TLocal> localFinally)
395                 {
396                         return ForEach<TSource, TLocal> (source, ParallelOptions.Default, localInit, body, localFinally);
397                 }
398                 
399                 public static ParallelLoopResult ForEach<TSource, TLocal> (Partitioner<TSource> source, Func<TLocal> localInit,
400                                                                            Func<TSource, ParallelLoopState, TLocal, TLocal> body,
401                                                                            Action<TLocal> localFinally)
402                 {
403                         return ForEach<TSource, TLocal> (source, ParallelOptions.Default, localInit, body, localFinally);
404                 }
405                 
406                 public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
407                                                                            Func<TLocal> localInit,
408                                                                            Func<TSource, ParallelLoopState, TLocal, TLocal> body,
409                                                                            Action<TLocal> localFinally)
410                 {
411                         return ForEach<TSource, TLocal> (Partitioner.Create (source), parallelOptions, localInit, body, localFinally);
412                 }
413                 
414                 public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
415                                                                            Func<TLocal> localInit, 
416                                                                            Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
417                                                                            Action<TLocal> localFinally)
418                 {
419                         return ForEach<TSource, TLocal> (Partitioner.Create (source), parallelOptions, localInit, body, localFinally);
420                 }
421                 
422                 public static ParallelLoopResult ForEach<TSource, TLocal> (Partitioner<TSource> enumerable, ParallelOptions options,
423                                                                            Func<TLocal> init,
424                                                                            Func<TSource, ParallelLoopState, TLocal, TLocal> action,
425                                                                            Action<TLocal> destruct)
426                 {
427                         return ForEach<TSource, TLocal> (enumerable.GetPartitions, options, init, action, destruct);
428                 }
429                         
430                 public static ParallelLoopResult ForEach<TSource, TLocal> (OrderablePartitioner<TSource> enumerable, ParallelOptions options,
431                                                                            Func<TLocal> init,
432                                                                            Func<TSource, ParallelLoopState, long, TLocal, TLocal> action,
433                                                                            Action<TLocal> destruct)
434                 {
435                         return ForEach<KeyValuePair<long, TSource>, TLocal> (enumerable.GetOrderablePartitions, options,
436                                                                             init, (e, s, l) => action (e.Value, s, e.Key, l), destruct);
437                 }
438                 #endregion
439
440                 #region Invoke
441                 public static void Invoke (params Action[] actions)
442                 {
443                         if (actions == null)
444                                 throw new ArgumentNullException ("actions");
445                         
446                         Invoke (actions, (Action a) => Task.Factory.StartNew (a));
447                 }
448                 
449                 public static void Invoke (ParallelOptions parallelOptions, params Action[] actions)
450                 {
451                         if (parallelOptions == null)
452                                 throw new ArgumentNullException ("parallelOptions");
453                         if (actions == null)
454                                 throw new ArgumentNullException ("actions");
455                         
456                         Invoke (actions, (Action a) => Task.Factory.StartNew (a, CancellationToken.None, TaskCreationOptions.None, parallelOptions.TaskScheduler));
457                 }
458                 
459                 static void Invoke (Action[] actions, Func<Action, Task> taskCreator)
460                 {
461                         if (actions.Length == 0)
462                                 throw new ArgumentException ("actions is empty");
463                         
464                         // Execute it directly
465                         if (actions.Length == 1 && actions[0] != null)
466                                 actions[0] ();
467                         
468                         bool shouldThrow = false;
469                         Task[] ts = Array.ConvertAll (actions, delegate (Action a) {
470                                 if (a == null) {
471                                         shouldThrow = true;
472                                         return null;
473                                 }
474                                 
475                                 return taskCreator (a);
476                         });
477                         
478                         if (shouldThrow)
479                                 throw new ArgumentException ("One action in actions is null", "actions");
480                         
481                         try {
482                                 Task.WaitAll (ts);
483                         } catch {
484                                 HandleExceptions (ts);
485                         }
486                 }
487                 #endregion
488
489                 #region SpawnBestNumber, used by PLinq
490                 internal static Task[] SpawnBestNumber (Action action, Action callback)
491                 {
492                         return SpawnBestNumber (action, -1, callback);
493                 }
494                 
495                 internal static Task[] SpawnBestNumber (Action action, int dop, Action callback)
496                 {
497                         return SpawnBestNumber (action, dop, false, callback);
498                 }
499                 
500                 internal static Task[] SpawnBestNumber (Action action, int dop, bool wait, Action callback)
501                 {
502                         // Get the optimum amount of worker to create
503                         int num = dop == -1 ? (wait ? GetBestWorkerNumber () + 1 : GetBestWorkerNumber ()) : dop;
504                         
505                         // Initialize worker
506                         CountdownEvent evt = new CountdownEvent (num);
507                         Task[] tasks = new Task [num];
508                         for (int i = 0; i < num; i++) {
509                                 tasks [i] = Task.Factory.StartNew (() => { 
510                                         action ();
511                                         evt.Signal ();
512                                         if (callback != null && evt.IsSet)
513                                                 callback ();
514                                 });
515                         }
516
517                         // If explicitely told, wait for all workers to complete 
518                         // and thus let main thread participate in the processing
519                         if (wait)
520                                 Task.WaitAll (tasks);
521                         
522                         return tasks;
523                 }
524                 #endregion
525         }
526 }
527 #endif