5 // Jérémie "Garuma" Laval <jeremie.laval@gmail.com>
7 // Copyright (c) 2010 Jérémie "Garuma" Laval
9 // Permission is hereby granted, free of charge, to any person obtaining a copy
10 // of this software and associated documentation files (the "Software"), to deal
11 // in the Software without restriction, including without limitation the rights
12 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13 // copies of the Software, and to permit persons to whom the Software is
14 // furnished to do so, subject to the following conditions:
16 // The above copyright notice and this permission notice shall be included in
17 // all copies or substantial portions of the Software.
19 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
29 using System.Threading;
30 using System.Threading.Tasks;
31 using System.Collections;
32 using System.Collections.Generic;
33 using System.Collections.Concurrent;
37 internal static class ParallelExecuter
39 internal static QueryOptions CheckQuery<T> (QueryBaseNode<T> startingNode)
41 return CheckQuery<T> (startingNode, false);
44 internal static QueryOptions CheckQuery<T> (QueryBaseNode<T> startingNode, bool blocking)
46 return CheckQuery (startingNode, GetBestWorkerNumber (blocking));
49 internal static QueryOptions CheckQuery<T> (QueryBaseNode<T> startingNode, int partitionCount)
51 QueryCheckerVisitor visitor = new QueryCheckerVisitor (partitionCount);
52 startingNode.Visit (visitor);
54 return visitor.Options;
57 internal static CancellationToken Chain (this CancellationToken self, CancellationTokenSource other)
59 CancellationTokenSource linked = CancellationTokenSource.CreateLinkedTokenSource (self, other.Token);
63 // TODO: replace the CheckQuery call with a custom visitor that stops after the
64 // first encountered order guard
65 internal static bool IsOrdered<TSource> (this QueryBaseNode<TSource> source)
67 return CheckQuery (source).BehindOrderGuard.Value;
70 internal static int GetBestWorkerNumber ()
72 return GetBestWorkerNumber (false);
75 internal static int GetBestWorkerNumber (bool blocking)
77 return blocking ? Environment.ProcessorCount + 1 : Environment.ProcessorCount;
80 internal static Task[] Process<TSource, TElement> (QueryBaseNode<TSource> node, Action<TElement> call,
81 Func<QueryBaseNode<TSource>, QueryOptions, IList<IEnumerable<TElement>>> acquisitionFunc,
84 return Process<TSource, TElement> (node, call, acquisitionFunc, null, options);
87 internal static Task[] Process<TSource, TElement> (QueryBaseNode<TSource> node, Action<TElement> call,
88 Func<QueryBaseNode<TSource>, QueryOptions, IList<IEnumerable<TElement>>> acquisitionFunc,
92 return Process<TSource, TElement> (node,
95 endAction == null ? ((Action<int>)null) : (i) => endAction (),
99 internal static Task[] Process<TSource, TElement> (QueryBaseNode<TSource> node, Action<TElement, int> call,
100 Func<QueryBaseNode<TSource>, QueryOptions, IList<IEnumerable<TElement>>> acquisitionFunc,
101 Action<int> endAction,
102 QueryOptions options)
104 IList<IEnumerable<TElement>> enumerables = acquisitionFunc (node, options);
106 Task[] tasks = new Task[enumerables.Count];
108 for (int i = 0; i < tasks.Length; i++) {
110 tasks[i] = Task.Factory.StartNew (() => {
111 foreach (TElement item in enumerables[index]) {
112 // This is from specific operators
113 if (options.ImplementerToken.IsCancellationRequested)
115 if (options.Token.IsCancellationRequested)
116 throw new OperationCanceledException (options.Token);
120 if (endAction != null)
128 internal static void ProcessAndBlock<T> (QueryBaseNode<T> node, Action<T> call)
130 QueryOptions options = CheckQuery (node, true);
132 Task[] tasks = Process (node, call, (n, o) => n.GetEnumerables (o), options);
133 Task.WaitAll (tasks, options.Token);
136 internal static Action ProcessAndCallback<T> (QueryBaseNode<T> node, Action<T> call,
137 Action callback, QueryOptions options)
139 Task[] tasks = Process (node, call, (n, o) => n.GetEnumerables (o), options);
140 if (callback != null)
141 Task.Factory.ContinueWhenAll (tasks, (_) => callback ());
143 return () => Task.WaitAll (tasks, options.Token);
146 internal static Action ProcessAndCallback<T> (QueryBaseNode<T> node, Action<KeyValuePair<long, T>, int> call,
147 Action<int> endAction,
148 Action callback, QueryOptions options)
150 Task[] tasks = Process (node, call, (n, o) => n.GetOrderedEnumerables (o), endAction, options);
151 if (callback != null)
152 Task.Factory.ContinueWhenAll (tasks, (_) => callback ());
154 return () => Task.WaitAll (tasks, options.Token);
157 internal static void ProcessAndAggregate<T, U> (QueryBaseNode<T> node,
159 Func<U, T, U> localCall,
160 Action<IList<U>> call)
162 QueryOptions options = CheckQuery (node, true);
164 IList<IEnumerable<T>> enumerables = node.GetEnumerables (options);
165 U[] locals = new U[enumerables.Count];
166 Task[] tasks = new Task[enumerables.Count];
168 if (seedFunc != null) {
169 for (int i = 0; i < locals.Length; i++)
170 locals[i] = seedFunc ();
173 for (int i = 0; i < tasks.Length; i++) {
175 bool firstRun = true;
177 tasks[i] = Task.Factory.StartNew (() => {
178 foreach (T item in enumerables[index]) {
179 // This is from specific operators
180 if (options.ImplementerToken.IsCancellationRequested)
182 if (options.Token.IsCancellationRequested)
183 throw new OperationCanceledException (options.Token);
185 if (firstRun && seedFunc == null) {
187 // HACK: TODO: omgwtfitsuckssomuch
188 locals[index] = (U)(object)item;
192 U acc = locals[index];
193 locals[index] = localCall (acc, item);
198 Task.WaitAll (tasks, options.Token);