// // ParallelEnumerable.cs // // Author: // Jérémie "Garuma" Laval // // Copyright (c) 2010 Jérémie "Garuma" Laval // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. 
#if NET_4_0 using System; using System.Threading; using System.Collections; using System.Collections.Generic; using System.Collections.Concurrent; using System.Linq.Parallel; using System.Linq.Parallel.QueryNodes; namespace System.Linq { public static class ParallelEnumerable { #region Range & Repeat public static ParallelQuery Range (int start, int count) { if (int.MaxValue - start < count - 1) throw new ArgumentOutOfRangeException ("count", "start + count - 1 is larger than Int32.MaxValue"); if (count < 0) throw new ArgumentOutOfRangeException ("count", "count is less than 0"); return (new RangeList (start, count)).AsParallel (); } public static ParallelQuery Repeat (TResult element, int count) { if (count < 0) throw new ArgumentOutOfRangeException ("count", "count is less than 0"); return (new RepeatList (element, count)).AsParallel (); } #endregion #region Empty public static ParallelQuery Empty () { return Repeat (default (TResult), 0); } #endregion #region AsParallel public static ParallelQuery AsParallel (this IEnumerable source) { if (source == null) throw new ArgumentNullException ("source"); /* Someone might be trying to use AsParallel a bit too much, if the query was in fact * already a ParallelQuery, just cast it */ ParallelQuery query = source as ParallelQuery; return query ?? new ParallelQuery (new QueryStartNode (source)); } public static ParallelQuery AsParallel (this Partitioner source) { if (source == null) throw new ArgumentNullException ("source"); /* Someone might be trying to use AsParallel a bit too much, if the query was in fact * already a ParallelQuery, just cast it */ ParallelQuery query = source as ParallelQuery; return query ?? 
new ParallelQuery (new QueryStartNode (source)); } public static ParallelQuery AsParallel (this IEnumerable source) { if (source == null) throw new ArgumentNullException ("source"); /* Someone might be trying to use AsParallel a bit too much, if the query was in fact * already a ParallelQuery, just cast it */ ParallelQuery query = source as ParallelQuery; return query ?? new ParallelQuery (new QueryStartNode (source.Cast ())); } public static IEnumerable AsEnumerable (this ParallelQuery source) { if (source == null) throw new ArgumentNullException ("source"); return source.AsSequential (); } public static IEnumerable AsSequential (this ParallelQuery source) { if (source == null) throw new ArgumentNullException ("source"); return source.Node.GetSequential (); } #endregion #region AsOrdered / AsUnordered public static ParallelQuery AsOrdered (this ParallelQuery source) { if (source == null) throw new ArgumentNullException ("source"); return new ParallelQuery (new QueryAsOrderedNode (source.Node)); } public static ParallelQuery AsUnordered (this ParallelQuery source) { if (source == null) throw new ArgumentNullException ("source"); return new ParallelQuery (new QueryAsUnorderedNode (source.Node)); } public static ParallelQuery AsOrdered (this ParallelQuery source) { if (source == null) throw new ArgumentNullException ("source"); return source.TypedQuery.AsOrdered (); } #endregion #region With* public static ParallelQuery WithExecutionMode (this ParallelQuery source, ParallelExecutionMode executionMode) { if (source == null) throw new ArgumentNullException ("source"); return new ParallelQuery (new ParallelExecutionModeNode (executionMode, source.Node)); } public static ParallelQuery WithCancellation (this ParallelQuery source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException ("source"); return new ParallelQuery (new CancellationTokenNode (cancellationToken, source.Node)); } public static ParallelQuery WithMergeOptions (this 
ParallelQuery source, ParallelMergeOptions mergeOptions) { if (source == null) throw new ArgumentNullException ("source"); return new ParallelQuery (new ParallelMergeOptionsNode (mergeOptions, source.Node)); } public static ParallelQuery WithDegreeOfParallelism (this ParallelQuery source, int degreeOfParallelism) { if (degreeOfParallelism < 1 || degreeOfParallelism > 63) throw new ArgumentException ("degreeOfParallelism is less than 1 or greater than 63", "degreeOfParallelism"); if (source == null) throw new ArgumentNullException ("source"); return new ParallelQuery (new DegreeOfParallelismNode (degreeOfParallelism, source.Node)); } internal static ParallelQuery WithImplementerToken (this ParallelQuery source, CancellationTokenSource token) { return new ParallelQuery (new ImplementerTokenNode (token, source.Node)); } #endregion #region Select public static ParallelQuery Select (this ParallelQuery source, Func selector) { if (source == null) throw new ArgumentNullException ("source"); if (selector == null) throw new ArgumentNullException ("selector"); return new ParallelQuery (new QuerySelectNode (source.Node, selector)); } public static ParallelQuery Select (this ParallelQuery source, Func selector) { if (source == null) throw new ArgumentNullException ("source"); if (selector == null) throw new ArgumentNullException ("selector"); return new ParallelQuery (new QuerySelectNode (source.Node, selector)); } #endregion #region SelectMany public static ParallelQuery SelectMany (this ParallelQuery source, Func> selector) { return source.SelectMany (selector, (s, e) => e); } public static ParallelQuery SelectMany (this ParallelQuery source, Func> selector) { return source.SelectMany (selector, (s, e) => e); } public static ParallelQuery SelectMany (this ParallelQuery source, Func> collectionSelector, Func resultSelector) { return new ParallelQuery (new QuerySelectManyNode (source.Node, collectionSelector, resultSelector)); } public static ParallelQuery SelectMany (this 
ParallelQuery source, Func> collectionSelector, Func resultSelector) { return new ParallelQuery (new QuerySelectManyNode (source.Node, collectionSelector, resultSelector)); } #endregion #region Where public static ParallelQuery Where (this ParallelQuery source, Func predicate) { if (source == null) throw new ArgumentNullException ("source"); if (predicate == null) throw new ArgumentNullException ("predicate"); return new ParallelQuery (new QueryWhereNode (source.Node, predicate)); } public static ParallelQuery Where (this ParallelQuery source, Func predicate) { if (source == null) throw new ArgumentNullException ("source"); if (predicate == null) throw new ArgumentNullException ("predicate"); return new ParallelQuery (new QueryWhereNode (source.Node, predicate)); } #endregion #region Aggregate public static TSource Aggregate (this ParallelQuery source, Func func) { if (source == null) throw new ArgumentNullException ("source"); if (func == null) throw new ArgumentNullException ("func"); return source.Aggregate ((Func)null, func, func, (e) => e); } public static TAccumulate Aggregate (this ParallelQuery source, TAccumulate seed, Func func) { if (source == null) throw new ArgumentNullException ("source"); if (func == null) throw new ArgumentNullException ("func"); return source.Aggregate (seed, func, (e) => e); } public static TResult Aggregate (this ParallelQuery source, TAccumulate seed, Func func, Func resultSelector) { if (source == null) throw new ArgumentNullException ("source"); if (func == null) throw new ArgumentNullException ("func"); if (resultSelector == null) throw new ArgumentNullException ("resultSelector"); TAccumulate accumulator = seed; foreach (TSource value in source) accumulator = func (accumulator, value); return resultSelector (accumulator); } public static TResult Aggregate (this ParallelQuery source, TAccumulate seed, Func updateAccumulatorFunc, Func combineAccumulatorsFunc, Func resultSelector) { if (source == null) throw new 
ArgumentNullException ("source"); if (updateAccumulatorFunc == null) throw new ArgumentNullException ("updateAccumulatorFunc"); if (combineAccumulatorsFunc == null) throw new ArgumentNullException ("combineAccumulatorsFunc"); if (resultSelector == null) throw new ArgumentNullException ("resultSelector"); return source.Aggregate (() => seed, updateAccumulatorFunc, combineAccumulatorsFunc, resultSelector); } public static TResult Aggregate (this ParallelQuery source, Func seedFactory, Func updateAccumulatorFunc, Func combineAccumulatorsFunc, Func resultSelector) { if (source == null) throw new ArgumentNullException ("source"); if (seedFactory == null) throw new ArgumentNullException ("seedFactory"); if (updateAccumulatorFunc == null) throw new ArgumentNullException ("updateAccumulatorFunc"); if (combineAccumulatorsFunc == null) throw new ArgumentNullException ("combineAccumulatorsFunc"); if (resultSelector == null) throw new ArgumentNullException ("resultSelector"); TAccumulate accumulator = default (TAccumulate); ParallelExecuter.ProcessAndAggregate (source.Node, seedFactory, updateAccumulatorFunc, (list) => { accumulator = list [0]; for (int i = 1; i < list.Count; i++) accumulator = combineAccumulatorsFunc (accumulator, list[i]); }); return resultSelector (accumulator);; } #endregion #region ForAll public static void ForAll (this ParallelQuery source, Action action) { if (source == null) throw new ArgumentNullException ("source"); if (action == null) throw new ArgumentNullException ("action"); ParallelExecuter.ProcessAndBlock (source.Node, (e, c) => action (e)); } #endregion #region OrderBy public static OrderedParallelQuery OrderByDescending (this ParallelQuery source, Func keySelector, IComparer comparer) { if (source == null) throw new ArgumentNullException ("source"); if (keySelector == null) throw new ArgumentNullException ("keySelector"); if (comparer == null) comparer = Comparer.Default; Comparison comparison = (e1, e2) => -comparer.Compare (keySelector 
(e1), keySelector (e2)); return new OrderedParallelQuery (new QueryOrderByNode (source.Node, comparison)); } public static OrderedParallelQuery OrderByDescending (this ParallelQuery source, Func keySelector) { return OrderByDescending (source, keySelector, Comparer.Default); } public static OrderedParallelQuery OrderBy (this ParallelQuery source, Func keySelector) { return OrderBy (source, keySelector, Comparer.Default); } public static OrderedParallelQuery OrderBy (this ParallelQuery source, Func keySelector, IComparer comparer) { if (source == null) throw new ArgumentNullException ("source"); if (keySelector == null) throw new ArgumentNullException ("keySelector"); if (comparer == null) comparer = Comparer.Default; Comparison comparison = (e1, e2) => comparer.Compare (keySelector (e1), keySelector (e2)); return new OrderedParallelQuery (new QueryOrderByNode (source.Node, comparison)); } #endregion #region ThenBy public static OrderedParallelQuery ThenBy (this OrderedParallelQuery source, Func keySelector) { return ThenBy (source, keySelector, Comparer.Default); } public static OrderedParallelQuery ThenBy (this OrderedParallelQuery source, Func keySelector, IComparer comparer) { if (source == null) throw new ArgumentNullException ("source"); if (keySelector == null) throw new ArgumentNullException ("keySelector"); if (comparer == null) comparer = Comparer.Default; Comparison comparison = (e1, e2) => comparer.Compare (keySelector (e1), keySelector (e2)); return new OrderedParallelQuery (new QueryOrderByNode (source.Node, comparison)); } public static OrderedParallelQuery ThenByDescending (this OrderedParallelQuery source, Func keySelector) { return ThenByDescending (source, keySelector, Comparer.Default); } public static OrderedParallelQuery ThenByDescending (this OrderedParallelQuery source, Func keySelector, IComparer comparer) { if (source == null) throw new ArgumentNullException ("source"); if (keySelector == null) throw new ArgumentNullException 
("keySelector"); if (comparer == null) comparer = Comparer.Default; Comparison comparison = (e1, e2) => -comparer.Compare (keySelector (e1), keySelector (e2)); return new OrderedParallelQuery (new QueryOrderByNode (source.Node, comparison)); } #endregion #region All public static bool All (this ParallelQuery source, Func predicate) { if (source == null) throw new ArgumentNullException ("source"); if (predicate == null) throw new ArgumentNullException ("predicate"); CancellationTokenSource src = new CancellationTokenSource (); ParallelQuery innerQuery = source.WithImplementerToken (src); bool result = true; try { innerQuery.ForAll ((e) => { if (!predicate (e)) { result = false; src.Cancel (); } }); } catch (OperationCanceledException e) { if (e.CancellationToken != src.Token) throw e; } return result; } #endregion #region Any public static bool Any (this ParallelQuery source) { return Any (source, (_) => true); } public static bool Any (this ParallelQuery source, Func predicate) { if (source == null) throw new ArgumentNullException ("source"); if (predicate == null) throw new ArgumentNullException ("predicate"); return !source.All ((e) => !predicate (e)); } #endregion #region Contains public static bool Contains (this ParallelQuery source, TSource value) { return Contains (source, value, EqualityComparer.Default); } public static bool Contains (this ParallelQuery source, TSource value, IEqualityComparer comparer) { if (source == null) throw new ArgumentNullException ("source"); if (comparer == null) comparer = EqualityComparer.Default; return Any (source, (e) => comparer.Equals (value, e)); } #endregion #region SequenceEqual public static bool SequenceEqual (this ParallelQuery first, ParallelQuery second) { if (first == null) throw new ArgumentNullException ("first"); if (second == null) throw new ArgumentNullException ("second"); return first.SequenceEqual (second, EqualityComparer.Default); } public static bool SequenceEqual (this ParallelQuery first, 
ParallelQuery second, IEqualityComparer comparer) { if (first == null) throw new ArgumentNullException ("first"); if (second == null) throw new ArgumentNullException ("second"); if (comparer == null) comparer = EqualityComparer.Default; var source = new CancellationTokenSource (); var zip = new QueryZipNode (comparer.Equals, first.Node, second.Node) { Strict = true }; var innerQuery = new ParallelQuery (zip).WithImplementerToken (source); bool result = true; try { innerQuery.ForAll (value => { if (!value) { result = false; source.Cancel (); } }); } catch (AggregateException ex) { if (ex.InnerException is QueryZipException) return false; else throw ex; } catch (OperationCanceledException e) { if (e.CancellationToken != source.Token) throw e; } return result; } [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery rather " + "than System.Collections.Generic.IEnumerable. To fix this problem, use the AsParallel() " + "extension method to convert the right data source to System.Linq.ParallelQuery.")] public static bool SequenceEqual (this ParallelQuery first, IEnumerable second) { throw new NotSupportedException (); } [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery rather " + "than System.Collections.Generic.IEnumerable. 
To fix this problem, use the AsParallel() " + "extension method to convert the right data source to System.Linq.ParallelQuery.")] public static bool SequenceEqual (this ParallelQuery first, IEnumerable second, IEqualityComparer comparer) { throw new NotSupportedException (); } #endregion #region GroupBy public static ParallelQuery> GroupBy (this ParallelQuery source, Func keySelector) { return source.GroupBy (keySelector, EqualityComparer.Default); } public static ParallelQuery> GroupBy (this ParallelQuery source, Func keySelector, IEqualityComparer comparer) { return source.GroupBy (keySelector, new Identity ().Apply, comparer); } public static ParallelQuery> GroupBy (this ParallelQuery source, Func keySelector, Func elementSelector) { return source.GroupBy (keySelector, elementSelector, EqualityComparer.Default); } public static ParallelQuery GroupBy (this ParallelQuery source, Func keySelector, Func, TResult> resultSelector) { return source.GroupBy (keySelector) .Select ((g) => resultSelector (g.Key, (IEnumerable)g)); } public static ParallelQuery> GroupBy (this ParallelQuery source, Func keySelector, Func elementSelector, IEqualityComparer comparer) { if (source == null) throw new ArgumentNullException ("source"); if (keySelector == null) throw new ArgumentNullException ("keySelector"); if (elementSelector == null) throw new ArgumentNullException ("elementSelector"); if (comparer == null) comparer = EqualityComparer.Default; return new ParallelQuery> (new QueryGroupByNode (source.Node, keySelector, elementSelector, comparer)); } public static ParallelQuery GroupBy (this ParallelQuery source, Func keySelector, Func elementSelector, Func, TResult> resultSelector) { return source.GroupBy (keySelector, elementSelector) .Select ((g) => resultSelector (g.Key, (IEnumerable)g)); } public static ParallelQuery GroupBy (this ParallelQuery source, Func keySelector, Func, TResult> resultSelector, IEqualityComparer comparer) { return source.GroupBy (keySelector, comparer) 
.Select ((g) => resultSelector (g.Key, (IEnumerable)g)); } public static ParallelQuery GroupBy (this ParallelQuery source, Func keySelector, Func elementSelector, Func, TResult> resultSelector, IEqualityComparer comparer) { return source.GroupBy (keySelector, elementSelector, comparer) .Select ((g) => resultSelector (g.Key, (IEnumerable)g)); } #endregion #region GroupJoin public static ParallelQuery GroupJoin (this ParallelQuery outer, ParallelQuery inner, Func outerKeySelector, Func innerKeySelector, Func, TResult> resultSelector) { return outer.GroupJoin (inner, outerKeySelector, innerKeySelector, resultSelector, EqualityComparer.Default); } public static ParallelQuery GroupJoin (this ParallelQuery outer, ParallelQuery inner, Func outerKeySelector, Func innerKeySelector, Func, TResult> resultSelector, IEqualityComparer comparer) { return outer.Join (inner.GroupBy (innerKeySelector, (e) => e), outerKeySelector, (e) => e.Key, (e1, e2) => resultSelector (e1, e2), comparer); } [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery rather " + "than System.Collections.Generic.IEnumerable. To fix this problem, use the AsParallel() " + "extension method to convert the right data source to System.Linq.ParallelQuery.")] public static ParallelQuery GroupJoin (this ParallelQuery outer, IEnumerable inner, Func outerKeySelector, Func innerKeySelector, Func, TResult> resultSelector) { throw new NotSupportedException (); } [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery rather " + "than System.Collections.Generic.IEnumerable. 
To fix this problem, use the AsParallel() " + "extension method to convert the right data source to System.Linq.ParallelQuery.")] public static ParallelQuery GroupJoin (this ParallelQuery outer, IEnumerable inner, Func outerKeySelector, Func innerKeySelector, Func, TResult> resultSelector, IEqualityComparer comparer) { throw new NotSupportedException (); } #endregion #region ElementAt public static TSource ElementAt (this ParallelQuery source, int index) { if (source == null) throw new ArgumentNullException ("source"); if (index < 0) throw new ArgumentOutOfRangeException ("index"); if (index == 0) { try { return source.First (); } catch (InvalidOperationException) { throw new ArgumentOutOfRangeException ("index"); } } TSource result = default (TSource); ParallelQuery innerQuery = source.Where ((e, i) => i == index); try { result = innerQuery.First (); } catch (InvalidOperationException) { throw new ArgumentOutOfRangeException ("index"); } return result; } public static TSource ElementAtOrDefault (this ParallelQuery source, int index) { if (source == null) throw new ArgumentNullException ("source"); try { return source.ElementAt (index); } catch (ArgumentOutOfRangeException) { return default (TSource); } } #endregion #region Intersect public static ParallelQuery Intersect (this ParallelQuery first, ParallelQuery second) { return Intersect (first, second, EqualityComparer.Default); } public static ParallelQuery Intersect (this ParallelQuery first, ParallelQuery second, IEqualityComparer comparer) { if (first == null) throw new ArgumentNullException ("first"); if (second == null) throw new ArgumentNullException ("second"); if (comparer == null) comparer = EqualityComparer.Default; return new ParallelQuery (new QuerySetNode (SetInclusionDefaults.Intersect, comparer, first.Node, second.Node)); } [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery rather " + "than System.Collections.Generic.IEnumerable. 
To fix this problem, use the AsParallel() " + "extension method to convert the right data source to System.Linq.ParallelQuery.")] public static ParallelQuery Intersect (this ParallelQuery first, IEnumerable second) { throw new NotSupportedException (); } [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery rather " + "than System.Collections.Generic.IEnumerable. To fix this problem, use the AsParallel() " + "extension method to convert the right data source to System.Linq.ParallelQuery.")] public static ParallelQuery Intersect (this ParallelQuery first, IEnumerable second, IEqualityComparer comparer) { throw new NotSupportedException (); } #endregion #region Join public static ParallelQuery Join (this ParallelQuery outer, ParallelQuery inner, Func outerKeySelector, Func innerKeySelector, Func resultSelector) { return outer.Join (inner, outerKeySelector, innerKeySelector, resultSelector, EqualityComparer.Default); } public static ParallelQuery Join (this ParallelQuery outer, ParallelQuery inner, Func outerKeySelector, Func innerKeySelector, Func resultSelector, IEqualityComparer comparer) { return new ParallelQuery (new QueryJoinNode (outer.Node, inner.Node, outerKeySelector, innerKeySelector, resultSelector, comparer)); } [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery rather " + "than System.Collections.Generic.IEnumerable. To fix this problem, use the AsParallel() " + "extension method to convert the right data source to System.Linq.ParallelQuery.")] public static ParallelQuery Join (this ParallelQuery outer, IEnumerable inner, Func outerKeySelector, Func innerKeySelector, Func resultSelector) { throw new NotSupportedException (); } [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery rather " + "than System.Collections.Generic.IEnumerable. 
To fix this problem, use the AsParallel() " + "extension method to convert the right data source to System.Linq.ParallelQuery.")] public static ParallelQuery Join (this ParallelQuery outer, IEnumerable inner, Func outerKeySelector, Func innerKeySelector, Func resultSelector, IEqualityComparer comparer) { throw new NotSupportedException (); } #endregion #region Except public static ParallelQuery Except (this ParallelQuery first, ParallelQuery second) { return Except (first, second, EqualityComparer.Default); } public static ParallelQuery Except (this ParallelQuery first, ParallelQuery second, IEqualityComparer comparer) { if (first == null) throw new ArgumentNullException ("first"); if (second == null) throw new ArgumentNullException ("second"); if (comparer == null) comparer = EqualityComparer.Default; return new ParallelQuery (new QuerySetNode (SetInclusionDefaults.Except, comparer, first.Node, second.Node)); } [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery rather " + "than System.Collections.Generic.IEnumerable. To fix this problem, use the AsParallel() " + "extension method to convert the right data source to System.Linq.ParallelQuery.")] public static ParallelQuery Except (this ParallelQuery first, IEnumerable second) { throw new NotSupportedException (); } [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery rather " + "than System.Collections.Generic.IEnumerable. 
To fix this problem, use the AsParallel() " + "extension method to convert the right data source to System.Linq.ParallelQuery.")] public static ParallelQuery Except (this ParallelQuery first, IEnumerable second, IEqualityComparer comparer) { throw new NotSupportedException (); } #endregion #region Distinct public static ParallelQuery Distinct (this ParallelQuery source) { return Distinct (source, EqualityComparer.Default); } public static ParallelQuery Distinct (this ParallelQuery source, IEqualityComparer comparer) { if (source == null) throw new ArgumentNullException ("source"); if (comparer == null) comparer = EqualityComparer.Default; return new ParallelQuery (new QuerySetNode (SetInclusionDefaults.Distinct, comparer, source.Node, null)); } #endregion #region Union [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery rather " + "than System.Collections.Generic.IEnumerable. To fix this problem, use the AsParallel() " + "extension method to convert the right data source to System.Linq.ParallelQuery.")] public static ParallelQuery Union (this ParallelQuery first, IEnumerable second) { throw new NotSupportedException (); } [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery rather " + "than System.Collections.Generic.IEnumerable. 
To fix this problem, use the AsParallel() " + "extension method to convert the right data source to System.Linq.ParallelQuery.")] public static ParallelQuery Union(this ParallelQuery first, IEnumerable second, IEqualityComparer comparer) { throw new NotSupportedException (); } public static ParallelQuery Union (this ParallelQuery first, ParallelQuery second) { return first.Union (second, EqualityComparer.Default); } public static ParallelQuery Union (this ParallelQuery first, ParallelQuery second, IEqualityComparer comparer) { if (first == null) throw new ArgumentNullException ("first"); if (second == null) throw new ArgumentNullException ("second"); if (comparer == null) comparer = EqualityComparer.Default; return new ParallelQuery (new QuerySetNode (SetInclusionDefaults.Union, comparer, first.Node, second.Node)); } #endregion #region Take public static ParallelQuery Take (this ParallelQuery source, int count) { if (source == null) throw new ArgumentNullException ("source"); return new ParallelQuery (new QueryHeadWorkerNode (source.Node, count)); } public static ParallelQuery TakeWhile (this ParallelQuery source, Func predicate) { if (source == null) throw new ArgumentNullException ("source"); if (predicate == null) throw new ArgumentNullException ("predicate"); return new ParallelQuery (new QueryHeadWorkerNode (source.Node, (e, _) => predicate (e), false)); } public static ParallelQuery TakeWhile (this ParallelQuery source, Func predicate) { if (source == null) throw new ArgumentNullException ("source"); if (predicate == null) throw new ArgumentNullException ("predicate"); return new ParallelQuery (new QueryHeadWorkerNode (source.Node, predicate, true)); } #endregion #region Skip public static ParallelQuery Skip (this ParallelQuery source, int count) { if (source == null) throw new ArgumentNullException ("source"); return source.Node.IsOrdered () ? 
source.Where ((e, i) => i >= count) : source.Where ((e) => count < 0 || Interlocked.Decrement (ref count) < 0); } public static ParallelQuery SkipWhile (this ParallelQuery source, Func predicate) { if (source == null) throw new ArgumentNullException ("source"); if (predicate == null) throw new ArgumentNullException ("predicate"); return source.Node.IsOrdered () ? source.SkipWhile ((e, i) => predicate (e)) : source.Where ((e) => !predicate (e)); } public static ParallelQuery SkipWhile (this ParallelQuery source, Func predicate) { if (source == null) throw new ArgumentNullException ("source"); if (predicate == null) throw new ArgumentNullException ("predicate"); int indexCache = int.MaxValue; return source.Where ((e, i) => i >= indexCache || (!predicate (e, i) && (indexCache = i) == i)); } #endregion #region Single static TSource SingleInternal (this ParallelQuery source, params TSource[] init) { TSource result = default(TSource); bool hasValue = false; foreach (TSource element in source) { if (hasValue) throw new InvalidOperationException ("The input sequence contains more than one element."); result = element; hasValue = true; } if (!hasValue && init.Length != 0) { result = init[0]; hasValue = true; } if (!hasValue) throw new InvalidOperationException ("The input sequence is empty."); return result; } public static TSource Single (this ParallelQuery source) { if (source == null) throw new ArgumentNullException ("source"); return SingleInternal (source); } public static TSource Single (this ParallelQuery source, Func predicate) { if (source == null) throw new ArgumentNullException ("source"); if (predicate == null) throw new ArgumentNullException ("predicate"); return source.Where (predicate).Single (); } public static TSource SingleOrDefault (this ParallelQuery source) { if (source == null) throw new ArgumentNullException ("source"); return SingleInternal (source, default (TSource)); } public static TSource SingleOrDefault (this ParallelQuery source, Func 
predicate) { if (source == null) throw new ArgumentNullException ("source"); if (predicate == null) throw new ArgumentNullException ("predicate"); return source.Where (predicate).SingleOrDefault (); } #endregion #region Count public static int Count (this ParallelQuery source) { if (source == null) throw new ArgumentNullException ("source"); var helper = new CountAggregateHelper (); return source.Aggregate (helper.Seed, helper.Intermediate, helper.Reducer, helper.Final); } public static int Count (this ParallelQuery source, Func predicate) { if (source == null) throw new ArgumentNullException ("source"); if (predicate == null) throw new ArgumentNullException ("predicate"); return source.Where (predicate).Count (); } class CountAggregateHelper { public int Seed () { return 0; } public int Intermediate (int acc, TSource e) { return acc + 1; } public int Reducer (int acc1, int acc2) { return acc1 + acc2; } public int Final (int acc) { return acc; } } public static long LongCount (this ParallelQuery source) { if (source == null) throw new ArgumentNullException ("source"); var helper = new LongCountAggregateHelper (); return source.Aggregate (helper.Seed, helper.Intermediate, helper.Reducer, helper.Final); } public static long LongCount (this ParallelQuery source, Func predicate) { if (source == null) throw new ArgumentNullException ("source"); if (predicate == null) throw new ArgumentNullException ("predicate"); return source.Where (predicate).LongCount (); } class LongCountAggregateHelper { public long Seed () { return 0; } public long Intermediate (long acc, TSource e) { return acc + 1; } public long Reducer (long acc1, long acc2) { return acc1 + acc2; } public long Final (long acc) { return acc; } } #endregion #region Average public static double Average (this ParallelQuery source) { if (source == null) throw new ArgumentNullException ("source"); return source.Aggregate (() => new int[2], (acc, e) => { acc[0] += e; acc[1]++; return acc; }, (acc1, acc2) => { acc1[0] 
+= acc2[0]; acc1[1] += acc2[1]; return acc1; }, (acc) => acc[0] / ((double)acc[1])); }
/* Averages with a long[2] accumulator (slot 0 = running sum, slot 1 = element count)
 * and returns sum / count as a double.
 * Throws ArgumentNullException ("source") when source is null. */
