return linked.Token;
}
- // TODO: replace the CheckQuery call with a custom visitor that stops after the
- // first encountered order guard
internal static bool IsOrdered<TSource> (this QueryBaseNode<TSource> source)
{
- return CheckQuery (source).BehindOrderGuard.Value;
+ QueryIsOrderedVisitor visitor = new QueryIsOrderedVisitor ();
+ source.Visit (visitor);
+
+ return visitor.BehindOrderGuard;
}
internal static int GetBestWorkerNumber ()
internal static int GetBestWorkerNumber (bool blocking)
{
- return blocking ? Environment.ProcessorCount + 1 : Environment.ProcessorCount;
+ return blocking && Task.CurrentId == null ? Environment.ProcessorCount + 1 : Environment.ProcessorCount;
}
- internal static Task[] Process<TSource, TElement> (QueryBaseNode<TSource> node, Action<TElement> call,
+ internal static Task[] Process<TSource, TElement> (QueryBaseNode<TSource> node,
+ Action<TElement, CancellationToken> call,
Func<QueryBaseNode<TSource>, QueryOptions, IList<IEnumerable<TElement>>> acquisitionFunc,
QueryOptions options)
{
return Process<TSource, TElement> (node, call, acquisitionFunc, null, options);
}
-
- internal static Task[] Process<TSource, TElement> (QueryBaseNode<TSource> node, Action<TElement> call,
+
+ internal static Task[] Process<TSource, TElement> (QueryBaseNode<TSource> node,
+ Action<TElement, CancellationToken> call,
Func<QueryBaseNode<TSource>, QueryOptions, IList<IEnumerable<TElement>>> acquisitionFunc,
Action endAction,
QueryOptions options)
{
- return Process<TSource, TElement> (node,
- (e, i) => call (e),
- acquisitionFunc,
- endAction == null ? ((Action<int>)null) : (i) => endAction (),
- options);
- }
+ CancellationTokenSource src
+ = CancellationTokenSource.CreateLinkedTokenSource (options.ImplementerToken, options.Token);
- internal static Task[] Process<TSource, TElement> (QueryBaseNode<TSource> node, Action<TElement, int> call,
- Func<QueryBaseNode<TSource>, QueryOptions, IList<IEnumerable<TElement>>> acquisitionFunc,
- Action<int> endAction,
- QueryOptions options)
- {
IList<IEnumerable<TElement>> enumerables = acquisitionFunc (node, options);
Task[] tasks = new Task[enumerables.Count];
-
+
for (int i = 0; i < tasks.Length; i++) {
int index = i;
tasks[i] = Task.Factory.StartNew (() => {
- foreach (TElement item in enumerables[index]) {
- // This is from specific operators
- if (options.ImplementerToken.IsCancellationRequested)
- break;
- if (options.Token.IsCancellationRequested)
- throw new OperationCanceledException (options.Token);
+ try {
+ foreach (TElement item in enumerables[index]) {
+ if (!CheckTokens (options))
+ break;
- call (item, index);
+ try {
+ call (item, src.Token);
+ } catch (OperationCanceledException canceledException) {
+ if (canceledException.CancellationToken != src.Token)
+ throw canceledException;
+ }
+
+ if (!CheckTokens (options))
+ break;
+ }
+ } finally {
+ if (endAction != null)
+ endAction ();
}
- if (endAction != null)
- endAction (index);
- }, options.Token);
+ }, options.Token, TaskCreationOptions.AttachedToParent | TaskCreationOptions.LongRunning, TaskScheduler.Default);
}
return tasks;
}
- internal static void ProcessAndBlock<T> (QueryBaseNode<T> node, Action<T> call)
+ static bool CheckTokens (QueryOptions options)
+ {
+ // This is from specific operators
+ if (options.ImplementerToken.IsCancellationRequested)
+ return false;
+ if (options.Token.IsCancellationRequested)
+ throw new OperationCanceledException (options.Token);
+ return true;
+ }
+
+ internal static void ProcessAndBlock<T> (QueryBaseNode<T> node, Action<T, CancellationToken> call)
{
QueryOptions options = CheckQuery (node, true);
- Task[] tasks = Process (node, call, (n, o) => n.GetEnumerables (o), options);
+ Task[] tasks = Process (node, call, new QueryBaseNodeHelper<T> ().GetEnumerables, options);
Task.WaitAll (tasks, options.Token);
}
- internal static Action ProcessAndCallback<T> (QueryBaseNode<T> node, Action<T> call,
+ internal static Action ProcessAndCallback<T> (QueryBaseNode<T> node, Action<T, CancellationToken> call,
Action callback, QueryOptions options)
{
- Task[] tasks = Process (node, call, (n, o) => n.GetEnumerables (o), options);
+ Task[] tasks = Process (node, call, new QueryBaseNodeHelper<T> ().GetEnumerables, options);
if (callback != null)
Task.Factory.ContinueWhenAll (tasks, (_) => callback ());
return () => Task.WaitAll (tasks, options.Token);
}
- internal static Action ProcessAndCallback<T> (QueryBaseNode<T> node, Action<KeyValuePair<long, T>, int> call,
- Action<int> endAction,
+ internal static Action ProcessAndCallback<T> (QueryBaseNode<T> node, Action<KeyValuePair<long, T>, CancellationToken> call,
+ Action endAction,
Action callback, QueryOptions options)
{
- Task[] tasks = Process (node, call, (n, o) => n.GetOrderedEnumerables (o), endAction, options);
+ Task[] tasks = Process (node, call, new QueryBaseNodeHelper<T> ().GetOrderedEnumerables, endAction, options);
if (callback != null)
Task.Factory.ContinueWhenAll (tasks, (_) => callback ());
}
for (int i = 0; i < tasks.Length; i++) {
- int index = i;
- bool firstRun = true;
+ var procSlot = new AggregateProcessSlot<T, U> (options,
+ i,
+ enumerables[i].GetEnumerator (),
+ locals,
+ localCall,
+ seedFunc);
- tasks[i] = Task.Factory.StartNew (() => {
- foreach (T item in enumerables[index]) {
- // This is from specific operators
- if (options.ImplementerToken.IsCancellationRequested)
- break;
- if (options.Token.IsCancellationRequested)
- throw new OperationCanceledException (options.Token);
-
- if (firstRun && seedFunc == null) {
- firstRun = false;
- // HACK: TODO: omgwtfitsuckssomuch
- locals[index] = (U)(object)item;
- continue;
- }
-
- U acc = locals[index];
- locals[index] = localCall (acc, item);
- }
- }, options.Token);
+ tasks[i] = Task.Factory.StartNew (procSlot.Process, options.Token);
}
Task.WaitAll (tasks, options.Token);
if (call != null)
call (locals);
}
+
+ class AggregateProcessSlot<T, U>
+ {
+ readonly QueryOptions options;
+ readonly int index;
+ readonly IEnumerator<T> enumerator;
+ readonly U[] locals;
+ readonly Func<U, T, U> localCall;
+ readonly Func<U> seedFunc;
+
+ public AggregateProcessSlot (QueryOptions options,
+ int index,
+ IEnumerator<T> enumerator,
+ U[] locals,
+ Func<U, T, U> localCall,
+ Func<U> seedFunc)
+ {
+ this.options = options;
+ this.index = index;
+ this.enumerator = enumerator;
+ this.locals = locals;
+ this.localCall = localCall;
+ this.seedFunc = seedFunc;
+ }
+
+ public void Process ()
+ {
+ var token = options.Token;
+ var implementerToken = options.ImplementerToken;
+
+ try {
+ // Avoid cache thrashing of locals array
+ var local = locals [index];
+
+ if (seedFunc == null) {
+ if (!enumerator.MoveNext ())
+ return;
+ local = (U)(object)enumerator.Current;
+ }
+
+ while (enumerator.MoveNext ()) {
+ if (implementerToken.IsCancellationRequested)
+ break;
+ token.ThrowIfCancellationRequested ();
+ local = localCall (local, enumerator.Current);
+ }
+
+ locals [index] = local;
+ } finally {
+ enumerator.Dispose ();
+ }
+ }
+ }
+
+ class QueryBaseNodeHelper<T>
+ {
+ internal IList<IEnumerable<T>> GetEnumerables (QueryBaseNode<T> source, QueryOptions options)
+ {
+ return source.GetEnumerables (options);
+ }
+
+ internal IList<IEnumerable<KeyValuePair<long,T>>> GetOrderedEnumerables (QueryBaseNode<T> source, QueryOptions options)
+ {
+ return source.GetOrderedEnumerables (options);
+ }
+ }
}
}
#endif
\ No newline at end of file