[PLinq] Move NET_4_0 define check under licence text
[mono.git] / mcs / class / System.Core / System.Linq.Parallel / ParallelExecuter.cs
1 //
2 // ParallelExecuter.cs
3 //
4 // Author:
5 //       Jérémie "Garuma" Laval <jeremie.laval@gmail.com>
6 //
7 // Copyright (c) 2010 Jérémie "Garuma" Laval
8 //
9 // Permission is hereby granted, free of charge, to any person obtaining a copy
10 // of this software and associated documentation files (the "Software"), to deal
11 // in the Software without restriction, including without limitation the rights
12 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13 // copies of the Software, and to permit persons to whom the Software is
14 // furnished to do so, subject to the following conditions:
15 //
16 // The above copyright notice and this permission notice shall be included in
17 // all copies or substantial portions of the Software.
18 //
19 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
25 // THE SOFTWARE.
26
27 #if NET_4_0
28 using System;
29 using System.Threading;
30 using System.Threading.Tasks;
31 using System.Collections;
32 using System.Collections.Generic;
33 using System.Collections.Concurrent;
34
namespace System.Linq
{
        // Internal helpers used to check a parallel query tree and to run the
        // enumerables it produces on tasks, one task per enumerable.
        internal static class ParallelExecuter
        {
                // Checks the query rooted at startingNode with the non-blocking worker count.
                internal static QueryOptions CheckQuery<T> (QueryBaseNode<T> startingNode)
                {
                        return CheckQuery<T> (startingNode, false);
                }

                // Checks the query rooted at startingNode, deriving the partition count
                // from GetBestWorkerNumber (blocking).
                internal static QueryOptions CheckQuery<T> (QueryBaseNode<T> startingNode, bool blocking)
                {
                        return CheckQuery (startingNode, GetBestWorkerNumber (blocking));
                }

                // Runs a QueryCheckerVisitor built with partitionCount over startingNode
                // and returns the options the visitor gathered.
                internal static QueryOptions CheckQuery<T> (QueryBaseNode<T> startingNode, int partitionCount)
                {
                        QueryCheckerVisitor checker = new QueryCheckerVisitor (partitionCount);
                        startingNode.Visit (checker);

                        return checker.Options;
                }

                // Returns a token that is linked to both self and the token of other.
                // NOTE(review): the linked CancellationTokenSource created here is never
                // disposed by this method; it cannot be disposed before returning because
                // the caller still needs its token.
                internal static CancellationToken Chain (this CancellationToken self, CancellationTokenSource other)
                {
                        return CancellationTokenSource.CreateLinkedTokenSource (self, other.Token).Token;
                }

                // TODO: replace the CheckQuery call with a custom visitor that stops after the
                // first encountered order guard
                //
                // Returns the BehindOrderGuard value computed by CheckQuery for source.
                // NOTE(review): presumably BehindOrderGuard is a nullable bool, in which case
                // .Value throws when it was never set - confirm against QueryOptions.
                internal static bool IsOrdered<TSource> (this QueryBaseNode<TSource> source)
                {
                        return CheckQuery (source).BehindOrderGuard.Value;
                }

                // Worker count for the non-blocking case.
                internal static int GetBestWorkerNumber ()
                {
                        return GetBestWorkerNumber (false);
                }

                // Returns Environment.ProcessorCount, plus one extra worker when blocking is true.
                internal static int GetBestWorkerNumber (bool blocking)
                {
                        int processors = Environment.ProcessorCount;

                        return blocking ? processors + 1 : processors;
                }

                // Decides whether a worker loop should process the current item.
                // Returns false when the implementer token (set by specific operators) asks
                // the loop to stop quietly; throws OperationCanceledException bound to
                // options.Token when that token has been cancelled; returns true otherwise.
                // The implementer token is deliberately tested first, as the loops always did.
                private static bool ShouldContinue (QueryOptions options)
                {
                        if (options.ImplementerToken.IsCancellationRequested)
                                return false;
                        if (options.Token.IsCancellationRequested)
                                throw new OperationCanceledException (options.Token);

                        return true;
                }

                // Shared tail of the ProcessAndCallback overloads: arranges for callback
                // (when not null) to run once every task has finished, and returns an
                // action that blocks until all tasks are done (observing options.Token).
                private static Action AttachCallback (Task[] tasks, Action callback, QueryOptions options)
                {
                        if (callback != null) {
                                // ContinueWhenAll throws ArgumentException for an empty array, so
                                // when there is nothing to wait for the callback is scheduled directly.
                                if (tasks.Length == 0)
                                        Task.Factory.StartNew (callback);
                                else
                                        Task.Factory.ContinueWhenAll (tasks, (_) => callback ());
                        }

                        return () => Task.WaitAll (tasks, options.Token);
                }

                // Overload without an end action; forwards to the overload below with a null endAction.
                internal static Task[] Process<TSource, TElement> (QueryBaseNode<TSource> node, Action<TElement> call,
                                                                   Func<QueryBaseNode<TSource>, QueryOptions, IList<IEnumerable<TElement>>> acquisitionFunc,
                                                                   QueryOptions options)
                {
                        return Process<TSource, TElement> (node, call, acquisitionFunc, null, options);
                }