public static double Average (this ParallelQuery source) { if (source == null) throw new ArgumentNullException ("source"); return source.Aggregate (() => new long[2], (acc, e) => { acc[0] += e; acc[1]++; return acc; }, (acc1, acc2) => { acc1[0] += acc2[0]; acc1[1] += acc2[1]; return acc1; }, (acc) => acc[0] / ((double)acc[1])); }
/* Averages with a decimal[2] accumulator; both the sum and the count are kept as
 * decimal and the result is their decimal quotient.
 * Throws ArgumentNullException ("source") when source is null. */
public static decimal Average (this ParallelQuery source) { if (source == null) throw new ArgumentNullException ("source"); return source.Aggregate (() => new decimal[2], (acc, e) => { acc[0] += e; acc[1]++; return acc; }, (acc1, acc2) => { acc1[0] += acc2[0]; acc1[1] += acc2[1]; return acc1; }, (acc) => acc[0] / acc[1]); }
/* Averages with a double[2] accumulator (sum and count both stored as double).
 * Throws ArgumentNullException ("source") when source is null. */
public static double Average (this ParallelQuery source) { if (source == null) throw new ArgumentNullException ("source"); return source.Aggregate (() => new double[2], (acc, e) => { acc[0] += e; acc[1]++; return acc; }, (acc1, acc2) => { acc1[0] += acc2[0]; acc1[1] += acc2[1]; return acc1; }, (acc) => acc[0] / ((double)acc[1])); }
/* Averages with a float[2] accumulator and returns a float.
 * NOTE(review): the element count is kept in a float slot as well - confirm that
 * the precision is acceptable for very long sequences.
 * Throws ArgumentNullException ("source") when source is null. */
