Merge pull request #1304 from slluis/mac-proxy-autoconfig
[mono.git] / mcs / class / System.Core / System.Linq.Parallel / ParallelExecuter.cs
index 920b067a559fbe985623a7588572dc3ed75536ca..d1d639b9a140e6826d5d19c4590e443dbf89609c 100644 (file)
@@ -61,11 +61,12 @@ namespace System.Linq.Parallel
                        return linked.Token;
                }
 
-               // TODO: replace the CheckQuery call with a custom visitor that stops after the
-               // first encountered order guard
                internal static bool IsOrdered<TSource> (this QueryBaseNode<TSource> source)
                {
-                       return CheckQuery (source).BehindOrderGuard.Value;
+                       QueryIsOrderedVisitor visitor = new QueryIsOrderedVisitor ();
+                       source.Visit (visitor);
+
+                       return visitor.BehindOrderGuard;
                }
 
                internal static int GetBestWorkerNumber ()
@@ -75,80 +76,91 @@ namespace System.Linq.Parallel
 
                internal static int GetBestWorkerNumber (bool blocking)
                {
-                       return blocking ? Environment.ProcessorCount + 1 : Environment.ProcessorCount;
+                       return blocking && Task.CurrentId == null ? Environment.ProcessorCount + 1 : Environment.ProcessorCount;
                }
 