                // Overload whose delegates do not care about the partition index; adapts them
                // to the index-aware overload. A null endAction stays null.
                internal static Task[] Process<TSource, TElement> (QueryBaseNode<TSource> node, Action<TElement> call,
                                                                   Func<QueryBaseNode<TSource>, QueryOptions, IList<IEnumerable<TElement>>> acquisitionFunc,
                                                                   Action endAction,
                                                                   QueryOptions options)
                {
                        Action<int> indexedEndAction = null;
                        if (endAction != null)
                                indexedEndAction = (i) => endAction ();

                        return Process<TSource, TElement> (node,
                                                           (e, i) => call (e),
                                                           acquisitionFunc,
                                                           indexedEndAction,
                                                           options);
                }

                // Obtains the enumerables through acquisitionFunc and starts one task per
                // enumerable. Each task feeds its items, together with the enumerable's index,
                // to call, then invokes endAction (if any) with that index. The started tasks
                // are returned without being waited on.
                internal static Task[] Process<TSource, TElement> (QueryBaseNode<TSource> node, Action<TElement, int> call,
                                                                   Func<QueryBaseNode<TSource>, QueryOptions, IList<IEnumerable<TElement>>> acquisitionFunc,
                                                                   Action<int> endAction,
                                                                   QueryOptions options)
                {
                        IList<IEnumerable<TElement>> enumerables = acquisitionFunc (node, options);
                        Task[] tasks = new Task[enumerables.Count];

                        for (int i = 0; i < tasks.Length; i++) {
                                // Copy the loop variable so every closure sees its own index.
                                int index = i;

                                tasks[i] = Task.Factory.StartNew (() => {
                                        foreach (TElement item in enumerables[index]) {
                                                if (!ShouldContinue (options))
                                                        break;

                                                call (item, index);
                                        }

                                        // Reached after normal exhaustion and after an implementer-token
                                        // break, but not when the loop threw.
                                        if (endAction != null)
                                                endAction (index);
                                }, options.Token);
                        }

                        return tasks;
                }

                // Checks the query in blocking mode, runs call over every element and waits
                // for all tasks to finish.
                internal static void ProcessAndBlock<T> (QueryBaseNode<T> node, Action<T> call)
                {
                        QueryOptions options = CheckQuery (node, true);
                        Task[] tasks = Process (node, call, (n, o) => n.GetEnumerables (o), options);

                        Task.WaitAll (tasks, options.Token);
                }

                // Starts processing node's enumerables with call, schedules callback (if any)
                // for when all tasks are finished and returns an action that waits for them.
                internal static Action ProcessAndCallback<T> (QueryBaseNode<T> node, Action<T> call,
                                                              Action callback, QueryOptions options)
                {
                        Task[] tasks = Process (node, call, (n, o) => n.GetEnumerables (o), options);

                        return AttachCallback (tasks, callback, options);
                }

                // Same as above for the ordered enumerables of node: call receives each
                // key/value pair with the enumerable index, endAction runs per task.
                internal static Action ProcessAndCallback<T> (QueryBaseNode<T> node, Action<KeyValuePair<long, T>, int> call,
                                                              Action<int> endAction,
                                                              Action callback, QueryOptions options)
                {
                        Task[] tasks = Process (node, call, (n, o) => n.GetOrderedEnumerables (o), endAction, options);

                        return AttachCallback (tasks, callback, options);
                }

                // Folds every enumerable of node into its own accumulator with localCall,
                // waits for all tasks and then hands the accumulators to call (if not null).
                // With a seedFunc each accumulator starts from seedFunc (); without one the
                // first item of each enumerable becomes the initial accumulator, and an
                // enumerable that yields nothing leaves its slot at default (U).
                internal static void ProcessAndAggregate<T, U> (QueryBaseNode<T> node,
                                                                Func<U> seedFunc,
                                                                Func<U, T, U> localCall,
                                                                Action<IList<U>> call)
                {
                        QueryOptions options = CheckQuery (node, true);

                        IList<IEnumerable<T>> enumerables = node.GetEnumerables (options);
                        U[] locals = new U[enumerables.Count];
                        Task[] tasks = new Task[enumerables.Count];

                        // Seeds are produced on the calling thread, before any task starts.
                        if (seedFunc != null) {
                                for (int i = 0; i < locals.Length; i++)
                                        locals[i] = seedFunc ();
                        }

                        for (int i = 0; i < tasks.Length; i++) {
                                int index = i;

                                tasks[i] = Task.Factory.StartNew (() => {
                                        // Accumulate in a task-local variable and publish once at the end
                                        // instead of touching the shared array for every item. The array
                                        // is only read after WaitAll succeeded, so a task that throws never
                                        // has its slot observed.
                                        U acc = locals[index];
                                        bool takeFirstAsSeed = seedFunc == null;

                                        foreach (T item in enumerables[index]) {
                                                if (!ShouldContinue (options))
                                                        break;

                                                if (takeFirstAsSeed) {
                                                        takeFirstAsSeed = false;
                                                        // HACK: without a seed the first element is used as the
                                                        // accumulator, which only works when T converts to U
                                                        // through this boxing cast.
                                                        acc = (U)(object)item;
                                                        continue;
                                                }

                                                acc = localCall (acc, item);
                                        }

                                        locals[index] = acc;
                                }, options.Token);
                        }

                        Task.WaitAll (tasks, options.Token);

                        if (call != null)
                                call (locals);
                }
        }
}
205 #endif