public static float Average (this ParallelQuery source) { if (source == null) throw new ArgumentNullException ("source"); return source.Aggregate (() => new float[2], (acc, e) => { acc[0] += e; acc[1]++; return acc; }, (acc1, acc2) => { acc1[0] += acc2[0]; acc1[1] += acc2[1]; return acc1; }, (acc) => acc[0] / acc[1]); }
#endregion
#region More Average
/* Nullable overloads: every element without a value is replaced by 0, then the
 * matching non-nullable Average is called on the projected query.
 * NOTE(review): the substituted 0 is still counted as an element by the averaging
 * accumulators visible in this file, so missing values pull the mean towards zero
 * instead of being skipped - confirm whether that is intended.
 * Each overload throws ArgumentNullException ("source") when source is null. */
public static double? Average (this ParallelQuery source) { if (source == null) throw new ArgumentNullException ("source"); return source.Select ((e) => e.HasValue ? e.Value : 0).Average (); }
public static double? Average (this ParallelQuery source) { if (source == null) throw new ArgumentNullException ("source"); return source.Select ((e) => e.HasValue ? e.Value : 0).Average (); }
public static decimal? Average (this ParallelQuery source) { if (source == null) throw new ArgumentNullException ("source"); return source.Select ((e) => e.HasValue ? 
e.Value : 0).Average (); }
/* Nullable overloads: an element without a value is replaced by 0 and the projected
 * query is passed to the non-nullable Average.
 * NOTE(review): the replacement 0 remains in the sequence handed to Average, so it
 * presumably still counts as an element - confirm this is the intended treatment of
 * missing values.
 * Each overload throws ArgumentNullException ("source") when source is null. */