-               internal static Task[] Process<TSource, TElement> (QueryBaseNode<TSource> node, Action<TElement> call,
+               internal static Task[] Process<TSource, TElement> (QueryBaseNode<TSource> node,
+                                                                  Action<TElement, CancellationToken> call,
                                                                   Func<QueryBaseNode<TSource>, QueryOptions, IList<IEnumerable<TElement>>> acquisitionFunc,
                                                                   QueryOptions options)
                {
                        return Process<TSource, TElement> (node, call, acquisitionFunc, null, options);
                }
-               
-               internal static Task[] Process<TSource, TElement> (QueryBaseNode<TSource> node, Action<TElement> call,
+
+               internal static Task[] Process<TSource, TElement> (QueryBaseNode<TSource> node,
+                                                                  Action<TElement, CancellationToken> call,
                                                                   Func<QueryBaseNode<TSource>, QueryOptions, IList<IEnumerable<TElement>>> acquisitionFunc,
                                                                   Action endAction,
                                                                   QueryOptions options)
                {
-                       return Process<TSource, TElement> (node,
-                                                          (e, i) => call (e),
-                                                          acquisitionFunc,
-                                                          endAction == null ? ((Action<int>)null) : (i) => endAction (),
-                                                          options);
-               }
+                       CancellationTokenSource src
+                               = CancellationTokenSource.CreateLinkedTokenSource (options.ImplementerToken, options.Token);
 
-               internal static Task[] Process<TSource, TElement> (QueryBaseNode<TSource> node, Action<TElement, int> call,
-                                                                  Func<QueryBaseNode<TSource>, QueryOptions, IList<IEnumerable<TElement>>> acquisitionFunc,
-                                                                  Action<int> endAction,
-                                                                  QueryOptions options)
-               {
                        IList<IEnumerable<TElement>> enumerables = acquisitionFunc (node, options);
 
                        Task[] tasks = new Task[enumerables.Count];
-                       
+
                        for (int i = 0; i < tasks.Length; i++) {
                                int index = i;
                                tasks[i] = Task.Factory.StartNew (() => {
-                                       foreach (TElement item in enumerables[index]) {
-                                               // This is from specific operators
-                                               if (options.ImplementerToken.IsCancellationRequested)
-                                                       break;
-                                               if (options.Token.IsCancellationRequested)
-                                                       throw new OperationCanceledException (options.Token);
+                                       try {
+                                               foreach (TElement item in enumerables[index]) {
+                                                       if (!CheckTokens (options))
+                                                               break;
 
-                                               call (item, index);
+                                                       try {
+                                                               call (item, src.Token);
+                                                       } catch (OperationCanceledException canceledException) {
+                                                               if (canceledException.CancellationToken != src.Token)
+                                                                       throw canceledException;
+                                                       }
+
+                                                       if (!CheckTokens (options))
+                                                               break;
+                                               }
+                                       } finally {
+                                               if (endAction != null)
+                                                       endAction ();
                                        }
-                                       if (endAction != null)
-                                               endAction (index);
-                                 }, options.Token);
+                               }, options.Token, TaskCreationOptions.AttachedToParent | TaskCreationOptions.LongRunning, TaskScheduler.Default);
                        }
 
                        return tasks;
                }
 
-               internal static void ProcessAndBlock<T> (QueryBaseNode<T> node, Action<T> call)
+               static bool CheckTokens (QueryOptions options)
+               {
+                       // This is from specific operators
+                       if (options.ImplementerToken.IsCancellationRequested)
+                               return false;
+                       if (options.Token.IsCancellationRequested)
+                               throw new OperationCanceledException (options.Token);
+                       return true;
+               }
+
+               internal static void ProcessAndBlock<T> (QueryBaseNode<T> node, Action<T, CancellationToken> call)
                {
                        QueryOptions options = CheckQuery (node, true);
 
-                       Task[] tasks = Process (node, call, (n, o) => n.GetEnumerables (o), options);
+                       Task[] tasks = Process (node, call, new QueryBaseNodeHelper<T> ().GetEnumerables, options);
                        Task.WaitAll (tasks, options.Token);
                }
 
-               internal static Action ProcessAndCallback<T> (QueryBaseNode<T> node, Action<T> call,
+               internal static Action ProcessAndCallback<T> (QueryBaseNode<T> node, Action<T, CancellationToken> call,
                                                              Action callback, QueryOptions options)
                {
-                       Task[] tasks = Process (node, call, (n, o) => n.GetEnumerables (o), options);
+                       Task[] tasks = Process (node, call, new QueryBaseNodeHelper<T> ().GetEnumerables, options);
                        if (callback != null)
                                Task.Factory.ContinueWhenAll (tasks,  (_) => callback ());
 
                        return () => Task.WaitAll (tasks, options.Token);
                }
 
-               internal static Action ProcessAndCallback<T> (QueryBaseNode<T> node, Action<KeyValuePair<long, T>, int> call,
-                                                             Action<int> endAction,
+               internal static Action ProcessAndCallback<T> (QueryBaseNode<T> node, Action<KeyValuePair<long, T>, CancellationToken> call,
+                                                             Action endAction,
                                                              Action callback, QueryOptions options)
                {
-                       Task[] tasks = Process (node, call, (n, o) => n.GetOrderedEnumerables (o), endAction, options);
+                       Task[] tasks = Process (node, call, new QueryBaseNodeHelper<T> ().GetOrderedEnumerables, endAction, options);
                        if (callback != null)
                                Task.Factory.ContinueWhenAll (tasks,  (_) => callback ());
 
@@ -172,28 +184,14 @@ namespace System.Linq.Parallel
                        }
 
                        for (int i = 0; i < tasks.Length; i++) {
-                               int index = i;
-                               bool firstRun = true;
+                               var procSlot = new AggregateProcessSlot<T, U> (options,
+                                                                              i,
+                                                                              enumerables[i].GetEnumerator (),
+                                                                              locals,
+                                                                              localCall,
+                                                                              seedFunc);
 
-                               tasks[i] = Task.Factory.StartNew (() => {
-                                       foreach (T item in enumerables[index]) {
-                                               // This is from specific operators
-                                               if (options.ImplementerToken.IsCancellationRequested)
-                                                       break;
-                                               if (options.Token.IsCancellationRequested)
-                                                       throw new OperationCanceledException (options.Token);
-
-                                               if (firstRun && seedFunc == null) {
-                                                       firstRun = false;
-                                                       // HACK: TODO: omgwtfitsuckssomuch
-                                                       locals[index] = (U)(object)item;
-                                                       continue;
-                                               }
-                                               
-                                               U acc = locals[index];
-                                               locals[index] = localCall (acc, item);
-                                       }
-                               }, options.Token);
+                               tasks[i] = Task.Factory.StartNew (procSlot.Process, options.Token);
                        }
 
                        Task.WaitAll (tasks, options.Token);
@@ -201,6 +199,72 @@ namespace System.Linq.Parallel
                        if (call != null)
                                call (locals);
                }
+
+               class AggregateProcessSlot<T, U>
+               {
+                       readonly QueryOptions options;
+                       readonly int index;
+                       readonly IEnumerator<T> enumerator;
+                       readonly U[] locals;
+                       readonly Func<U, T, U> localCall;
+                       readonly Func<U> seedFunc;
+
+                       public AggregateProcessSlot (QueryOptions options,
+                                                    int index,
+                                                    IEnumerator<T> enumerator,
+                                                    U[] locals,
+                                                    Func<U, T, U> localCall,
+                                                    Func<U> seedFunc)
+                       {
+                               this.options = options;
+                               this.index = index;
+                               this.enumerator = enumerator;
+                               this.locals = locals;
+                               this.localCall = localCall;
+                               this.seedFunc = seedFunc;
+                       }
+
+                       public void Process ()
+                       {
+                               var token = options.Token;
+                               var implementerToken = options.ImplementerToken;
+
+                               try {
+                                       // Avoid cache thrashing of locals array
+                                       var local = locals [index];
+
+                                       if (seedFunc == null) {
+                                               if (!enumerator.MoveNext ())
+                                                       return;
+                                               local = (U)(object)enumerator.Current;
+                                       }
+
+                                       while (enumerator.MoveNext ()) {
+                                               if (implementerToken.IsCancellationRequested)
+                                                       break;
+                                               token.ThrowIfCancellationRequested ();
+                                               local = localCall (local, enumerator.Current);
+                                       }
+
+                                       locals [index] = local;
+                               } finally {
+                                       enumerator.Dispose ();
+                               }
+                       }
+               }
+
+               class QueryBaseNodeHelper<T>
+               {
+                       internal IList<IEnumerable<T>> GetEnumerables (QueryBaseNode<T> source, QueryOptions options)
+                       {
+                               return source.GetEnumerables (options);
+                       }
+
+                       internal IList<IEnumerable<KeyValuePair<long,T>>> GetOrderedEnumerables (QueryBaseNode<T> source, QueryOptions options)
+                       {
+                               return source.GetOrderedEnumerables (options);
+                       }
+               }
        }
 }
 #endif
\ No newline at end of file