5 // Jérémie "Garuma" Laval <jeremie.laval@gmail.com>
7 // Copyright (c) 2010 Jérémie "Garuma" Laval
9 // Permission is hereby granted, free of charge, to any person obtaining a copy
10 // of this software and associated documentation files (the "Software"), to deal
11 // in the Software without restriction, including without limitation the rights
12 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13 // copies of the Software, and to permit persons to whom the Software is
14 // furnished to do so, subject to the following conditions:
16 // The above copyright notice and this permission notice shall be included in
17 // all copies or substantial portions of the Software.
19 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
29 using System.Threading;
30 using System.Threading.Tasks;
31 using System.Collections;
32 using System.Collections.Generic;
33 using System.Collections.Concurrent;
34 using System.Linq.Parallel.QueryNodes;
36 namespace System.Linq.Parallel
38 internal static class ParallelExecuter
40 internal static QueryOptions CheckQuery<T> (QueryBaseNode<T> startingNode)
42 return CheckQuery<T> (startingNode, false);
45 internal static QueryOptions CheckQuery<T> (QueryBaseNode<T> startingNode, bool blocking)
47 return CheckQuery (startingNode, GetBestWorkerNumber (blocking));
50 internal static QueryOptions CheckQuery<T> (QueryBaseNode<T> startingNode, int partitionCount)
52 QueryCheckerVisitor visitor = new QueryCheckerVisitor (partitionCount);
53 startingNode.Visit (visitor);
55 return visitor.Options;
58 internal static CancellationToken Chain (this CancellationToken self, CancellationTokenSource other)
60 CancellationTokenSource linked = CancellationTokenSource.CreateLinkedTokenSource (self, other.Token);
64 internal static bool IsOrdered<TSource> (this QueryBaseNode<TSource> source)
66 QueryIsOrderedVisitor visitor = new QueryIsOrderedVisitor ();
67 source.Visit (visitor);
69 return visitor.BehindOrderGuard;
72 internal static int GetBestWorkerNumber ()
74 return GetBestWorkerNumber (false);
77 internal static int GetBestWorkerNumber (bool blocking)
79 return blocking && Task.CurrentId == null ? Environment.ProcessorCount + 1 : Environment.ProcessorCount;
82 internal static Task[] Process<TSource, TElement> (QueryBaseNode<TSource> node,
83 Action<TElement, CancellationToken> call,
84 Func<QueryBaseNode<TSource>, QueryOptions, IList<IEnumerable<TElement>>> acquisitionFunc,
87 return Process<TSource, TElement> (node, call, acquisitionFunc, null, options);
90 internal static Task[] Process<TSource, TElement> (QueryBaseNode<TSource> node,
91 Action<TElement, CancellationToken> call,
92 Func<QueryBaseNode<TSource>, QueryOptions, IList<IEnumerable<TElement>>> acquisitionFunc,
96 CancellationTokenSource src
97 = CancellationTokenSource.CreateLinkedTokenSource (options.ImplementerToken, options.Token);
99 IList<IEnumerable<TElement>> enumerables = acquisitionFunc (node, options);
101 Task[] tasks = new Task[enumerables.Count];
103 for (int i = 0; i < tasks.Length; i++) {
105 tasks[i] = Task.Factory.StartNew (() => {
107 foreach (TElement item in enumerables[index]) {
108 if (!CheckTokens (options))
112 call (item, src.Token);
113 } catch (OperationCanceledException canceledException) {
114 if (canceledException.CancellationToken != src.Token)
115 throw canceledException;
118 if (!CheckTokens (options))
122 if (endAction != null)
125 }, options.Token, TaskCreationOptions.AttachedToParent | TaskCreationOptions.LongRunning, TaskScheduler.Default);
131 static bool CheckTokens (QueryOptions options)
133 // This is from specific operators
134 if (options.ImplementerToken.IsCancellationRequested)
136 if (options.Token.IsCancellationRequested)
137 throw new OperationCanceledException (options.Token);
141 internal static void ProcessAndBlock<T> (QueryBaseNode<T> node, Action<T, CancellationToken> call)
143 QueryOptions options = CheckQuery (node, true);
145 Task[] tasks = Process (node, call, new QueryBaseNodeHelper<T> ().GetEnumerables, options);
146 Task.WaitAll (tasks, options.Token);
149 internal static Action ProcessAndCallback<T> (QueryBaseNode<T> node, Action<T, CancellationToken> call,
150 Action callback, QueryOptions options)
152 Task[] tasks = Process (node, call, new QueryBaseNodeHelper<T> ().GetEnumerables, options);
153 if (callback != null)
154 Task.Factory.ContinueWhenAll (tasks, (_) => callback ());
156 return () => Task.WaitAll (tasks, options.Token);
159 internal static Action ProcessAndCallback<T> (QueryBaseNode<T> node, Action<KeyValuePair<long, T>, CancellationToken> call,
161 Action callback, QueryOptions options)
163 Task[] tasks = Process (node, call, new QueryBaseNodeHelper<T> ().GetOrderedEnumerables, endAction, options);
164 if (callback != null)
165 Task.Factory.ContinueWhenAll (tasks, (_) => callback ());
167 return () => Task.WaitAll (tasks, options.Token);
170 internal static void ProcessAndAggregate<T, U> (QueryBaseNode<T> node,
172 Func<U, T, U> localCall,
173 Action<IList<U>> call)
175 QueryOptions options = CheckQuery (node, true);
177 IList<IEnumerable<T>> enumerables = node.GetEnumerables (options);
178 U[] locals = new U[enumerables.Count];
179 Task[] tasks = new Task[enumerables.Count];
181 if (seedFunc != null) {
182 for (int i = 0; i < locals.Length; i++)
183 locals[i] = seedFunc ();
186 for (int i = 0; i < tasks.Length; i++) {
187 var procSlot = new AggregateProcessSlot<T, U> (options,
189 enumerables[i].GetEnumerator (),
194 tasks[i] = Task.Factory.StartNew (procSlot.Process, options.Token);
197 Task.WaitAll (tasks, options.Token);
203 class AggregateProcessSlot<T, U>
205 readonly QueryOptions options;
207 readonly IEnumerator<T> enumerator;
209 readonly Func<U, T, U> localCall;
210 readonly Func<U> seedFunc;
212 public AggregateProcessSlot (QueryOptions options,
214 IEnumerator<T> enumerator,
216 Func<U, T, U> localCall,
219 this.options = options;
221 this.enumerator = enumerator;
222 this.locals = locals;
223 this.localCall = localCall;
224 this.seedFunc = seedFunc;
227 public void Process ()
229 var token = options.Token;
230 var implementerToken = options.ImplementerToken;
233 if (seedFunc == null) {
234 if (!enumerator.MoveNext ())
236 locals[index] = (U)(object)enumerator.Current;
239 while (enumerator.MoveNext ()) {
240 if (implementerToken.IsCancellationRequested)
242 token.ThrowIfCancellationRequested ();
243 locals[index] = localCall (locals[index], enumerator.Current);
246 enumerator.Dispose ();
251 class QueryBaseNodeHelper<T>
253 internal IList<IEnumerable<T>> GetEnumerables (QueryBaseNode<T> source, QueryOptions options)
255 return source.GetEnumerables (options);
258 internal IList<IEnumerable<KeyValuePair<long,T>>> GetOrderedEnumerables (QueryBaseNode<T> source, QueryOptions options)
260 return source.GetOrderedEnumerables (options);