public static double? Average (this ParallelQuery source) { if (source == null) throw new ArgumentNullException ("source"); return source.Select ((e) => e.HasValue ? e.Value : 0).Average (); }
public static float? Average (this ParallelQuery source) { if (source == null) throw new ArgumentNullException ("source"); return source.Select ((e) => e.HasValue ? e.Value : 0).Average (); }
/* Selector overloads: project every element with selector, then average the
 * projected query with the parameterless Average. The overloads differ only in
 * their return type here; the selector's result type is not visible in this text.
 * Each throws ArgumentNullException when source or selector is null. */
public static double Average (this ParallelQuery source, Func selector) { if (source == null) throw new ArgumentNullException ("source"); if (selector == null) throw new ArgumentNullException ("selector"); return source.Select (selector).Average (); }
public static double Average (this ParallelQuery source, Func selector) { if (source == null) throw new ArgumentNullException ("source"); if (selector == null) throw new ArgumentNullException ("selector"); return source.Select (selector).Average (); }
public static float Average (this ParallelQuery source, Func selector) { if (source == null) throw new ArgumentNullException ("source"); if (selector == null) throw new ArgumentNullException ("selector"); return source.Select (selector).Average (); }
public static double Average (this ParallelQuery source, Func selector) { if (source == null) throw new ArgumentNullException ("source"); if (selector == null) throw new ArgumentNullException ("selector"); return source.Select (selector).Average (); }
public static decimal Average (this ParallelQuery source, Func selector) { if (source == null) throw new ArgumentNullException ("source"); if (selector == null) throw new ArgumentNullException ("selector"); return source.Select (selector).Average (); }
public static double? Average (this ParallelQuery source, Func selector) { if (source == null) throw new ArgumentNullException ("source"); if (selector == null) throw new ArgumentNullException ("selector"); return source.Select (selector).Average (); }
public static double? 
Average (this ParallelQuery source, Func selector) { if (source == null) throw new ArgumentNullException ("source"); if (selector == null) throw new ArgumentNullException ("selector"); return source.Select (selector).Average (); }
/* Remaining nullable selector overloads of Average: project with selector, then
 * call the parameterless Average on the result.
 * Each throws ArgumentNullException when source or selector is null. */
public static float? Average (this ParallelQuery source, Func selector) { if (source == null) throw new ArgumentNullException ("source"); if (selector == null) throw new ArgumentNullException ("selector"); return source.Select (selector).Average (); }
public static double? Average (this ParallelQuery source, Func selector) { if (source == null) throw new ArgumentNullException ("source"); if (selector == null) throw new ArgumentNullException ("selector"); return source.Select (selector).Average (); }
public static decimal? Average (this ParallelQuery source, Func selector) { if (source == null) throw new ArgumentNullException ("source"); if (selector == null) throw new ArgumentNullException ("selector"); return source.Select (selector).Average (); }
#endregion
#region Sum
/* Sum overloads: call Aggregate with a zero seed of the result type, an addition
 * for the per-element step, an addition for merging two sums and an identity final
 * selector. Each throws ArgumentNullException ("source") when source is null.
 * NOTE(review): the additions carry no explicit checked context, so integer
 * overflow presumably follows the project's compiler settings - confirm. */
public static int Sum (this ParallelQuery source) { if (source == null) throw new ArgumentNullException ("source"); return source.Aggregate (0, (e1, e2) => e1 + e2, (sum1, sum2) => sum1 + sum2, (sum) => sum); }
/* Same scheme with a (long)0 seed. */
public static long Sum (this ParallelQuery source) { if (source == null) throw new ArgumentNullException ("source"); return source.Aggregate ((long)0, (e1, e2) => e1 + e2, (sum1, sum2) => sum1 + sum2, (sum) => sum); }
/* Same scheme with a 0.0f seed. */
public static float Sum (this ParallelQuery source) { if (source == null) throw new ArgumentNullException ("source"); return source.Aggregate (0.0f, (e1, e2) => e1 + e2, (sum1, sum2) => sum1 + sum2, (sum) => sum); }
/* Same scheme with a 0.0 seed. */
public static double Sum (this ParallelQuery source) { if (source == null) throw new ArgumentNullException ("source"); return source.Aggregate (0.0, (e1, e2) => e1 + e2, (sum1, sum2) => sum1 + sum2, (sum) => sum); }
/* Same scheme with a (decimal)0 seed. */
public static decimal Sum (this ParallelQuery source) { if (source == null) throw new ArgumentNullException ("source"); 
return source.Aggregate ((decimal)0, (e1, e2) => e1 + e2, (sum1, sum2) => sum1 + sum2, (sum) => sum); }
/* Nullable overloads: an element without a value contributes 0; the projected query
 * is summed by the non-nullable Sum. Each throws ArgumentNullException ("source")
 * when source is null (the first overload previously lacked this guard, unlike its
 * four siblings, and relied on whatever Select did with a null receiver). */
public static int? Sum (this ParallelQuery source) { if (source == null) throw new ArgumentNullException ("source"); return source.Select ((e) => e.HasValue ? e.Value : 0).Sum (); }
public static long? Sum (this ParallelQuery source) { if (source == null) throw new ArgumentNullException ("source"); return source.Select ((e) => e.HasValue ? e.Value : 0).Sum (); }
public static float? Sum (this ParallelQuery source) { if (source == null) throw new ArgumentNullException ("source"); return source.Select ((e) => e.HasValue ? e.Value : 0).Sum (); }
public static double? Sum (this ParallelQuery source) { if (source == null) throw new ArgumentNullException ("source"); return source.Select ((e) => e.HasValue ? e.Value : 0).Sum (); }
public static decimal? Sum (this ParallelQuery source) { if (source == null) throw new ArgumentNullException ("source"); return source.Select ((e) => e.HasValue ? e.Value : 0).Sum (); }
/* Selector overloads: project every element with selector, then sum the projected
 * query with the parameterless Sum.
 * Each throws ArgumentNullException when source or selector is null. */
public static int Sum (this ParallelQuery source, Func selector) { if (source == null) throw new ArgumentNullException ("source"); if (selector == null) throw new ArgumentNullException ("selector"); return source.Select (selector).Sum (); }
public static long Sum (this ParallelQuery source, Func selector) { if (source == null) throw new ArgumentNullException ("source"); if (selector == null) throw new ArgumentNullException ("selector"); return source.Select (selector).Sum (); }
public static decimal Sum (this ParallelQuery source, Func selector) { if (source == null) throw new ArgumentNullException ("source"); if (selector == null) throw new ArgumentNullException ("selector"); return source.Select (selector).Sum (); }
public static float Sum (this ParallelQuery source, Func selector) { if (source == null) throw new ArgumentNullException ("source"); if (selector == null) throw new ArgumentNullException ("selector"); return source.Select (selector).Sum (); }
public static double Sum (this ParallelQuery source, Func selector) 
{ if (source == null) throw new ArgumentNullException ("source"); if (selector == null) throw new ArgumentNullException ("selector"); return source.Select (selector).Sum (); }
/* Nullable selector overloads of Sum: project every element with selector, then
 * call the parameterless Sum on the projected query.
 * Each throws ArgumentNullException when source or selector is null. */
public static int? Sum (this ParallelQuery source, Func selector) { if (source == null) throw new ArgumentNullException ("source"); if (selector == null) throw new ArgumentNullException ("selector"); return source.Select (selector).Sum (); }
public static long? Sum (this ParallelQuery source, Func selector) { if (source == null) throw new ArgumentNullException ("source"); if (selector == null) throw new ArgumentNullException ("selector"); return source.Select (selector).Sum (); }
public static decimal? Sum (this ParallelQuery source, Func selector) { if (source == null) throw new ArgumentNullException ("source"); if (selector == null) throw new ArgumentNullException ("selector"); return source.Select (selector).Sum (); }
public static float? Sum (this ParallelQuery source, Func selector) { if (source == null) throw new ArgumentNullException ("source"); if (selector == null) throw new ArgumentNullException ("selector"); return source.Select (selector).Sum (); }
public static double? 
Sum (this ParallelQuery source, Func selector) { if (source == null) throw new ArgumentNullException ("source"); if (selector == null) throw new ArgumentNullException ("selector"); return source.Select (selector).Sum (); }
#endregion
#region Min-Max
/* Shared engine of Min and Max. Calls Aggregate with bestOrderComparer.Seed as the
 * seed factory, bestOrderComparer.Intermediate as both the per-element step and the
 * merge step, and Identity.Apply as the final selector, then returns the winner.
 * Throws ArgumentNullException ("source") when source is null. */
static T BestOrder (ParallelQuery source, BestOrderComparer bestOrderComparer) { if (source == null) throw new ArgumentNullException ("source"); T best = source.Aggregate (bestOrderComparer.Seed, bestOrderComparer.Intermediate, bestOrderComparer.Intermediate, new Identity ().Apply); return best; }
/* Picks the better of two values. Better (first, second) is true when
 * inverter * comparer.Compare (first, second) is strictly positive, so an inverter
 * of 1 prefers the greater operand and -1 the smaller one; on a tie Intermediate
 * returns second. Seed hands back the value supplied to the constructor. */
class BestOrderComparer { IComparer comparer; int inverter; T seed; public BestOrderComparer (IComparer comparer, int inverter, T seed) { this.comparer = comparer; this.inverter = inverter; this.seed = seed; } public T Seed () { return seed; } public T Intermediate (T first, T second) { return Better (first, second) ? first : second; } bool Better (T first, T second) { return (inverter * comparer.Compare (first, second)) > 0; } }
/* Min overloads for the built-in numeric types: BestOrder with the default
 * comparer, inverter -1 and the type's MaxValue as seed. A null source is rejected
 * inside BestOrder.
 * NOTE(review): for an empty source the result is presumably the seed (MaxValue)
 * rather than an exception - confirm against Aggregate. */
public static int Min (this ParallelQuery source) { return BestOrder (source, new BestOrderComparer (Comparer.Default, -1, int.MaxValue)); }
public static long Min (this ParallelQuery source) { return BestOrder (source, new BestOrderComparer (Comparer.Default, -1, long.MaxValue)); }
public static float Min (this ParallelQuery source) { return BestOrder (source, new BestOrderComparer (Comparer.Default, -1, float.MaxValue)); }
public static double Min (this ParallelQuery source) { return BestOrder (source, new BestOrderComparer (Comparer.Default, -1, double.MaxValue)); }
public static decimal Min (this ParallelQuery source) { return BestOrder (source, new BestOrderComparer (Comparer.Default, -1, decimal.MaxValue)); }
/* Generic Min: same scheme with the default comparer and default (TSource) as seed.
 * NOTE(review): if the seed takes part in the comparisons, default (TSource) can
 * win over every real element (for instance 0 against strictly positive numbers) -
 * confirm how Aggregate uses the seed. */
public static TSource Min (this ParallelQuery source) { IComparer comparer = Comparer.Default; return BestOrder (source, new BestOrderComparer (comparer, -1, default (TSource))); }
/* Generic selector overload: projects with selector, then calls the parameterless
 * Min. Throws ArgumentNullException when source or selector is null. */
public static TResult Min (this ParallelQuery source, Func selector) { if (source == null) throw new 
ArgumentNullException ("source"); if (selector == null) throw new ArgumentNullException ("selector"); return source.Select (selector).Min (); }
/* Nullable Min overloads: an element without a value is replaced by the type's
 * MaxValue and the projected query is passed to the non-nullable Min.
 * NOTE(review): a sequence made only of missing values therefore presumably yields
 * MaxValue rather than null - confirm that this is intended.
 * Each overload throws ArgumentNullException ("source") when source is null. */
public static int? Min (this ParallelQuery source) { if (source == null) throw new ArgumentNullException ("source"); return source.Select ((e) => e.HasValue ? e.Value : int.MaxValue).Min (); }
public static long? Min (this ParallelQuery source) { if (source == null) throw new ArgumentNullException ("source"); return source.Select ((e) => e.HasValue ? e.Value : long.MaxValue).Min (); }
public static float? Min (this ParallelQuery source) { if (source == null) throw new ArgumentNullException ("source"); return source.Select ((e) => e.HasValue ? e.Value : float.MaxValue).Min (); }
public static double? Min (this ParallelQuery source) { if (source == null) throw new ArgumentNullException ("source"); return source.Select ((e) => e.HasValue ? e.Value : double.MaxValue).Min (); }
public static decimal? Min (this ParallelQuery source) { if (source == null) throw new ArgumentNullException ("source"); return source.Select ((e) => e.HasValue ? 
e.Value : decimal.MaxValue).Min (); }
/* Selector overloads of Min: project every element with selector, then call the
 * parameterless Min on the projected query. The overloads differ only in their
 * return type here; the selector's result type is not visible in this text.
 * Each throws ArgumentNullException when source or selector is null. */
public static int Min (this ParallelQuery source, Func selector) { if (source == null) throw new ArgumentNullException ("source"); if (selector == null) throw new ArgumentNullException ("selector"); return source.Select (selector).Min (); }
public static long Min (this ParallelQuery source, Func selector) { if (source == null) throw new ArgumentNullException ("source"); if (selector == null) throw new ArgumentNullException ("selector"); return source.Select (selector).Min (); }
public static float Min (this ParallelQuery source, Func selector) { if (source == null) throw new ArgumentNullException ("source"); if (selector == null) throw new ArgumentNullException ("selector"); return source.Select (selector).Min (); }
public static double Min (this ParallelQuery source, Func selector) { if (source == null) throw new ArgumentNullException ("source"); if (selector == null) throw new ArgumentNullException ("selector"); return source.Select (selector).Min (); }
public static decimal Min (this ParallelQuery source, Func selector) { if (source == null) throw new ArgumentNullException ("source"); if (selector == null) throw new ArgumentNullException ("selector"); return source.Select (selector).Min (); }
public static int? Min (this ParallelQuery source, Func selector) { if (source == null) throw new ArgumentNullException ("source"); if (selector == null) throw new ArgumentNullException ("selector"); return source.Select (selector).Min (); }
public static long? Min (this ParallelQuery source, Func selector) { if (source == null) throw new ArgumentNullException ("source"); if (selector == null) throw new ArgumentNullException ("selector"); return source.Select (selector).Min (); }
public static float? 
Min (this ParallelQuery source, Func selector) { if (source == null) throw new ArgumentNullException ("source"); if (selector == null) throw new ArgumentNullException ("selector"); return source.Select (selector).Min (); }
/* Remaining nullable selector overloads of Min: project with selector, then call
 * the parameterless Min.
 * Each throws ArgumentNullException when source or selector is null. */
public static double? Min (this ParallelQuery source, Func selector) { if (source == null) throw new ArgumentNullException ("source"); if (selector == null) throw new ArgumentNullException ("selector"); return source.Select (selector).Min (); }
public static decimal? Min (this ParallelQuery source, Func selector) { if (source == null) throw new ArgumentNullException ("source"); if (selector == null) throw new ArgumentNullException ("selector"); return source.Select (selector).Min (); }
/* Max overloads for the built-in numeric types: BestOrder with the default
 * comparer, inverter 1 and the type's MinValue as seed. No null check happens here;
 * source is passed straight to BestOrder.
 * NOTE(review): for an empty source the result is presumably the seed (MinValue)
 * rather than an exception - confirm against Aggregate. */
public static int Max (this ParallelQuery source) { return BestOrder (source, new BestOrderComparer (Comparer.Default, 1, int.MinValue)); }
public static long Max (this ParallelQuery source) { return BestOrder (source, new BestOrderComparer (Comparer.Default, 1, long.MinValue)); }
public static float Max (this ParallelQuery source) { return BestOrder (source, new BestOrderComparer (Comparer.Default, 1, float.MinValue)); }
public static double Max (this ParallelQuery source) { return BestOrder (source, new BestOrderComparer (Comparer.Default, 1, double.MinValue)); }
public static decimal Max (this ParallelQuery source) { return BestOrder (source, new BestOrderComparer (Comparer.Default, 1, decimal.MinValue)); }
/* Generic Max: same scheme with the default comparer and default (TSource) as seed.
 * NOTE(review): if the seed takes part in the comparisons, default (TSource) can
 * win over every real element (for instance 0 against strictly negative numbers) -
 * confirm how Aggregate uses the seed. */
public static TSource Max (this ParallelQuery source) { IComparer comparer = Comparer.Default; return BestOrder (source, new BestOrderComparer (comparer, 1, default (TSource))); }
/* Generic selector overload: projects with selector, then calls the parameterless
 * Max. Throws ArgumentNullException when source or selector is null. */
public static TResult Max (this ParallelQuery source, Func selector) { if (source == null) throw new ArgumentNullException ("source"); if (selector == null) throw new ArgumentNullException ("selector"); return source.Select (selector).Max (); }
public static int? 
Max (this ParallelQuery source) { if (source == null) throw new ArgumentNullException ("source"); return source.Select ((e) => e.HasValue ? e.Value : int.MinValue).Max (); }
/* Nullable Max overloads: an element without a value is replaced by the type's
 * MinValue and the projected query is passed to the non-nullable Max.
 * NOTE(review): a sequence made only of missing values therefore presumably yields
 * MinValue rather than null - confirm that this is intended.
 * Each overload throws ArgumentNullException ("source") when source is null. */
public static long? Max (this ParallelQuery source) { if (source == null) throw new ArgumentNullException ("source"); return source.Select ((e) => e.HasValue ? e.Value : long.MinValue).Max (); }
public static float? Max (this ParallelQuery source) { if (source == null) throw new ArgumentNullException ("source"); return source.Select ((e) => e.HasValue ? e.Value : float.MinValue).Max (); }
public static double? Max (this ParallelQuery source) { if (source == null) throw new ArgumentNullException ("source"); return source.Select ((e) => e.HasValue ? e.Value : double.MinValue).Max (); }
public static decimal? Max (this ParallelQuery source) { if (source == null) throw new ArgumentNullException ("source"); return source.Select ((e) => e.HasValue ? e.Value : decimal.MinValue).Max (); }
/* Selector overloads of Max: project every element with selector, then call the
 * parameterless Max on the projected query.
 * Each throws ArgumentNullException when source or selector is null. */
public static int Max (this ParallelQuery source, Func selector) { if (source == null) throw new ArgumentNullException ("source"); if (selector == null) throw new ArgumentNullException ("selector"); return source.Select (selector).Max (); }
public static long Max (this ParallelQuery source, Func selector) { if (source == null) throw new ArgumentNullException ("source"); if (selector == null) throw new ArgumentNullException ("selector"); return source.Select (selector).Max (); }
public static float Max (this ParallelQuery source, Func selector) { if (source == null) throw new ArgumentNullException ("source"); if (selector == null) throw new ArgumentNullException ("selector"); return source.Select (selector).Max (); }
public static double Max (this ParallelQuery source, Func selector) { if (source == null) throw new ArgumentNullException ("source"); if (selector == null) throw new ArgumentNullException ("selector"); return source.Select (selector).Max (); }
public static decimal Max (this ParallelQuery source, Func 
selector) { if (source == null) throw new ArgumentNullException ("source"); if (selector == null) throw new ArgumentNullException ("selector"); return source.Select (selector).Max (); }
/* Nullable selector overloads of Max: project every element with selector, then
 * call the parameterless Max on the projected query.
 * Each throws ArgumentNullException when source or selector is null. */
public static int? Max (this ParallelQuery source, Func selector) { if (source == null) throw new ArgumentNullException ("source"); if (selector == null) throw new ArgumentNullException ("selector"); return source.Select (selector).Max (); }
public static long? Max (this ParallelQuery source, Func selector) { if (source == null) throw new ArgumentNullException ("source"); if (selector == null) throw new ArgumentNullException ("selector"); return source.Select (selector).Max (); }
public static float? Max (this ParallelQuery source, Func selector) { if (source == null) throw new ArgumentNullException ("source"); if (selector == null) throw new ArgumentNullException ("selector"); return source.Select (selector).Max (); }
public static double? Max (this ParallelQuery source, Func selector) { if (source == null) throw new ArgumentNullException ("source"); if (selector == null) throw new ArgumentNullException ("selector"); return source.Select (selector).Max (); }
public static decimal? 
Max (this ParallelQuery source, Func selector) { if (source == null) throw new ArgumentNullException ("source"); if (selector == null) throw new ArgumentNullException ("selector"); return source.Select (selector).Max (); }
#endregion
#region Cast / OfType
/* Converts every element with an explicit (TResult) cast, applied through Select on
 * source.TypedQuery (a member defined outside this view).
 * Throws ArgumentNullException ("source") when source is null. */
public static ParallelQuery Cast (this ParallelQuery source) { if (source == null) throw new ArgumentNullException ("source"); return source.TypedQuery.Select ((e) => (TResult)e); }
/* Keeps only the elements for which "e is TResult" holds, then passes the filtered
 * query to Cast. Throws ArgumentNullException ("source") when source is null. */
public static ParallelQuery OfType (this ParallelQuery source) { if (source == null) throw new ArgumentNullException ("source"); return source.TypedQuery.Where ((e) => e is TResult).Cast (); }
#endregion
#region Reverse
/* Wraps source in a QueryReverseNode and returns a new query over that node.
 * Throws ArgumentNullException ("source") when source is null. */
public static ParallelQuery Reverse (this ParallelQuery source) { if (source == null) throw new ArgumentNullException ("source"); return new ParallelQuery (new QueryReverseNode (source)); }
#endregion
#region ToArray - ToList - ToDictionary - ToLookup
/* Materialises the query into a list. When source.Node.IsOrdered () is true the
 * elements are collected by ToListOrdered; otherwise they are gathered through
 * Aggregate with the ListAggregateHelper callbacks.
 * Throws ArgumentNullException ("source") when source is null. */
public static List ToList (this ParallelQuery source) { if (source == null) throw new ArgumentNullException ("source"); if (source.Node.IsOrdered ()) return ToListOrdered (source); var helper = new ListAggregateHelper (); List temp = source.Aggregate (helper.Seed, helper.Intermediate, helper.Reducer, helper.Final); return temp; }
/* Callbacks passed to Aggregate by ToList: Seed creates a list with an initial
 * capacity of 50, Intermediate appends one element, Reducer appends all of list2 to
 * list with AddRange, and Final returns the list unchanged. */
class ListAggregateHelper { public List Seed () { return new List (50); } public List Intermediate (List list, TSource e) { list.Add (e); return list; } public List Reducer (List list, List list2) { list.AddRange (list2); return list; } public List Final (List list) { return list; } }
/* Collects the elements by enumerating source with a plain foreach and adding each
 * one to a new list. Performs no null check of its own. */
internal static List ToListOrdered (this ParallelQuery source) { List result = new List (); foreach (TSource element in source) result.Add (element); return result; }
/* Materialises the query into an array. For an ordered node the result is
 * ToListOrdered (source).ToArray (); otherwise the work is handed to
 * ParallelExecuter.ProcessAndAggregate with the ArrayAggregateHelper callbacks and
 * the array is read back from helper.Result.
 * Throws ArgumentNullException ("source") when source is null. */
public static TSource[] ToArray (this ParallelQuery source) { if (source == null) throw new ArgumentNullException ("source"); if (source.Node.IsOrdered ()) return ToListOrdered (source).ToArray (); var helper = new ArrayAggregateHelper (); ParallelExecuter.ProcessAndAggregate> (source.Node, helper.Seed, 
helper.Intermediate, helper.Final); return helper.Result; }
/* Callbacks used by ToArray. Seed creates an empty list and Intermediate appends
 * one element to it. Final receives a list of lists, adds up their Count values,
 * allocates the result array with exactly that size and copies the elements list
 * by list, in the order in which the lists appear; insertIndex starts at -1 because
 * it is pre-incremented before every store. The finished array is exposed through
 * Result. */
class ArrayAggregateHelper { TSource[] result; public TSource[] Result { get { return result; } } internal List Seed () { return new List (); } internal List Intermediate (List list, TSource e) { list.Add (e); return list; } internal void Final (IList> list) { int count = 0; for (int i = 0; i < list.Count; i++) count += list[i].Count; result = new TSource[count]; int insertIndex = -1; for (int i = 0; i < list.Count; i++) for (int j = 0; j < list[i].Count; j++) result [++insertIndex] = list[i][j]; } }
/* Convenience overload: uses the element itself as the value and forwards to the
 * four-argument ToDictionary, where the arguments are validated. */
public static Dictionary ToDictionary (this ParallelQuery source, Func keySelector, IEqualityComparer comparer) { return ToDictionary (source, keySelector, (e) => e, comparer); }
/* Convenience overload: element as value, EqualityComparer.Default as comparer. */
public static Dictionary ToDictionary (this ParallelQuery source, Func keySelector) { return ToDictionary (source, keySelector, (e) => e, EqualityComparer.Default); }
/* Convenience overload: explicit elementSelector, EqualityComparer.Default as
 * comparer. */
public static Dictionary ToDictionary (this ParallelQuery source, Func keySelector, Func elementSelector) { return ToDictionary (source, keySelector, elementSelector, EqualityComparer.Default); }
/* Builds a dictionary through Aggregate with the DictionaryAggregateHelper
 * callbacks. A null comparer is replaced by EqualityComparer.Default instead of
 * being rejected. Throws ArgumentNullException when source, keySelector or
 * elementSelector is null. */
public static Dictionary ToDictionary (this ParallelQuery source, Func keySelector, Func elementSelector, IEqualityComparer comparer) { if (source == null) throw new ArgumentNullException ("source"); if (keySelector == null) throw new ArgumentNullException ("keySelector"); if (comparer == null) comparer = EqualityComparer.Default; if (elementSelector == null) throw new ArgumentNullException ("elementSelector"); var helper = new DictionaryAggregateHelper (comparer, keySelector, elementSelector); return source.Aggregate (helper.Seed, helper.Intermediate, helper.Reducer, helper.Final); }
/* Holds the comparer and the two selectors that ToDictionary passes to Aggregate
 * through this helper's callbacks. */
class DictionaryAggregateHelper { IEqualityComparer comparer; Func keySelector; Func elementSelector; public DictionaryAggregateHelper (IEqualityComparer comparer, Func keySelector, Func elementSelector) { this.comparer = comparer; this.keySelector = keySelector; this.elementSelector = 
elementSelector; }
/* Seed: creates an empty dictionary that uses the stored comparer. */
public Dictionary Seed () { return new Dictionary (comparer); }
/* Intermediate: inserts one element under keySelector (e) with the value
 * elementSelector (e). Add is used rather than the indexer, so an already present
 * key is presumably rejected by the dictionary instead of being overwritten. */
public Dictionary Intermediate (Dictionary d, TSource e) { d.Add (keySelector (e), elementSelector (e)); return d; }
/* Reducer: copies every pair of d2 into d1 with Add and returns d1. */
public Dictionary Reducer (Dictionary d1, Dictionary d2) { foreach (var couple in d2) d1.Add (couple.Key, couple.Value); return d1; }
/* Final: returns the dictionary unchanged. */
public Dictionary Final (Dictionary d) { return d; } }
/* Convenience overload: element as value, EqualityComparer.Default as comparer;
 * forwards to the four-argument ToLookup, where the arguments are validated. */
public static ILookup ToLookup (this ParallelQuery source, Func keySelector) { return ToLookup (source, keySelector, (e) => e, EqualityComparer.Default); }
/* Convenience overload: element as value, caller-supplied comparer. */
public static ILookup ToLookup (this ParallelQuery source, Func keySelector, IEqualityComparer comparer) { return ToLookup (source, keySelector, (e) => e, comparer); }
/* Convenience overload: explicit elementSelector, EqualityComparer.Default as
 * comparer. */
public static ILookup ToLookup (this ParallelQuery source, Func keySelector, Func elementSelector) { return ToLookup (source, keySelector, elementSelector, EqualityComparer.Default); }
/* Builds a lookup by creating a ConcurrentLookup with the comparer and adding
 * keySelector (e) / elementSelector (e) for every element from inside ForAll.
 * A null comparer is replaced by EqualityComparer.Default. Throws
 * ArgumentNullException when source, keySelector or elementSelector is null. */
public static ILookup ToLookup (this ParallelQuery source, Func keySelector, Func elementSelector, IEqualityComparer comparer) { if (source == null) throw new ArgumentNullException ("source"); if (keySelector == null) throw new ArgumentNullException ("keySelector"); if (comparer == null) comparer = EqualityComparer.Default; if (elementSelector == null) throw new ArgumentNullException ("elementSelector"); ConcurrentLookup lookup = new ConcurrentLookup (comparer); source.ForAll ((e) => lookup.Add (keySelector (e), elementSelector (e))); return lookup; }
#endregion
#region Concat
/* Overload taking a plain IEnumerable as second operand: marked obsolete with the
 * message below and always throws NotSupportedException. */
[ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery rather than " + "System.Collections.Generic.IEnumerable. 
To fix this problem, use the AsParallel() extension method " + "to convert the right data source to System.Linq.ParallelQuery.")] public static ParallelQuery Concat(this ParallelQuery first, IEnumerable second) { throw new NotSupportedException (); }
/* Concatenates two parallel queries by wrapping both nodes in a QueryConcatNode.
 * Throws ArgumentNullException when first or second is null (same guards as Zip;
 * previously a null operand failed on the .Node dereference). */
public static ParallelQuery Concat (this ParallelQuery first, ParallelQuery second) { if (first == null) throw new ArgumentNullException ("first"); if (second == null) throw new ArgumentNullException ("second"); return new ParallelQuery (new QueryConcatNode (first.Node, second.Node)); }
#endregion
#region DefaultIfEmpty
/* Forwards to the two-argument overload with default (TSource) as the fallback
 * value; the null check on source happens there. */
public static ParallelQuery DefaultIfEmpty (this ParallelQuery source) { return source.DefaultIfEmpty (default (TSource)); }
/* Wraps source.Node and defaultValue in a QueryDefaultEmptyNode.
 * Throws ArgumentNullException ("source") when source is null (previously a null
 * source failed on the .Node dereference). */
public static ParallelQuery DefaultIfEmpty (this ParallelQuery source, TSource defaultValue) { if (source == null) throw new ArgumentNullException ("source"); return new ParallelQuery (new QueryDefaultEmptyNode (source.Node, defaultValue)); }
#endregion
#region First
/* Returns the first element produced by the query's enumerator.
 * The query is started with a private CancellationTokenSource (attached through
 * WithImplementerToken) so that it can be told to stop once one element is known.
 * Throws ArgumentNullException ("source") when source is null and
 * InvalidOperationException when no element is produced. */
public static TSource First (this ParallelQuery source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	CancellationTokenSource src = new CancellationTokenSource ();
	IEnumerator enumerator = source.WithImplementerToken (src).GetEnumerator ();
	try {
		if (enumerator == null || !enumerator.MoveNext ())
			throw new InvalidOperationException ("source contains no element");
		TSource result = enumerator.Current;
		return result;
	} finally {
		/* Runs on every exit path: the success path keeps the original order
		 * (read Current, cancel, dispose), and the empty / throwing paths no
		 * longer leave the token uncancelled and the enumerator undisposed. */
		src.Cancel ();
		if (enumerator != null)
			enumerator.Dispose ();
	}
}
/* Filters with Where, then returns First of the filtered query. */
public static TSource First (this ParallelQuery source, Func predicate) { return source.Where (predicate).First (); }
/* Appends a default-if-empty stage, then takes First, so an empty query yields
 * default (TSource) instead of an exception. */
public static TSource FirstOrDefault (this ParallelQuery source) { return source.DefaultIfEmpty ().First (); }
/* Filters with Where, then calls the parameterless FirstOrDefault. */
public static TSource FirstOrDefault (this ParallelQuery source, Func predicate) { return source.Where (predicate).FirstOrDefault (); }
#endregion
#region Last
/* Last element: First of the reversed query. */
public static TSource Last (this ParallelQuery source) { return source.Reverse ().First (); }
/* Last element accepted by predicate: First (predicate) of the reversed query. */
public static TSource Last (this ParallelQuery source, Func predicate) { return source.Reverse ().First (predicate); }
/* Last element or default: FirstOrDefault of the reversed query. */
public static TSource LastOrDefault (this ParallelQuery source) { return source.Reverse ().FirstOrDefault (); }
/* Last element accepted by predicate, or default: FirstOrDefault (predicate) of the
 * reversed query. */
public static TSource LastOrDefault (this ParallelQuery source, Func predicate) { return 
source.Reverse ().FirstOrDefault (predicate); }
#endregion
#region Zip
/* Combines two parallel queries by wrapping resultSelector and both nodes in a
 * QueryZipNode. Throws ArgumentNullException when first, second or resultSelector
 * is null. */
public static ParallelQuery Zip (this ParallelQuery first, ParallelQuery second, Func resultSelector) { if (first == null) throw new ArgumentNullException ("first"); if (second == null) throw new ArgumentNullException ("second"); if (resultSelector == null) throw new ArgumentNullException ("resultSelector"); return new ParallelQuery (new QueryZipNode (resultSelector, first.Node, second.Node)); }
/* Overload taking a plain IEnumerable as second operand: marked obsolete with the
 * message below and always throws NotSupportedException. */
[ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery rather " + "than System.Collections.Generic.IEnumerable. To fix this problem, use the AsParallel() " + "extension method to convert the right data source to System.Linq.ParallelQuery.")] public static ParallelQuery Zip (this ParallelQuery first, IEnumerable second, Func resultSelector) { throw new NotSupportedException (); }
#endregion
#region Helpers
/* Identity function wrapper: Apply returns its argument unchanged. BestOrder passes
 * Apply to Aggregate as the final selector. */
class Identity { public T Apply (T input) { return input; } }
#endregion
}
}
#endif