2 // ParallelEnumerable.cs
5 // Jérémie "Garuma" Laval <jeremie.laval@gmail.com>
7 // Copyright (c) 2010 Jérémie "Garuma" Laval
9 // Permission is hereby granted, free of charge, to any person obtaining a copy
10 // of this software and associated documentation files (the "Software"), to deal
11 // in the Software without restriction, including without limitation the rights
12 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13 // copies of the Software, and to permit persons to whom the Software is
14 // furnished to do so, subject to the following conditions:
16 // The above copyright notice and this permission notice shall be included in
17 // all copies or substantial portions of the Software.
19 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
29 using System.Threading;
30 using System.Collections;
31 using System.Collections.Generic;
32 using System.Collections.Concurrent;
33 using System.Linq.Parallel;
34 using System.Linq.Parallel.QueryNodes;
38 public static class ParallelEnumerable
40 #region Range & Repeat
/// <summary>
/// Builds a <c>RangeList</c> for <paramref name="start"/> and <paramref name="count"/>
/// and exposes it as a parallel query.
/// </summary>
/// <param name="start">First value of the range.</param>
/// <param name="count">Number of values; must not be negative.</param>
/// <returns>The range list wrapped by <c>AsParallel</c>.</returns>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="count"/> is negative, or <c>start + count - 1</c> exceeds <see cref="Int32.MaxValue"/>.
/// </exception>
public static ParallelQuery<int> Range (int start, int count)
{
	if (count < 0)
		throw new ArgumentOutOfRangeException ("count", "count is less than 0");

	// Compute the upper bound in 64 bits. The former "int.MaxValue - start" test
	// wrapped around for a negative start and rejected valid input such as Range (-1, 1).
	if ((long) start + count - 1 > int.MaxValue)
		throw new ArgumentOutOfRangeException ("count", "start + count - 1 is larger than Int32.MaxValue");

	return (new RangeList (start, count)).AsParallel ();
}
/// <summary>
/// Builds a <c>RepeatList</c> from <paramref name="element"/> and <paramref name="count"/>
/// and exposes it as a parallel query.
/// </summary>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is negative.</exception>
public static ParallelQuery<TResult> Repeat<TResult> (TResult element, int count)
{
	if (count < 0) {
		throw new ArgumentOutOfRangeException ("count", "count is less than 0");
	}

	var repeated = new RepeatList<TResult> (element, count);
	return repeated.AsParallel ();
}
/// <summary>
/// Returns an empty parallel query, expressed as the default value repeated zero times.
/// </summary>
public static ParallelQuery<TResult> Empty<TResult> ()
{
	TResult placeholder = default (TResult);
	return Repeat<TResult> (placeholder, 0);
}
/// <summary>
/// Turns a sequence into a parallel query. A sequence that already is a
/// <see cref="ParallelQuery{TSource}"/> is returned as-is instead of being wrapped again.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static ParallelQuery<TSource> AsParallel<TSource> (this IEnumerable<TSource> source)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}

	// Repeated AsParallel calls must not stack start nodes: reuse the existing query.
	var existing = source as ParallelQuery<TSource>;
	if (existing != null) {
		return existing;
	}

	return new ParallelQuery<TSource> (new QueryStartNode<TSource> (source));
}
/// <summary>
/// Starts a parallel query whose elements come from a custom partitioner.
/// </summary>
/// <param name="source">Partitioner handed to the query's start node.</param>
/// <returns>A new query rooted at a <c>QueryStartNode</c> built from <paramref name="source"/>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static ParallelQuery<TSource> AsParallel<TSource> (this Partitioner<TSource> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	// Partitioner<TSource> and ParallelQuery<TSource> are unrelated classes, so the old
	// "source as ParallelQuery<TSource>" probe could only ever yield null; it is gone.
	return new ParallelQuery<TSource> (new QueryStartNode<TSource> (source));
}
/// <summary>
/// Turns a non-generic sequence into a parallel query of <c>object</c>. A sequence that
/// already is a <see cref="ParallelQuery"/> is returned unchanged.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static ParallelQuery AsParallel (this IEnumerable source)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}

	// Avoid double wrapping when the caller already holds a parallel query.
	ParallelQuery existing = source as ParallelQuery;
	if (existing != null) {
		return existing;
	}

	return new ParallelQuery<object> (new QueryStartNode<object> (source.Cast<object> ()));
}
107 public static IEnumerable<TSource> AsEnumerable<TSource> (this ParallelQuery<TSource> source)
110 throw new ArgumentNullException ("source");
112 return source.AsSequential ();
/// <summary>
/// Returns the sequential form of the query, as produced by the query node's
/// <c>GetSequential</c>.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static IEnumerable<TSource> AsSequential<TSource> (this ParallelQuery<TSource> source)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}

	IEnumerable<TSource> sequential = source.Node.GetSequential ();
	return sequential;
}
124 #region AsOrdered / AsUnordered
/// <summary>
/// Returns a new query whose node is a <c>QueryAsOrderedNode</c> placed on top of the source node.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static ParallelQuery<TSource> AsOrdered<TSource> (this ParallelQuery<TSource> source)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}

	var orderedNode = new QueryAsOrderedNode<TSource> (source.Node);
	return new ParallelQuery<TSource> (orderedNode);
}
/// <summary>
/// Returns a new query whose node is a <c>QueryAsUnorderedNode</c> placed on top of the source node.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static ParallelQuery<TSource> AsUnordered<TSource> (this ParallelQuery<TSource> source)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}

	var unorderedNode = new QueryAsUnorderedNode<TSource> (source.Node);
	return new ParallelQuery<TSource> (unorderedNode);
}
/// <summary>
/// Non-generic variant: applies <c>AsOrdered</c> to the query's <c>TypedQuery</c>.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static ParallelQuery AsOrdered (this ParallelQuery source)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}

	var typed = source.TypedQuery;
	return typed.AsOrdered ();
}
/// <summary>
/// Returns a new query that records <paramref name="executionMode"/> in a
/// <c>ParallelExecutionModeNode</c> on top of the source node.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static ParallelQuery<TSource> WithExecutionMode<TSource> (this ParallelQuery<TSource> source,
	ParallelExecutionMode executionMode)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}

	var modeNode = new ParallelExecutionModeNode<TSource> (executionMode, source.Node);
	return new ParallelQuery<TSource> (modeNode);
}
/// <summary>
/// Returns a new query that records <paramref name="cancellationToken"/> in a
/// <c>CancellationTokenNode</c> on top of the source node.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static ParallelQuery<TSource> WithCancellation<TSource> (this ParallelQuery<TSource> source,
	CancellationToken cancellationToken)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}

	var tokenNode = new CancellationTokenNode<TSource> (cancellationToken, source.Node);
	return new ParallelQuery<TSource> (tokenNode);
}
/// <summary>
/// Returns a new query that records <paramref name="mergeOptions"/> in a
/// <c>ParallelMergeOptionsNode</c> on top of the source node.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static ParallelQuery<TSource> WithMergeOptions<TSource> (this ParallelQuery<TSource> source,
	ParallelMergeOptions mergeOptions)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}

	var mergeNode = new ParallelMergeOptionsNode<TSource> (mergeOptions, source.Node);
	return new ParallelQuery<TSource> (mergeNode);
}
/// <summary>
/// Returns a new query that records the requested degree of parallelism in a
/// <c>DegreeOfParallelismNode</c> on top of the source node.
/// </summary>
/// <exception cref="ArgumentException"><paramref name="degreeOfParallelism"/> is outside 1..63.</exception>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static ParallelQuery<TSource> WithDegreeOfParallelism<TSource> (this ParallelQuery<TSource> source,
	int degreeOfParallelism)
{
	// The range is validated before the null check, so an invalid degree wins when both are wrong.
	bool inRange = degreeOfParallelism >= 1 && degreeOfParallelism <= 63;
	if (!inRange) {
		throw new ArgumentException ("degreeOfParallelism is less than 1 or greater than 63", "degreeOfParallelism");
	}
	if (source == null) {
		throw new ArgumentNullException ("source");
	}

	var degreeNode = new DegreeOfParallelismNode<TSource> (degreeOfParallelism, source.Node);
	return new ParallelQuery<TSource> (degreeNode);
}
/// <summary>
/// Internal helper: returns a new query that carries <paramref name="token"/> in an
/// <c>ImplementerTokenNode</c> on top of the source node. No argument validation is done.
/// </summary>
internal static ParallelQuery<TSource> WithImplementerToken<TSource> (this ParallelQuery<TSource> source,
	CancellationTokenSource token)
{
	var tokenNode = new ImplementerTokenNode<TSource> (token, source.Node);
	return new ParallelQuery<TSource> (tokenNode);
}
/// <summary>
/// Projects each element through <paramref name="selector"/> by adding a
/// <c>QuerySelectNode</c> on top of the source node.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
public static ParallelQuery<TResult> Select<TSource, TResult> (this ParallelQuery<TSource> source, Func<TSource, TResult> selector)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}
	if (selector == null) {
		throw new ArgumentNullException ("selector");
	}

	var projection = new QuerySelectNode<TResult, TSource> (source.Node, selector);
	return new ParallelQuery<TResult> (projection);
}

/// <summary>
/// Index-aware projection: same as the other overload, but <paramref name="selector"/>
/// also receives an <c>int</c> argument.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
public static ParallelQuery<TResult> Select<TSource, TResult> (this ParallelQuery<TSource> source, Func<TSource, int, TResult> selector)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}
	if (selector == null) {
		throw new ArgumentNullException ("selector");
	}

	var projection = new QuerySelectNode<TResult, TSource> (source.Node, selector);
	return new ParallelQuery<TResult> (projection);
}
/// <summary>
/// Flattens the sequences produced by <paramref name="selector"/>; forwards to the
/// three-argument overload with a result selector that keeps the inner element.
/// </summary>
/// <param name="source">Query to project.</param>
/// <param name="selector">Produces the inner sequence for an element.</param>
/// <returns>A query over the flattened inner elements.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
public static ParallelQuery<TResult> SelectMany<TSource, TResult> (this ParallelQuery<TSource> source,
	Func<TSource, IEnumerable<TResult>> selector)
{
	// Validate here so the failure names this overload's own parameters.
	if (source == null)
		throw new ArgumentNullException ("source");
	if (selector == null)
		throw new ArgumentNullException ("selector");

	return source.SelectMany (selector, (s, e) => e);
}

/// <summary>
/// Index-aware variant of the flattening projection; <paramref name="selector"/> also
/// receives an <c>int</c> argument.
/// </summary>
/// <param name="source">Query to project.</param>
/// <param name="selector">Produces the inner sequence for an element.</param>
/// <returns>A query over the flattened inner elements.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
public static ParallelQuery<TResult> SelectMany<TSource, TResult> (this ParallelQuery<TSource> source,
	Func<TSource, int, IEnumerable<TResult>> selector)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (selector == null)
		throw new ArgumentNullException ("selector");

	return source.SelectMany (selector, (s, e) => e);
}

/// <summary>
/// Flattens the sequences produced by <paramref name="collectionSelector"/> and maps every
/// (outer, inner) pair through <paramref name="resultSelector"/>, using a <c>QuerySelectManyNode</c>.
/// </summary>
/// <param name="source">Query to project.</param>
/// <param name="collectionSelector">Produces the inner sequence for an element.</param>
/// <param name="resultSelector">Combines an outer element with one of its inner elements.</param>
/// <returns>A query over the combined results.</returns>
/// <exception cref="ArgumentNullException">Any argument is null.</exception>
public static ParallelQuery<TResult> SelectMany<TSource, TCollection, TResult> (this ParallelQuery<TSource> source,
	Func<TSource, IEnumerable<TCollection>> collectionSelector,
	Func<TSource, TCollection, TResult> resultSelector)
{
	// Fail fast, like Select and Where do, instead of with a NullReferenceException
	// on source.Node or a late failure while the query is running.
	if (source == null)
		throw new ArgumentNullException ("source");
	if (collectionSelector == null)
		throw new ArgumentNullException ("collectionSelector");
	if (resultSelector == null)
		throw new ArgumentNullException ("resultSelector");

	return new ParallelQuery<TResult> (new QuerySelectManyNode<TSource, TCollection, TResult> (source.Node,
	                                                                                           collectionSelector,
	                                                                                           resultSelector));
}

/// <summary>
/// Index-aware variant of the three-argument overload; <paramref name="collectionSelector"/>
/// also receives an <c>int</c> argument.
/// </summary>
/// <param name="source">Query to project.</param>
/// <param name="collectionSelector">Produces the inner sequence for an element.</param>
/// <param name="resultSelector">Combines an outer element with one of its inner elements.</param>
/// <returns>A query over the combined results.</returns>
/// <exception cref="ArgumentNullException">Any argument is null.</exception>
public static ParallelQuery<TResult> SelectMany<TSource, TCollection, TResult> (this ParallelQuery<TSource> source,
	Func<TSource, int, IEnumerable<TCollection>> collectionSelector,
	Func<TSource, TCollection, TResult> resultSelector)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (collectionSelector == null)
		throw new ArgumentNullException ("collectionSelector");
	if (resultSelector == null)
		throw new ArgumentNullException ("resultSelector");

	return new ParallelQuery<TResult> (new QuerySelectManyNode<TSource, TCollection, TResult> (source.Node,
	                                                                                           collectionSelector,
	                                                                                           resultSelector));
}
/// <summary>
/// Filters the query with <paramref name="predicate"/> by adding a <c>QueryWhereNode</c>
/// on top of the source node.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
public static ParallelQuery<TSource> Where<TSource> (this ParallelQuery<TSource> source, Func<TSource, bool> predicate)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}
	if (predicate == null) {
		throw new ArgumentNullException ("predicate");
	}

	var filter = new QueryWhereNode<TSource> (source.Node, predicate);
	return new ParallelQuery<TSource> (filter);
}

/// <summary>
/// Index-aware filter: same as the other overload, but <paramref name="predicate"/>
/// also receives an <c>int</c> argument.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
public static ParallelQuery<TSource> Where<TSource> (this ParallelQuery<TSource> source, Func<TSource, int, bool> predicate)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}
	if (predicate == null) {
		throw new ArgumentNullException ("predicate");
	}

	var filter = new QueryWhereNode<TSource> (source.Node, predicate);
	return new ParallelQuery<TSource> (filter);
}
/// <summary>
/// Folds the query with <paramref name="func"/>, using elements of the query itself as
/// the starting values instead of an explicit seed.
/// </summary>
/// <param name="source">Query to fold.</param>
/// <param name="func">Combines two values; also used to merge partial results.</param>
/// <returns>The folded value.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="func"/> is null.</exception>
/// <exception cref="InvalidOperationException">No element was aggregated.</exception>
public static TSource Aggregate<TSource> (this ParallelQuery<TSource> source, Func<TSource, TSource, TSource> func)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (func == null)
		throw new ArgumentNullException ("func");

	// The previous code passed a null seed factory to the seedFactory overload, which
	// rejects null, so this method could never succeed. Instead each partial accumulator
	// carries a flag (Key) telling whether it has absorbed an element yet; the first
	// element an accumulator sees becomes its value.
	Func<KeyValuePair<bool, TSource>> seedFactory =
		() => new KeyValuePair<bool, TSource> (false, default (TSource));

	Func<KeyValuePair<bool, TSource>, TSource, KeyValuePair<bool, TSource>> update =
		(acc, e) => new KeyValuePair<bool, TSource> (true, acc.Key ? func (acc.Value, e) : e);

	Func<KeyValuePair<bool, TSource>, KeyValuePair<bool, TSource>, KeyValuePair<bool, TSource>> combine =
		(left, right) => {
			// An accumulator that never saw an element contributes nothing.
			if (!left.Key)
				return right;
			if (!right.Key)
				return left;
			return new KeyValuePair<bool, TSource> (true, func (left.Value, right.Value));
		};

	Func<KeyValuePair<bool, TSource>, TSource> finish = (acc) => {
		if (!acc.Key)
			throw new InvalidOperationException ("Sequence contains no elements");
		return acc.Value;
	};

	return source.Aggregate<TSource, KeyValuePair<bool, TSource>, TSource> (seedFactory, update, combine, finish);
}
/// <summary>
/// Folds the query starting from <paramref name="seed"/>; forwards to the overload with a
/// result selector, passing an identity function.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="func"/> is null.</exception>
public static TAccumulate Aggregate<TSource, TAccumulate> (this ParallelQuery<TSource> source,
	TAccumulate seed,
	Func<TAccumulate, TSource, TAccumulate> func)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}
	if (func == null) {
		throw new ArgumentNullException ("func");
	}

	Func<TAccumulate, TAccumulate> identity = (acc) => acc;
	return source.Aggregate (seed, func, identity);
}
/// <summary>
/// Folds the query starting from <paramref name="seed"/> and converts the final accumulator
/// with <paramref name="resultSelector"/>. The fold is a plain <c>foreach</c> over the query
/// on the calling thread.
/// </summary>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/>, <paramref name="func"/> or <paramref name="resultSelector"/> is null.
/// </exception>
public static TResult Aggregate<TSource, TAccumulate, TResult> (this ParallelQuery<TSource> source,
	TAccumulate seed,
	Func<TAccumulate, TSource, TAccumulate> func,
	Func<TAccumulate, TResult> resultSelector)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}
	if (func == null) {
		throw new ArgumentNullException ("func");
	}
	if (resultSelector == null) {
		throw new ArgumentNullException ("resultSelector");
	}

	TAccumulate running = seed;
	foreach (TSource item in source) {
		running = func (running, item);
	}

	return resultSelector (running);
}
/// <summary>
/// Folds the query with separate update and combine functions, starting every partial
/// accumulator from the same <paramref name="seed"/>. Forwards to the seed-factory overload.
/// </summary>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or any of the three delegates is null.
/// </exception>
public static TResult Aggregate<TSource, TAccumulate, TResult> (this ParallelQuery<TSource> source,
	TAccumulate seed,
	Func<TAccumulate, TSource, TAccumulate> updateAccumulatorFunc,
	Func<TAccumulate, TAccumulate, TAccumulate> combineAccumulatorsFunc,
	Func<TAccumulate, TResult> resultSelector)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}
	if (updateAccumulatorFunc == null) {
		throw new ArgumentNullException ("updateAccumulatorFunc");
	}
	if (combineAccumulatorsFunc == null) {
		throw new ArgumentNullException ("combineAccumulatorsFunc");
	}
	if (resultSelector == null) {
		throw new ArgumentNullException ("resultSelector");
	}

	// Wrap the fixed seed in a factory so the seed-factory overload can be reused.
	Func<TAccumulate> constantSeed = () => seed;
	return source.Aggregate<TSource, TAccumulate, TResult> (constantSeed, updateAccumulatorFunc, combineAccumulatorsFunc, resultSelector);
}
/// <summary>
/// Core aggregation: hands the seed factory and the update function to
/// <c>ParallelExecuter.ProcessAndAggregate</c>, merges the partial accumulators it reports
/// with <paramref name="combineAccumulatorsFunc"/>, and converts the merged value with
/// <paramref name="resultSelector"/>.
/// </summary>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or any of the four delegates is null.
/// </exception>
public static TResult Aggregate<TSource, TAccumulate, TResult> (this ParallelQuery<TSource> source,
	Func<TAccumulate> seedFactory,
	Func<TAccumulate, TSource, TAccumulate> updateAccumulatorFunc,
	Func<TAccumulate, TAccumulate, TAccumulate> combineAccumulatorsFunc,
	Func<TAccumulate, TResult> resultSelector)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}
	if (seedFactory == null) {
		throw new ArgumentNullException ("seedFactory");
	}
	if (updateAccumulatorFunc == null) {
		throw new ArgumentNullException ("updateAccumulatorFunc");
	}
	if (combineAccumulatorsFunc == null) {
		throw new ArgumentNullException ("combineAccumulatorsFunc");
	}
	if (resultSelector == null) {
		throw new ArgumentNullException ("resultSelector");
	}

	TAccumulate merged = default (TAccumulate);

	ParallelExecuter.ProcessAndAggregate<TSource, TAccumulate> (source.Node, seedFactory, updateAccumulatorFunc, (partials) => {
		// Start from the first partial result and merge the rest in list order.
		merged = partials [0];
		for (int position = 1; position < partials.Count; position++) {
			merged = combineAccumulatorsFunc (merged, partials [position]);
		}
	});

	return resultSelector (merged);
}
/// <summary>
/// Runs <paramref name="action"/> for every element by passing the query node to
/// <c>ParallelExecuter.ProcessAndBlock</c>.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="action"/> is null.</exception>
public static void ForAll<TSource> (this ParallelQuery<TSource> source, Action<TSource> action)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}
	if (action == null) {
		throw new ArgumentNullException ("action");
	}

	// The executer callback takes a second argument that the user action does not need.
	ParallelExecuter.ProcessAndBlock (source.Node, (element, unused) => action (element));
}
/// <summary>
/// Orders the query by descending key, comparing keys with <paramref name="comparer"/>.
/// </summary>
/// <param name="source">Query to order.</param>
/// <param name="keySelector">Extracts the sort key from an element.</param>
/// <param name="comparer">Key comparer; null selects <see cref="Comparer{TKey}.Default"/>.</param>
/// <returns>An ordered query built on a <c>QueryOrderByNode</c>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> is null.</exception>
public static OrderedParallelQuery<TSource> OrderByDescending<TSource, TKey> (this ParallelQuery<TSource> source,
	Func<TSource, TKey> keySelector,
	IComparer<TKey> comparer)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (keySelector == null)
		throw new ArgumentNullException ("keySelector");
	if (comparer == null)
		comparer = Comparer<TKey>.Default;

	// Reverse the order by swapping the operands. Negating the comparer's result is wrong
	// when it returns int.MinValue, because -int.MinValue is still int.MinValue.
	Comparison<TSource> comparison = (e1, e2) => {
		TKey key1 = keySelector (e1);
		TKey key2 = keySelector (e2);
		return comparer.Compare (key2, key1);
	};

	return new OrderedParallelQuery<TSource> (new QueryOrderByNode<TSource> (source.Node, comparison));
}

/// <summary>
/// Orders the query by descending key using <see cref="Comparer{TKey}.Default"/>.
/// </summary>
/// <param name="source">Query to order.</param>
/// <param name="keySelector">Extracts the sort key from an element.</param>
/// <returns>The result of the comparer-taking overload.</returns>
public static OrderedParallelQuery<TSource> OrderByDescending<TSource, TKey> (this ParallelQuery<TSource> source,
	Func<TSource, TKey> keySelector)
{
	return OrderByDescending (source, keySelector, Comparer<TKey>.Default);
}
/// <summary>
/// Orders the query by ascending key using <see cref="Comparer{TKey}.Default"/>.
/// </summary>
public static OrderedParallelQuery<TSource> OrderBy<TSource, TKey> (this ParallelQuery<TSource> source,
	Func<TSource, TKey> keySelector)
{
	Comparer<TKey> defaultComparer = Comparer<TKey>.Default;
	return OrderBy (source, keySelector, defaultComparer);
}

/// <summary>
/// Orders the query by ascending key, comparing keys with <paramref name="comparer"/>
/// (null selects the default comparer). Adds a <c>QueryOrderByNode</c> on top of the source node.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> is null.</exception>
public static OrderedParallelQuery<TSource> OrderBy<TSource, TKey> (this ParallelQuery<TSource> source,
	Func<TSource, TKey> keySelector,
	IComparer<TKey> comparer)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}
	if (keySelector == null) {
		throw new ArgumentNullException ("keySelector");
	}
	if (comparer == null) {
		comparer = Comparer<TKey>.Default;
	}

	// Element comparison = key comparison of the two projected keys.
	Comparison<TSource> byKey = (left, right) => comparer.Compare (keySelector (left), keySelector (right));
	var orderNode = new QueryOrderByNode<TSource> (source.Node, byKey);

	return new OrderedParallelQuery<TSource> (orderNode);
}
/// <summary>
/// Adds an ascending ordering on a further key, using <see cref="Comparer{TKey}.Default"/>.
/// </summary>
public static OrderedParallelQuery<TSource> ThenBy<TSource, TKey> (this OrderedParallelQuery<TSource> source,
	Func<TSource, TKey> keySelector)
{
	Comparer<TKey> defaultComparer = Comparer<TKey>.Default;
	return ThenBy (source, keySelector, defaultComparer);
}

/// <summary>
/// Adds an ascending ordering on a further key, comparing keys with <paramref name="comparer"/>
/// (null selects the default comparer).
/// </summary>
/// <remarks>
/// NOTE(review): this builds the same <c>QueryOrderByNode</c> as <c>OrderBy</c>, with only this
/// key's comparison. Whether the earlier ordering stays the primary key depends on that node — confirm.
/// </remarks>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> is null.</exception>
public static OrderedParallelQuery<TSource> ThenBy<TSource, TKey> (this OrderedParallelQuery<TSource> source,
	Func<TSource, TKey> keySelector,
	IComparer<TKey> comparer)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}
	if (keySelector == null) {
		throw new ArgumentNullException ("keySelector");
	}
	if (comparer == null) {
		comparer = Comparer<TKey>.Default;
	}

	Comparison<TSource> byKey = (left, right) => comparer.Compare (keySelector (left), keySelector (right));
	var orderNode = new QueryOrderByNode<TSource> (source.Node, byKey);

	return new OrderedParallelQuery<TSource> (orderNode);
}
/// <summary>
/// Adds a descending ordering on a further key, using <see cref="Comparer{TKey}.Default"/>.
/// </summary>
/// <param name="source">Already ordered query.</param>
/// <param name="keySelector">Extracts the sort key from an element.</param>
/// <returns>The result of the comparer-taking overload.</returns>
public static OrderedParallelQuery<TSource> ThenByDescending<TSource, TKey> (this OrderedParallelQuery<TSource> source,
	Func<TSource, TKey> keySelector)
{
	return ThenByDescending (source, keySelector, Comparer<TKey>.Default);
}

/// <summary>
/// Adds a descending ordering on a further key, comparing keys with <paramref name="comparer"/>.
/// </summary>
/// <param name="source">Already ordered query.</param>
/// <param name="keySelector">Extracts the sort key from an element.</param>
/// <param name="comparer">Key comparer; null selects <see cref="Comparer{TKey}.Default"/>.</param>
/// <returns>An ordered query built on a <c>QueryOrderByNode</c>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> is null.</exception>
public static OrderedParallelQuery<TSource> ThenByDescending<TSource, TKey> (this OrderedParallelQuery<TSource> source,
	Func<TSource, TKey> keySelector,
	IComparer<TKey> comparer)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (keySelector == null)
		throw new ArgumentNullException ("keySelector");
	if (comparer == null)
		comparer = Comparer<TKey>.Default;

	// Reverse the order by swapping the operands. Negating the comparer's result is wrong
	// when it returns int.MinValue, because -int.MinValue is still int.MinValue.
	Comparison<TSource> comparison = (e1, e2) => {
		TKey key1 = keySelector (e1);
		TKey key2 = keySelector (e2);
		return comparer.Compare (key2, key1);
	};

	return new OrderedParallelQuery<TSource> (new QueryOrderByNode<TSource> (source.Node, comparison));
}
/// <summary>
/// Tells whether every element satisfies <paramref name="predicate"/>. The query is run with
/// a private cancellation source that is cancelled as soon as one element fails the test.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
public static bool All<TSource> (this ParallelQuery<TSource> source, Func<TSource, bool> predicate)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}
	if (predicate == null) {
		throw new ArgumentNullException ("predicate");
	}

	var earlyExit = new CancellationTokenSource ();
	ParallelQuery<TSource> cancellable = source.WithImplementerToken (earlyExit);
	bool everyElementMatches = true;

	try {
		cancellable.ForAll ((element) => {
			if (!predicate (element)) {
				everyElementMatches = false;
				earlyExit.Cancel ();
			}
		});
	} catch (OperationCanceledException e) {
		// Only our own early-exit cancellation is expected here; anything else propagates.
		if (e.CancellationToken != earlyExit.Token)
			throw;
	}

	return everyElementMatches;
}
/// <summary>
/// Tells whether the query has at least one element, by testing with a predicate that
/// accepts everything.
/// </summary>
public static bool Any<TSource> (this ParallelQuery<TSource> source)
{
	Func<TSource, bool> acceptEverything = (ignored) => true;
	return Any<TSource> (source, acceptEverything);
}

/// <summary>
/// Tells whether at least one element satisfies <paramref name="predicate"/>; computed as
/// "not all elements fail the predicate".
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
public static bool Any<TSource> (this ParallelQuery<TSource> source, Func<TSource, bool> predicate)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}
	if (predicate == null) {
		throw new ArgumentNullException ("predicate");
	}

	bool noneMatches = source.All ((element) => !predicate (element));
	return !noneMatches;
}
/// <summary>
/// Tells whether the query contains <paramref name="value"/>, using the default equality comparer.
/// </summary>
public static bool Contains<TSource> (this ParallelQuery<TSource> source, TSource value)
{
	EqualityComparer<TSource> defaultComparer = EqualityComparer<TSource>.Default;
	return Contains<TSource> (source, value, defaultComparer);
}

/// <summary>
/// Tells whether the query contains <paramref name="value"/> according to
/// <paramref name="comparer"/> (null selects the default comparer); implemented with <c>Any</c>.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static bool Contains<TSource> (this ParallelQuery<TSource> source, TSource value, IEqualityComparer<TSource> comparer)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}

	IEqualityComparer<TSource> equality = comparer ?? EqualityComparer<TSource>.Default;
	return Any<TSource> (source, (candidate) => equality.Equals (value, candidate));
}
531 #region SequenceEqual
/// <summary>
/// Compares two queries element by element with the default equality comparer.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="first"/> or <paramref name="second"/> is null.</exception>
public static bool SequenceEqual<TSource> (this ParallelQuery<TSource> first,
	ParallelQuery<TSource> second)
{
	if (first == null) {
		throw new ArgumentNullException ("first");
	}
	if (second == null) {
		throw new ArgumentNullException ("second");
	}

	EqualityComparer<TSource> defaultComparer = EqualityComparer<TSource>.Default;
	return first.SequenceEqual (second, defaultComparer);
}
/// <summary>
/// Compares two queries element by element with <paramref name="comparer"/> (null selects the
/// default comparer). The two nodes are zipped in strict mode into a query of booleans, which
/// is run with a private cancellation source that is cancelled on the first mismatch.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="first"/> or <paramref name="second"/> is null.</exception>
public static bool SequenceEqual<TSource> (this ParallelQuery<TSource> first,
	ParallelQuery<TSource> second,
	IEqualityComparer<TSource> comparer)
{
	if (first == null) {
		throw new ArgumentNullException ("first");
	}
	if (second == null) {
		throw new ArgumentNullException ("second");
	}
	if (comparer == null) {
		comparer = EqualityComparer<TSource>.Default;
	}

	var earlyExit = new CancellationTokenSource ();
	var zipNode = new QueryZipNode<TSource, TSource, bool> (comparer.Equals, first.Node, second.Node) { Strict = true };
	var pairResults = new ParallelQuery<bool> (zipNode).WithImplementerToken (earlyExit);
	bool sequencesMatch = true;

	try {
		pairResults.ForAll (pairIsEqual => {
			if (!pairIsEqual) {
				sequencesMatch = false;
				earlyExit.Cancel ();
			}
		});
	} catch (AggregateException ex) {
		// A QueryZipException coming out of the strict zip is reported as "not equal";
		// presumably it signals sequences of different length — TODO confirm.
		if (ex.InnerException is QueryZipException)
			return false;
		throw;
	} catch (OperationCanceledException e) {
		// Only our own early-exit cancellation is expected here; anything else propagates.
		if (e.CancellationToken != earlyExit.Token)
			throw;
	}

	return sequencesMatch;
}
/// <summary>
/// Unsupported overload kept only so that a sequential second operand produces an
/// obsolescence diagnostic; always throws.
/// </summary>
/// <exception cref="NotSupportedException">Always.</exception>
[Obsolete ("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
           + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
           + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
public static bool SequenceEqual<TSource> (this ParallelQuery<TSource> first, IEnumerable<TSource> second)
{
	throw new NotSupportedException ();
}

/// <summary>
/// Unsupported comparer-taking overload for a sequential second operand; always throws.
/// </summary>
/// <exception cref="NotSupportedException">Always.</exception>
[Obsolete ("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
           + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
           + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
public static bool SequenceEqual<TSource> (this ParallelQuery<TSource> first,
	IEnumerable<TSource> second,
	IEqualityComparer<TSource> comparer)
{
	throw new NotSupportedException ();
}
/// <summary>
/// Groups elements by key with the default equality comparer.
/// </summary>
public static ParallelQuery<IGrouping<TKey, TSource>> GroupBy<TSource, TKey> (this ParallelQuery<TSource> source,
	Func<TSource, TKey> keySelector)
{
	EqualityComparer<TKey> defaultComparer = EqualityComparer<TKey>.Default;
	return source.GroupBy (keySelector, defaultComparer);
}

/// <summary>
/// Groups elements by key with <paramref name="comparer"/>; the elements themselves are the
/// group members (an <c>Identity</c> helper serves as element selector).
/// </summary>
public static ParallelQuery<IGrouping<TKey, TSource>> GroupBy<TSource, TKey> (this ParallelQuery<TSource> source,
	Func<TSource, TKey> keySelector,
	IEqualityComparer<TKey> comparer)
{
	var identity = new Identity<TSource> ();
	return source.GroupBy (keySelector, identity.Apply, comparer);
}

/// <summary>
/// Groups projected elements by key with the default equality comparer.
/// </summary>
public static ParallelQuery<IGrouping<TKey, TElement>> GroupBy<TSource, TKey, TElement> (this ParallelQuery<TSource> source,
	Func<TSource, TKey> keySelector,
	Func<TSource, TElement> elementSelector)
{
	EqualityComparer<TKey> defaultComparer = EqualityComparer<TKey>.Default;
	return source.GroupBy (keySelector, elementSelector, defaultComparer);
}

/// <summary>
/// Groups elements by key, then maps each group (key plus members) through
/// <paramref name="resultSelector"/>.
/// </summary>
public static ParallelQuery<TResult> GroupBy<TSource, TKey, TResult> (this ParallelQuery<TSource> source,
	Func<TSource, TKey> keySelector,
	Func<TKey, IEnumerable<TSource>, TResult> resultSelector)
{
	ParallelQuery<IGrouping<TKey, TSource>> groups = source.GroupBy (keySelector);
	return groups.Select ((group) => resultSelector (group.Key, (IEnumerable<TSource>) group));
}
/// <summary>
/// Core grouping overload: adds a <c>QueryGroupByNode</c> built from the key selector, element
/// selector and comparer (null selects the default comparer) on top of the source node.
/// </summary>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/>, <paramref name="keySelector"/> or <paramref name="elementSelector"/> is null.
/// </exception>
public static ParallelQuery<IGrouping<TKey, TElement>> GroupBy<TSource, TKey, TElement> (this ParallelQuery<TSource> source,
	Func<TSource, TKey> keySelector,
	Func<TSource, TElement> elementSelector,
	IEqualityComparer<TKey> comparer)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}
	if (keySelector == null) {
		throw new ArgumentNullException ("keySelector");
	}
	if (elementSelector == null) {
		throw new ArgumentNullException ("elementSelector");
	}
	if (comparer == null) {
		comparer = EqualityComparer<TKey>.Default;
	}

	var groupingNode = new QueryGroupByNode<TSource, TKey, TElement> (source.Node, keySelector, elementSelector, comparer);
	return new ParallelQuery<IGrouping<TKey, TElement>> (groupingNode);
}
/// <summary>
/// Groups projected elements by key, then maps each group through <paramref name="resultSelector"/>.
/// </summary>
public static ParallelQuery<TResult> GroupBy<TSource, TKey, TElement, TResult> (this ParallelQuery<TSource> source,
	Func<TSource, TKey> keySelector,
	Func<TSource, TElement> elementSelector,
	Func<TKey, IEnumerable<TElement>, TResult> resultSelector)
{
	ParallelQuery<IGrouping<TKey, TElement>> groups = source.GroupBy (keySelector, elementSelector);
	return groups.Select ((group) => resultSelector (group.Key, (IEnumerable<TElement>) group));
}

/// <summary>
/// Groups elements by key with <paramref name="comparer"/>, then maps each group through
/// <paramref name="resultSelector"/>.
/// </summary>
public static ParallelQuery<TResult> GroupBy<TSource, TKey, TResult> (this ParallelQuery<TSource> source,
	Func<TSource, TKey> keySelector,
	Func<TKey, IEnumerable<TSource>, TResult> resultSelector,
	IEqualityComparer<TKey> comparer)
{
	ParallelQuery<IGrouping<TKey, TSource>> groups = source.GroupBy (keySelector, comparer);
	return groups.Select ((group) => resultSelector (group.Key, (IEnumerable<TSource>) group));
}

/// <summary>
/// Groups projected elements by key with <paramref name="comparer"/>, then maps each group
/// through <paramref name="resultSelector"/>.
/// </summary>
public static ParallelQuery<TResult> GroupBy<TSource, TKey, TElement, TResult> (this ParallelQuery<TSource> source,
	Func<TSource, TKey> keySelector,
	Func<TSource, TElement> elementSelector,
	Func<TKey, IEnumerable<TElement>, TResult> resultSelector,
	IEqualityComparer<TKey> comparer)
{
	ParallelQuery<IGrouping<TKey, TElement>> groups = source.GroupBy (keySelector, elementSelector, comparer);
	return groups.Select ((group) => resultSelector (group.Key, (IEnumerable<TElement>) group));
}
/// <summary>
/// Correlates outer elements with groups of inner elements that share their key, using the
/// default equality comparer.
/// </summary>
/// <param name="outer">Outer query.</param>
/// <param name="inner">Inner query.</param>
/// <param name="outerKeySelector">Extracts the key of an outer element.</param>
/// <param name="innerKeySelector">Extracts the key of an inner element.</param>
/// <param name="resultSelector">Combines an outer element with its group of inner elements.</param>
/// <returns>The result of the comparer-taking overload.</returns>
public static ParallelQuery<TResult> GroupJoin<TOuter, TInner, TKey, TResult> (this ParallelQuery<TOuter> outer,
	ParallelQuery<TInner> inner,
	Func<TOuter, TKey> outerKeySelector,
	Func<TInner, TKey> innerKeySelector,
	Func<TOuter, IEnumerable<TInner>, TResult> resultSelector)
{
	return outer.GroupJoin (inner, outerKeySelector, innerKeySelector, resultSelector, EqualityComparer<TKey>.Default);
}

/// <summary>
/// Correlates outer elements with groups of inner elements that share their key. The inner
/// query is grouped by key and the outer query is then joined against those groups.
/// </summary>
/// <remarks>
/// NOTE(review): because this is built on <c>Join</c>, an outer element whose key matches no
/// inner group presumably produces no result at all, whereas <c>Enumerable.GroupJoin</c>
/// yields it with an empty group — confirm against <c>QueryJoinNode</c>.
/// </remarks>
/// <param name="outer">Outer query.</param>
/// <param name="inner">Inner query.</param>
/// <param name="outerKeySelector">Extracts the key of an outer element.</param>
/// <param name="innerKeySelector">Extracts the key of an inner element.</param>
/// <param name="resultSelector">Combines an outer element with its group of inner elements.</param>
/// <param name="comparer">Key comparer; null selects the default comparer.</param>
/// <returns>A query over the combined results.</returns>
/// <exception cref="ArgumentNullException">Any argument other than <paramref name="comparer"/> is null.</exception>
public static ParallelQuery<TResult> GroupJoin<TOuter, TInner, TKey, TResult> (this ParallelQuery<TOuter> outer,
	ParallelQuery<TInner> inner,
	Func<TOuter, TKey> outerKeySelector,
	Func<TInner, TKey> innerKeySelector,
	Func<TOuter, IEnumerable<TInner>, TResult> resultSelector,
	IEqualityComparer<TKey> comparer)
{
	if (outer == null)
		throw new ArgumentNullException ("outer");
	if (inner == null)
		throw new ArgumentNullException ("inner");
	if (outerKeySelector == null)
		throw new ArgumentNullException ("outerKeySelector");
	if (innerKeySelector == null)
		throw new ArgumentNullException ("innerKeySelector");
	if (resultSelector == null)
		throw new ArgumentNullException ("resultSelector");
	if (comparer == null)
		comparer = EqualityComparer<TKey>.Default;

	// Group the inner side with the SAME comparer that drives the join. Grouping with the
	// default comparer (as before) split keys that the caller's comparer treats as equal
	// into several groups, so one outer element produced several results.
	ParallelQuery<IGrouping<TKey, TInner>> innerGroups = inner.GroupBy (innerKeySelector, (e) => e, comparer);

	return outer.Join (innerGroups, outerKeySelector, (g) => g.Key, (o, g) => resultSelector (o, g), comparer);
}
/// <summary>
/// Unsupported overload kept only so that a sequential inner operand produces an
/// obsolescence diagnostic; always throws.
/// </summary>
/// <exception cref="NotSupportedException">Always.</exception>
[Obsolete ("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
           + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
           + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
public static ParallelQuery<TResult> GroupJoin<TOuter, TInner, TKey, TResult> (this ParallelQuery<TOuter> outer,
	IEnumerable<TInner> inner,
	Func<TOuter, TKey> outerKeySelector,
	Func<TInner, TKey> innerKeySelector,
	Func<TOuter, IEnumerable<TInner>, TResult> resultSelector)
{
	throw new NotSupportedException ();
}

/// <summary>
/// Unsupported comparer-taking overload for a sequential inner operand; always throws.
/// </summary>
/// <exception cref="NotSupportedException">Always.</exception>
[Obsolete ("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
           + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
           + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
public static ParallelQuery<TResult> GroupJoin<TOuter, TInner, TKey, TResult> (this ParallelQuery<TOuter> outer,
	IEnumerable<TInner> inner,
	Func<TOuter, TKey> outerKeySelector,
	Func<TInner, TKey> innerKeySelector,
	Func<TOuter, IEnumerable<TInner>, TResult> resultSelector,
	IEqualityComparer<TKey> comparer)
{
	throw new NotSupportedException ();
}
/// <summary>
/// Returns the element at position <paramref name="index"/>. Index 0 is answered with
/// <c>First</c> directly; any other index filters on the element position and takes the
/// first match.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="index"/> is negative, or <c>First</c> reported that no element was found.
/// </exception>
public static TSource ElementAt<TSource> (this ParallelQuery<TSource> source, int index)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}
	if (index < 0) {
		throw new ArgumentOutOfRangeException ("index");
	}

	// Position 0 needs no filter; every other position is selected by its index.
	ParallelQuery<TSource> candidates = index == 0
		? source
		: source.Where ((element, position) => position == index);

	try {
		return candidates.First ();
	} catch (InvalidOperationException) {
		// First found nothing: translate that into the index error.
		throw new ArgumentOutOfRangeException ("index");
	}
}
/// <summary>
/// Like <c>ElementAt</c>, but returns <c>default (TSource)</c> when <c>ElementAt</c> throws
/// <see cref="ArgumentOutOfRangeException"/>.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static TSource ElementAtOrDefault<TSource> (this ParallelQuery<TSource> source, int index)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}

	TSource found;
	try {
		found = source.ElementAt (index);
	} catch (ArgumentOutOfRangeException) {
		found = default (TSource);
	}

	return found;
}
/// <summary>
/// Set intersection of two queries using the default equality comparer.
/// </summary>
public static ParallelQuery<TSource> Intersect<TSource> (this ParallelQuery<TSource> first,
	ParallelQuery<TSource> second)
{
	EqualityComparer<TSource> defaultComparer = EqualityComparer<TSource>.Default;
	return Intersect<TSource> (first, second, defaultComparer);
}

/// <summary>
/// Set intersection of two queries using <paramref name="comparer"/> (null selects the default
/// comparer); builds a <c>QuerySetNode</c> in <c>Intersect</c> mode over both nodes.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="first"/> or <paramref name="second"/> is null.</exception>
public static ParallelQuery<TSource> Intersect<TSource> (this ParallelQuery<TSource> first,
	ParallelQuery<TSource> second,
	IEqualityComparer<TSource> comparer)
{
	if (first == null) {
		throw new ArgumentNullException ("first");
	}
	if (second == null) {
		throw new ArgumentNullException ("second");
	}
	if (comparer == null) {
		comparer = EqualityComparer<TSource>.Default;
	}

	var setNode = new QuerySetNode<TSource> (SetInclusionDefaults.Intersect, comparer, first.Node, second.Node);
	return new ParallelQuery<TSource> (setNode);
}
/// <summary>
/// Unsupported overload kept only so that a sequential second operand produces an
/// obsolescence diagnostic; always throws.
/// </summary>
/// <exception cref="NotSupportedException">Always.</exception>
[Obsolete ("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
           + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
           + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
public static ParallelQuery<TSource> Intersect<TSource> (this ParallelQuery<TSource> first, IEnumerable<TSource> second)
{
	throw new NotSupportedException ();
}

/// <summary>
/// Unsupported comparer-taking overload for a sequential second operand; always throws.
/// </summary>
/// <exception cref="NotSupportedException">Always.</exception>
[Obsolete ("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
           + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
           + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
public static ParallelQuery<TSource> Intersect<TSource> (this ParallelQuery<TSource> first,
	IEnumerable<TSource> second,
	IEqualityComparer<TSource> comparer)
{
	throw new NotSupportedException ();
}
/// <summary>
/// Joins two queries on matching keys using the default equality comparer.
/// </summary>
/// <param name="outer">Outer query.</param>
/// <param name="inner">Inner query.</param>
/// <param name="outerKeySelector">Extracts the key of an outer element.</param>
/// <param name="innerKeySelector">Extracts the key of an inner element.</param>
/// <param name="resultSelector">Combines a matching outer/inner pair.</param>
/// <returns>The result of the comparer-taking overload.</returns>
public static ParallelQuery<TResult> Join<TOuter, TInner, TKey, TResult> (this ParallelQuery<TOuter> outer,
	ParallelQuery<TInner> inner,
	Func<TOuter, TKey> outerKeySelector,
	Func<TInner, TKey> innerKeySelector,
	Func<TOuter, TInner, TResult> resultSelector)
{
	return outer.Join (inner, outerKeySelector, innerKeySelector, resultSelector, EqualityComparer<TKey>.Default);
}

/// <summary>
/// Joins two queries on matching keys by building a <c>QueryJoinNode</c> over both nodes.
/// </summary>
/// <param name="outer">Outer query.</param>
/// <param name="inner">Inner query.</param>
/// <param name="outerKeySelector">Extracts the key of an outer element.</param>
/// <param name="innerKeySelector">Extracts the key of an inner element.</param>
/// <param name="resultSelector">Combines a matching outer/inner pair.</param>
/// <param name="comparer">Key comparer; null selects the default comparer.</param>
/// <returns>A query over the combined results.</returns>
/// <exception cref="ArgumentNullException">Any argument other than <paramref name="comparer"/> is null.</exception>
public static ParallelQuery<TResult> Join<TOuter, TInner, TKey, TResult> (this ParallelQuery<TOuter> outer,
	ParallelQuery<TInner> inner,
	Func<TOuter, TKey> outerKeySelector,
	Func<TInner, TKey> innerKeySelector,
	Func<TOuter, TInner, TResult> resultSelector,
	IEqualityComparer<TKey> comparer)
{
	// Validate up front like the other binary operators, instead of failing with a
	// NullReferenceException on outer.Node / inner.Node.
	if (outer == null)
		throw new ArgumentNullException ("outer");
	if (inner == null)
		throw new ArgumentNullException ("inner");
	if (outerKeySelector == null)
		throw new ArgumentNullException ("outerKeySelector");
	if (innerKeySelector == null)
		throw new ArgumentNullException ("innerKeySelector");
	if (resultSelector == null)
		throw new ArgumentNullException ("resultSelector");
	if (comparer == null)
		comparer = EqualityComparer<TKey>.Default;

	return new ParallelQuery<TResult> (new QueryJoinNode<TOuter, TInner, TKey, TResult> (outer.Node, inner.Node, outerKeySelector, innerKeySelector, resultSelector, comparer));
}
/// <summary>
/// Unsupported overload kept only so that a sequential inner operand produces an
/// obsolescence diagnostic; always throws.
/// </summary>
/// <exception cref="NotSupportedException">Always.</exception>
[Obsolete ("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
           + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
           + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
public static ParallelQuery<TResult> Join<TOuter, TInner, TKey, TResult> (this ParallelQuery<TOuter> outer,
	IEnumerable<TInner> inner,
	Func<TOuter, TKey> outerKeySelector,
	Func<TInner, TKey> innerKeySelector,
	Func<TOuter, TInner, TResult> resultSelector)
{
	throw new NotSupportedException ();
}

/// <summary>
/// Unsupported comparer-taking overload for a sequential inner operand; always throws.
/// </summary>
/// <exception cref="NotSupportedException">Always.</exception>
[Obsolete ("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
           + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
           + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
public static ParallelQuery<TResult> Join<TOuter, TInner, TKey, TResult> (this ParallelQuery<TOuter> outer,
	IEnumerable<TInner> inner,
	Func<TOuter, TKey> outerKeySelector,
	Func<TInner, TKey> innerKeySelector,
	Func<TOuter, TInner, TResult> resultSelector,
	IEqualityComparer<TKey> comparer)
{
	throw new NotSupportedException ();
}
/// <summary>
/// Set difference of two queries using the default equality comparer.
/// </summary>
public static ParallelQuery<TSource> Except<TSource> (this ParallelQuery<TSource> first,
	ParallelQuery<TSource> second)
{
	EqualityComparer<TSource> defaultComparer = EqualityComparer<TSource>.Default;
	return Except<TSource> (first, second, defaultComparer);
}

/// <summary>
/// Set difference of two queries using <paramref name="comparer"/> (null selects the default
/// comparer); builds a <c>QuerySetNode</c> in <c>Except</c> mode over both nodes.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="first"/> or <paramref name="second"/> is null.</exception>
public static ParallelQuery<TSource> Except<TSource> (this ParallelQuery<TSource> first,
	ParallelQuery<TSource> second,
	IEqualityComparer<TSource> comparer)
{
	if (first == null) {
		throw new ArgumentNullException ("first");
	}
	if (second == null) {
		throw new ArgumentNullException ("second");
	}
	if (comparer == null) {
		comparer = EqualityComparer<TSource>.Default;
	}

	var setNode = new QuerySetNode<TSource> (SetInclusionDefaults.Except, comparer, first.Node, second.Node);
	return new ParallelQuery<TSource> (setNode);
}
/// <summary>
/// Unsupported overload kept only so that a sequential second operand produces an
/// obsolescence diagnostic; always throws.
/// </summary>
/// <exception cref="NotSupportedException">Always.</exception>
[Obsolete ("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
           + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
           + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
public static ParallelQuery<TSource> Except<TSource> (this ParallelQuery<TSource> first,
	IEnumerable<TSource> second)
{
	throw new NotSupportedException ();
}

/// <summary>
/// Unsupported comparer-taking overload for a sequential second operand; always throws.
/// </summary>
/// <exception cref="NotSupportedException">Always.</exception>
[Obsolete ("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
           + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
           + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
public static ParallelQuery<TSource> Except<TSource> (this ParallelQuery<TSource> first,
	IEnumerable<TSource> second,
	IEqualityComparer<TSource> comparer)
{
	throw new NotSupportedException ();
}
/// <summary>
/// Removes duplicate elements using the default equality comparer.
/// </summary>
public static ParallelQuery<TSource> Distinct<TSource> (this ParallelQuery<TSource> source)
{
	EqualityComparer<TSource> defaultComparer = EqualityComparer<TSource>.Default;
	return Distinct<TSource> (source, defaultComparer);
}

/// <summary>
/// Removes duplicate elements using <paramref name="comparer"/> (null selects the default
/// comparer); builds a <c>QuerySetNode</c> in <c>Distinct</c> mode over the source node.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static ParallelQuery<TSource> Distinct<TSource> (this ParallelQuery<TSource> source, IEqualityComparer<TSource> comparer)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}
	if (comparer == null) {
		comparer = EqualityComparer<TSource>.Default;
	}

	// Distinct is a unary set operation, so there is no second node to pass.
	var setNode = new QuerySetNode<TSource> (SetInclusionDefaults.Distinct, comparer, source.Node, null);
	return new ParallelQuery<TSource> (setNode);
}
/// <summary>
/// Unsupported overload kept only so that a sequential second operand produces an
/// obsolescence diagnostic; always throws.
/// </summary>
/// <exception cref="NotSupportedException">Always.</exception>
[Obsolete ("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
           + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
           + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
public static ParallelQuery<TSource> Union<TSource> (this ParallelQuery<TSource> first,
	IEnumerable<TSource> second)
{
	throw new NotSupportedException ();
}

/// <summary>
/// Unsupported comparer-taking overload for a sequential second operand; always throws.
/// </summary>
/// <exception cref="NotSupportedException">Always.</exception>
[Obsolete ("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
           + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
           + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
public static ParallelQuery<TSource> Union<TSource>(this ParallelQuery<TSource> first,
	IEnumerable<TSource> second,
	IEqualityComparer<TSource> comparer)
{
	throw new NotSupportedException ();
}
/// <summary>
/// Set union of two queries using the default equality comparer.
/// </summary>
public static ParallelQuery<TSource> Union<TSource> (this ParallelQuery<TSource> first,
	ParallelQuery<TSource> second)
{
	EqualityComparer<TSource> defaultComparer = EqualityComparer<TSource>.Default;
	return first.Union (second, defaultComparer);
}

/// <summary>
/// Set union of two queries using <paramref name="comparer"/> (null selects the default
/// comparer); builds a <c>QuerySetNode</c> in <c>Union</c> mode over both nodes.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="first"/> or <paramref name="second"/> is null.</exception>
public static ParallelQuery<TSource> Union<TSource> (this ParallelQuery<TSource> first,
	ParallelQuery<TSource> second,
	IEqualityComparer<TSource> comparer)
{
	if (first == null) {
		throw new ArgumentNullException ("first");
	}
	if (second == null) {
		throw new ArgumentNullException ("second");
	}
	if (comparer == null) {
		comparer = EqualityComparer<TSource>.Default;
	}

	var setNode = new QuerySetNode<TSource> (SetInclusionDefaults.Union, comparer, first.Node, second.Node);
	return new ParallelQuery<TSource> (setNode);
}
950 public static ParallelQuery<TSource> Take<TSource> (this ParallelQuery<TSource> source, int count)
953 throw new ArgumentNullException ("source");
955 return new ParallelQuery<TSource> (new QueryHeadWorkerNode<TSource> (source.Node, count));
958 public static ParallelQuery<TSource> TakeWhile<TSource> (this ParallelQuery<TSource> source,
959 Func<TSource, bool> predicate)
962 throw new ArgumentNullException ("source");
963 if (predicate == null)
964 throw new ArgumentNullException ("predicate");
966 return new ParallelQuery<TSource> (new QueryHeadWorkerNode<TSource> (source.Node, (e, _) => predicate (e), false));
969 public static ParallelQuery<TSource> TakeWhile<TSource> (this ParallelQuery<TSource> source,
970 Func<TSource, int, bool> predicate)
973 throw new ArgumentNullException ("source");
974 if (predicate == null)
975 throw new ArgumentNullException ("predicate");
977 return new ParallelQuery<TSource> (new QueryHeadWorkerNode<TSource> (source.Node, predicate, true));
// Bypasses `count` elements of the query and returns the remaining ones.
// Throws ArgumentNullException ("source") when the source query is null.
982 public static ParallelQuery<TSource> Skip<TSource> (this ParallelQuery<TSource> source, int count)
985 throw new ArgumentNullException ("source");
// The strategy depends on whether the query node reports itself as ordered.
987 return source.Node.IsOrdered () ?
// Ordered: keep every element whose index is at least `count`.
988 source.Where ((e, i) => i >= count) :
// Unordered: the captured `count` parameter is used as a countdown that the filter decrements
// atomically; an element passes once the counter has gone below zero.
// NOTE(review): the counter lives in the closure, so it presumably is not reset if the returned
// query is enumerated a second time -- confirm against the Where/executor implementation.
989 source.Where ((e) => count < 0 || Interlocked.Decrement (ref count) < 0);
993 public static ParallelQuery<TSource> SkipWhile<TSource> (this ParallelQuery<TSource> source,
994 Func<TSource, bool> predicate)
997 throw new ArgumentNullException ("source");
998 if (predicate == null)
999 throw new ArgumentNullException ("predicate");
1001 return source.Node.IsOrdered () ?
1002 source.SkipWhile ((e, i) => predicate (e)) :
1003 source.Where ((e) => !predicate (e));
// Bypasses elements while the index-aware predicate holds and returns the rest.
// Throws ArgumentNullException for a null source ("source") or predicate ("predicate").
1006 public static ParallelQuery<TSource> SkipWhile<TSource> (this ParallelQuery<TSource> source,
1007 Func<TSource, int, bool> predicate)
1010 throw new ArgumentNullException ("source");
1011 if (predicate == null)
1012 throw new ArgumentNullException ("predicate");
// Index of an element for which the predicate returned false; int.MaxValue means "none seen yet".
1014 int indexCache = int.MaxValue;
// An element passes when its index is at or beyond indexCache, or when the predicate rejects it.
// In the latter case `(indexCache = i) == i` records the index and always evaluates to true.
// NOTE(review): indexCache is read and written from the filter without synchronisation, and a
// later rejection overwrites an earlier one -- confirm this is acceptable for parallel execution.
1016 return source.Where ((e, i) => i >= indexCache || (!predicate (e, i) && (indexCache = i) == i));
1021 static TSource SingleInternal<TSource> (this ParallelQuery<TSource> source, params TSource[] init)
1023 TSource result = default(TSource);
1024 bool hasValue = false;
1026 foreach (TSource element in source) {
1028 throw new InvalidOperationException ("The input sequence contains more than one element.");
1034 if (!hasValue && init.Length != 0) {
1040 throw new InvalidOperationException ("The input sequence is empty.");
1045 public static TSource Single<TSource> (this ParallelQuery<TSource> source)
1048 throw new ArgumentNullException ("source");
1050 return SingleInternal<TSource> (source);
1053 public static TSource Single<TSource> (this ParallelQuery<TSource> source,
1054 Func<TSource, bool> predicate)
1057 throw new ArgumentNullException ("source");
1058 if (predicate == null)
1059 throw new ArgumentNullException ("predicate");
1061 return source.Where (predicate).Single ();
1064 public static TSource SingleOrDefault<TSource> (this ParallelQuery<TSource> source)
1067 throw new ArgumentNullException ("source");
1069 return SingleInternal<TSource> (source, default (TSource));
1072 public static TSource SingleOrDefault<TSource> (this ParallelQuery<TSource> source, Func<TSource, bool> predicate)
1075 throw new ArgumentNullException ("source");
1076 if (predicate == null)
1077 throw new ArgumentNullException ("predicate");
1079 return source.Where (predicate).SingleOrDefault ();
1084 public static int Count<TSource> (this ParallelQuery<TSource> source)
1087 throw new ArgumentNullException ("source");
1089 var helper = new CountAggregateHelper<TSource> ();
1090 return source.Aggregate<TSource, int, int> (helper.Seed,
1091 helper.Intermediate,
1096 public static int Count<TSource> (this ParallelQuery<TSource> source, Func<TSource, bool> predicate)
1099 throw new ArgumentNullException ("source");
1100 if (predicate == null)
1101 throw new ArgumentNullException ("predicate");
1103 return source.Where (predicate).Count ();
1106 class CountAggregateHelper<TSource>
1113 public int Intermediate (int acc, TSource e)
1118 public int Reducer (int acc1, int acc2)
1123 public int Final (int acc)
1129 public static long LongCount<TSource> (this ParallelQuery<TSource> source)
1132 throw new ArgumentNullException ("source");
1134 var helper = new LongCountAggregateHelper<TSource> ();
1135 return source.Aggregate<TSource, long, long> (helper.Seed,
1136 helper.Intermediate,
1141 public static long LongCount<TSource> (this ParallelQuery<TSource> source, Func<TSource, bool> predicate)
1144 throw new ArgumentNullException ("source");
1145 if (predicate == null)
1146 throw new ArgumentNullException ("predicate");
1148 return source.Where (predicate).LongCount ();
1151 class LongCountAggregateHelper<TSource>
1158 public long Intermediate (long acc, TSource e)
1163 public long Reducer (long acc1, long acc2)
1168 public long Final (long acc)
/// <summary>
/// Computes the arithmetic mean of a parallel sequence of <see cref="int"/> values.
/// </summary>
/// <param name="source">The values to average.</param>
/// <returns>The sum of the elements divided by their count, as a <see cref="double"/>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static double Average (this ParallelQuery<int> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	// acc[0] is the running sum and acc[1] the element count. Both are kept as long
	// so that adding up large int values cannot wrap around the Int32 range.
	return source.Aggregate (() => new long[2],
	                         (acc, e) => { acc[0] += e; acc[1]++; return acc; },
	                         (acc1, acc2) => { acc1[0] += acc2[0]; acc1[1] += acc2[1]; return acc1; },
	                         (acc) => acc[0] / ((double)acc[1]));
}
/// <summary>
/// Computes the arithmetic mean of a parallel sequence of <see cref="long"/> values.
/// </summary>
/// <param name="source">The values to average.</param>
/// <returns>The sum of the elements divided by their count, as a <see cref="double"/>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static double Average (this ParallelQuery<long> source)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}

	// Every partial result is a two-slot array: [0] = running total, [1] = element count.
	Func<long[]> seed = () => new long[2];
	Func<long[], long, long[]> accumulate = (totals, value) => {
		totals[0] += value;
		totals[1]++;
		return totals;
	};
	Func<long[], long[], long[]> combine = (left, right) => {
		left[0] += right[0];
		left[1] += right[1];
		return left;
	};
	Func<long[], double> finish = (totals) => totals[0] / ((double) totals[1]);

	return source.Aggregate (seed, accumulate, combine, finish);
}
/// <summary>
/// Computes the arithmetic mean of a parallel sequence of <see cref="decimal"/> values.
/// </summary>
/// <param name="source">The values to average.</param>
/// <returns>The sum of the elements divided by their count.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static decimal Average (this ParallelQuery<decimal> source)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}

	// Every partial result is a two-slot array: [0] = running total, [1] = element count.
	Func<decimal[]> seed = () => new decimal[2];
	Func<decimal[], decimal, decimal[]> accumulate = (totals, value) => {
		totals[0] += value;
		totals[1]++;
		return totals;
	};
	Func<decimal[], decimal[], decimal[]> combine = (left, right) => {
		left[0] += right[0];
		left[1] += right[1];
		return left;
	};
	Func<decimal[], decimal> finish = (totals) => totals[0] / totals[1];

	return source.Aggregate (seed, accumulate, combine, finish);
}
/// <summary>
/// Computes the arithmetic mean of a parallel sequence of <see cref="double"/> values.
/// </summary>
/// <param name="source">The values to average.</param>
/// <returns>The sum of the elements divided by their count.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static double Average (this ParallelQuery<double> source)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}

	// Every partial result is a two-slot array: [0] = running total, [1] = element count.
	Func<double[]> seed = () => new double[2];
	Func<double[], double, double[]> accumulate = (totals, value) => {
		totals[0] += value;
		totals[1]++;
		return totals;
	};
	Func<double[], double[], double[]> combine = (left, right) => {
		left[0] += right[0];
		left[1] += right[1];
		return left;
	};
	Func<double[], double> finish = (totals) => totals[0] / totals[1];

	return source.Aggregate (seed, accumulate, combine, finish);
}
/// <summary>
/// Computes the arithmetic mean of a parallel sequence of <see cref="float"/> values.
/// </summary>
/// <param name="source">The values to average.</param>
/// <returns>The sum of the elements divided by their count.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static float Average (this ParallelQuery<float> source)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}

	// Every partial result is a two-slot array: [0] = running total, [1] = element count.
	Func<float[]> seed = () => new float[2];
	Func<float[], float, float[]> accumulate = (totals, value) => {
		totals[0] += value;
		totals[1]++;
		return totals;
	};
	Func<float[], float[], float[]> combine = (left, right) => {
		left[0] += right[0];
		left[1] += right[1];
		return left;
	};
	Func<float[], float> finish = (totals) => totals[0] / totals[1];

	return source.Aggregate (seed, accumulate, combine, finish);
}
1232 #region More Average
/// <summary>
/// Computes the arithmetic mean of the non-null values in a parallel sequence of
/// nullable <see cref="int"/> values.
/// </summary>
/// <param name="source">The values to average.</param>
/// <returns>
/// The mean of the elements that have a value, or <c>null</c> when no element has one.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static double? Average (this ParallelQuery<int?> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	// acc[0] is the running sum and acc[1] the number of non-null elements.
	// Null entries are skipped entirely so they neither add to the sum nor to the count.
	return source.Aggregate (() => new long[2],
	                         (acc, e) => {
	                         	if (e.HasValue) {
	                         		acc[0] += e.Value;
	                         		acc[1]++;
	                         	}
	                         	return acc;
	                         },
	                         (acc1, acc2) => { acc1[0] += acc2[0]; acc1[1] += acc2[1]; return acc1; },
	                         (acc) => acc[1] == 0 ? (double?) null : acc[0] / ((double) acc[1]));
}
/// <summary>
/// Computes the arithmetic mean of the non-null values in a parallel sequence of
/// nullable <see cref="long"/> values.
/// </summary>
/// <param name="source">The values to average.</param>
/// <returns>
/// The mean of the elements that have a value, or <c>null</c> when no element has one.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static double? Average (this ParallelQuery<long?> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	// acc[0] is the running sum and acc[1] the number of non-null elements.
	// Null entries are skipped entirely so they neither add to the sum nor to the count.
	return source.Aggregate (() => new long[2],
	                         (acc, e) => {
	                         	if (e.HasValue) {
	                         		acc[0] += e.Value;
	                         		acc[1]++;
	                         	}
	                         	return acc;
	                         },
	                         (acc1, acc2) => { acc1[0] += acc2[0]; acc1[1] += acc2[1]; return acc1; },
	                         (acc) => acc[1] == 0 ? (double?) null : acc[0] / ((double) acc[1]));
}
/// <summary>
/// Computes the arithmetic mean of the non-null values in a parallel sequence of
/// nullable <see cref="decimal"/> values.
/// </summary>
/// <param name="source">The values to average.</param>
/// <returns>
/// The mean of the elements that have a value, or <c>null</c> when no element has one.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static decimal? Average (this ParallelQuery<decimal?> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	// acc[0] is the running sum and acc[1] the number of non-null elements.
	// Null entries are skipped entirely so they neither add to the sum nor to the count.
	return source.Aggregate (() => new decimal[2],
	                         (acc, e) => {
	                         	if (e.HasValue) {
	                         		acc[0] += e.Value;
	                         		acc[1]++;
	                         	}
	                         	return acc;
	                         },
	                         (acc1, acc2) => { acc1[0] += acc2[0]; acc1[1] += acc2[1]; return acc1; },
	                         (acc) => acc[1] == 0 ? (decimal?) null : acc[0] / acc[1]);
}
/// <summary>
/// Computes the arithmetic mean of the non-null values in a parallel sequence of
/// nullable <see cref="double"/> values.
/// </summary>
/// <param name="source">The values to average.</param>
/// <returns>
/// The mean of the elements that have a value, or <c>null</c> when no element has one.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static double? Average (this ParallelQuery<double?> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	// acc[0] is the running sum and acc[1] the number of non-null elements.
	// Null entries are skipped entirely so they neither add to the sum nor to the count.
	return source.Aggregate (() => new double[2],
	                         (acc, e) => {
	                         	if (e.HasValue) {
	                         		acc[0] += e.Value;
	                         		acc[1]++;
	                         	}
	                         	return acc;
	                         },
	                         (acc1, acc2) => { acc1[0] += acc2[0]; acc1[1] += acc2[1]; return acc1; },
	                         (acc) => acc[1] == 0 ? (double?) null : acc[0] / acc[1]);
}
/// <summary>
/// Computes the arithmetic mean of the non-null values in a parallel sequence of
/// nullable <see cref="float"/> values.
/// </summary>
/// <param name="source">The values to average.</param>
/// <returns>
/// The mean of the elements that have a value, or <c>null</c> when no element has one.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static float? Average (this ParallelQuery<float?> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	// acc[0] is the running sum and acc[1] the number of non-null elements.
	// Null entries are skipped entirely so they neither add to the sum nor to the count.
	return source.Aggregate (() => new float[2],
	                         (acc, e) => {
	                         	if (e.HasValue) {
	                         		acc[0] += e.Value;
	                         		acc[1]++;
	                         	}
	                         	return acc;
	                         },
	                         (acc1, acc2) => { acc1[0] += acc2[0]; acc1[1] += acc2[1]; return acc1; },
	                         (acc) => acc[1] == 0 ? (float?) null : acc[0] / acc[1]);
}
1273 public static double Average<TSource> (this ParallelQuery<TSource> source, Func<TSource, int> selector)
1276 throw new ArgumentNullException ("source");
1277 if (selector == null)
1278 throw new ArgumentNullException ("selector");
1280 return source.Select (selector).Average ();
1283 public static double Average<TSource> (this ParallelQuery<TSource> source, Func<TSource, long> selector)
1286 throw new ArgumentNullException ("source");
1287 if (selector == null)
1288 throw new ArgumentNullException ("selector");
1290 return source.Select (selector).Average ();
1293 public static float Average<TSource> (this ParallelQuery<TSource> source, Func<TSource, float> selector)
1296 throw new ArgumentNullException ("source");
1297 if (selector == null)
1298 throw new ArgumentNullException ("selector");
1300 return source.Select (selector).Average ();
1303 public static double Average<TSource> (this ParallelQuery<TSource> source, Func<TSource, double> selector)
1306 throw new ArgumentNullException ("source");
1307 if (selector == null)
1308 throw new ArgumentNullException ("selector");
1310 return source.Select (selector).Average ();
1313 public static decimal Average<TSource> (this ParallelQuery<TSource> source, Func<TSource, decimal> selector)
1316 throw new ArgumentNullException ("source");
1317 if (selector == null)
1318 throw new ArgumentNullException ("selector");
1320 return source.Select (selector).Average ();
1323 public static double? Average<TSource> (this ParallelQuery<TSource> source, Func<TSource, int?> selector)
1326 throw new ArgumentNullException ("source");
1327 if (selector == null)
1328 throw new ArgumentNullException ("selector");
1330 return source.Select (selector).Average ();
1333 public static double? Average<TSource> (this ParallelQuery<TSource> source, Func<TSource, long?> selector)
1336 throw new ArgumentNullException ("source");
1337 if (selector == null)
1338 throw new ArgumentNullException ("selector");
1340 return source.Select (selector).Average ();
1343 public static float? Average<TSource> (this ParallelQuery<TSource> source, Func<TSource, float?> selector)
1346 throw new ArgumentNullException ("source");
1347 if (selector == null)
1348 throw new ArgumentNullException ("selector");
1350 return source.Select (selector).Average ();
1353 public static double? Average<TSource> (this ParallelQuery<TSource> source, Func<TSource, double?> selector)
1356 throw new ArgumentNullException ("source");
1357 if (selector == null)
1358 throw new ArgumentNullException ("selector");
1360 return source.Select (selector).Average ();
1363 public static decimal? Average<TSource> (this ParallelQuery<TSource> source, Func<TSource, decimal?> selector)
1366 throw new ArgumentNullException ("source");
1367 if (selector == null)
1368 throw new ArgumentNullException ("selector");
1370 return source.Select (selector).Average ();
1375 public static int Sum (this ParallelQuery<int> source)
1378 throw new ArgumentNullException ("source");
1380 return source.Aggregate (0, (e1, e2) => e1 + e2, (sum1, sum2) => sum1 + sum2, (sum) => sum);
1383 public static long Sum (this ParallelQuery<long> source)
1386 throw new ArgumentNullException ("source");
1388 return source.Aggregate ((long)0, (e1, e2) => e1 + e2, (sum1, sum2) => sum1 + sum2, (sum) => sum);
1391 public static float Sum (this ParallelQuery<float> source)
1394 throw new ArgumentNullException ("source");
1396 return source.Aggregate (0.0f, (e1, e2) => e1 + e2, (sum1, sum2) => sum1 + sum2, (sum) => sum);
1399 public static double Sum (this ParallelQuery<double> source)
1402 throw new ArgumentNullException ("source");
1404 return source.Aggregate (0.0, (e1, e2) => e1 + e2, (sum1, sum2) => sum1 + sum2, (sum) => sum);
1407 public static decimal Sum (this ParallelQuery<decimal> source)
1410 throw new ArgumentNullException ("source");
1412 return source.Aggregate ((decimal)0, (e1, e2) => e1 + e2, (sum1, sum2) => sum1 + sum2, (sum) => sum);
/// <summary>
/// Computes the sum of a parallel sequence of nullable <see cref="int"/> values.
/// Null elements contribute zero to the total.
/// </summary>
/// <param name="source">The values to add up.</param>
/// <returns>The sum of the elements that have a value.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static int? Sum (this ParallelQuery<int?> source)
{
	// Validate here, like the other nullable Sum overloads do, so the failure
	// is reported by this method rather than by whatever Select does with null.
	if (source == null)
		throw new ArgumentNullException ("source");

	return source.Select ((e) => e.HasValue ? e.Value : 0).Sum ();
}
/// <summary>
/// Computes the sum of a parallel sequence of nullable <see cref="long"/> values.
/// Null elements contribute zero to the total.
/// </summary>
/// <param name="source">The values to add up.</param>
/// <returns>The sum of the elements that have a value.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static long? Sum (this ParallelQuery<long?> source)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}

	// Replace every missing value by zero, then reuse the non-nullable Sum.
	ParallelQuery<long> values = source.Select ((item) => item.GetValueOrDefault ());
	return values.Sum ();
}
/// <summary>
/// Computes the sum of a parallel sequence of nullable <see cref="float"/> values.
/// Null elements contribute zero to the total.
/// </summary>
/// <param name="source">The values to add up.</param>
/// <returns>The sum of the elements that have a value.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static float? Sum (this ParallelQuery<float?> source)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}

	// Replace every missing value by zero, then reuse the non-nullable Sum.
	ParallelQuery<float> values = source.Select ((item) => item.GetValueOrDefault ());
	return values.Sum ();
}
/// <summary>
/// Computes the sum of a parallel sequence of nullable <see cref="double"/> values.
/// Null elements contribute zero to the total.
/// </summary>
/// <param name="source">The values to add up.</param>
/// <returns>The sum of the elements that have a value.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static double? Sum (this ParallelQuery<double?> source)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}

	// Replace every missing value by zero, then reuse the non-nullable Sum.
	ParallelQuery<double> values = source.Select ((item) => item.GetValueOrDefault ());
	return values.Sum ();
}
/// <summary>
/// Computes the sum of a parallel sequence of nullable <see cref="decimal"/> values.
/// Null elements contribute zero to the total.
/// </summary>
/// <param name="source">The values to add up.</param>
/// <returns>The sum of the elements that have a value.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static decimal? Sum (this ParallelQuery<decimal?> source)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}

	// Replace every missing value by zero, then reuse the non-nullable Sum.
	ParallelQuery<decimal> values = source.Select ((item) => item.GetValueOrDefault ());
	return values.Sum ();
}
1452 public static int Sum<TSource> (this ParallelQuery<TSource> source, Func<TSource, int> selector)
1455 throw new ArgumentNullException ("source");
1456 if (selector == null)
1457 throw new ArgumentNullException ("selector");
1459 return source.Select (selector).Sum ();
1462 public static long Sum<TSource> (this ParallelQuery<TSource> source, Func<TSource, long> selector)
1465 throw new ArgumentNullException ("source");
1466 if (selector == null)
1467 throw new ArgumentNullException ("selector");
1469 return source.Select (selector).Sum ();
1472 public static decimal Sum<TSource> (this ParallelQuery<TSource> source, Func<TSource, decimal> selector)
1475 throw new ArgumentNullException ("source");
1476 if (selector == null)
1477 throw new ArgumentNullException ("selector");
1479 return source.Select (selector).Sum ();
1482 public static float Sum<TSource> (this ParallelQuery<TSource> source, Func<TSource, float> selector)
1485 throw new ArgumentNullException ("source");
1486 if (selector == null)
1487 throw new ArgumentNullException ("selector");
1489 return source.Select (selector).Sum ();
1492 public static double Sum<TSource> (this ParallelQuery<TSource> source, Func<TSource, double> selector)
1495 throw new ArgumentNullException ("source");
1496 if (selector == null)
1497 throw new ArgumentNullException ("selector");
1499 return source.Select (selector).Sum ();
1502 public static int? Sum<TSource> (this ParallelQuery<TSource> source, Func<TSource, int?> selector)
1505 throw new ArgumentNullException ("source");
1506 if (selector == null)
1507 throw new ArgumentNullException ("selector");
1509 return source.Select (selector).Sum ();
1512 public static long? Sum<TSource> (this ParallelQuery<TSource> source, Func<TSource, long?> selector)
1515 throw new ArgumentNullException ("source");
1516 if (selector == null)
1517 throw new ArgumentNullException ("selector");
1519 return source.Select (selector).Sum ();
1522 public static decimal? Sum<TSource> (this ParallelQuery<TSource> source, Func<TSource, decimal?> selector)
1525 throw new ArgumentNullException ("source");
1526 if (selector == null)
1527 throw new ArgumentNullException ("selector");
1529 return source.Select (selector).Sum ();
1532 public static float? Sum<TSource> (this ParallelQuery<TSource> source, Func<TSource, float?> selector)
1535 throw new ArgumentNullException ("source");
1536 if (selector == null)
1537 throw new ArgumentNullException ("selector");
1539 return source.Select (selector).Sum ();
1542 public static double? Sum<TSource> (this ParallelQuery<TSource> source, Func<TSource, double?> selector)
1545 throw new ArgumentNullException ("source");
1546 if (selector == null)
1547 throw new ArgumentNullException ("selector");
1549 return source.Select (selector).Sum ();
1554 static T BestOrder<T> (ParallelQuery<T> source, BestOrderComparer<T> bestOrderComparer)
1557 throw new ArgumentNullException ("source");
1559 T best = source.Aggregate (bestOrderComparer.Seed,
1560 bestOrderComparer.Intermediate,
1561 bestOrderComparer.Intermediate,
1562 new Identity<T> ().Apply);
1566 class BestOrderComparer<T>
1568 IComparer<T> comparer;
1572 public BestOrderComparer (IComparer<T> comparer, int inverter, T seed)
1574 this.comparer = comparer;
1575 this.inverter = inverter;
1584 public T Intermediate (T first, T second)
1586 return Better (first, second) ? first : second;
1589 bool Better (T first, T second)
1591 return (inverter * comparer.Compare (first, second)) > 0;
1595 public static int Min (this ParallelQuery<int> source)
1597 return BestOrder (source, new BestOrderComparer<int> (Comparer<int>.Default, -1, int.MaxValue));
1600 public static long Min (this ParallelQuery<long> source)
1602 return BestOrder (source, new BestOrderComparer<long> (Comparer<long>.Default, -1, long.MaxValue));
1605 public static float Min (this ParallelQuery<float> source)
1607 return BestOrder (source, new BestOrderComparer<float> (Comparer<float>.Default, -1, float.MaxValue));
1610 public static double Min (this ParallelQuery<double> source)
1612 return BestOrder (source, new BestOrderComparer<double> (Comparer<double>.Default, -1, double.MaxValue));
1615 public static decimal Min (this ParallelQuery<decimal> source)
1617 return BestOrder (source, new BestOrderComparer<decimal> (Comparer<decimal>.Default, -1, decimal.MaxValue));
/// <summary>
/// Returns the smallest element of a parallel sequence according to
/// <see cref="Comparer{T}.Default"/>.
/// </summary>
/// <param name="source">The sequence to search.</param>
/// <returns>
/// The smallest non-null element, or <c>default (TSource)</c> when the sequence holds
/// no (non-null) element.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static TSource Min<TSource> (this ParallelQuery<TSource> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	IComparer<TSource> comparer = Comparer<TSource>.Default;

	// The accumulator pairs a "has a value" flag with the best element seen so far.
	// Using a flag instead of default (TSource) as the seed keeps the seed out of the
	// comparisons: a null or zero seed would otherwise compare lower than real elements.
	KeyValuePair<bool, TSource> none = new KeyValuePair<bool, TSource> (false, default (TSource));

	Func<KeyValuePair<bool, TSource>, KeyValuePair<bool, TSource>, KeyValuePair<bool, TSource>> pick = (best, candidate) => {
		if (!candidate.Key)
			return best;
		if (!best.Key)
			return candidate;
		return comparer.Compare (candidate.Value, best.Value) < 0 ? candidate : best;
	};

	// Null elements are wrapped with the flag cleared so they never become the result.
	return source.Aggregate (none,
	                         (best, e) => pick (best, new KeyValuePair<bool, TSource> (e != null, e)),
	                         pick,
	                         (best) => best.Value);
}
1626 public static TResult Min<TSource, TResult> (this ParallelQuery<TSource> source, Func<TSource, TResult> selector)
1629 throw new ArgumentNullException ("source");
1630 if (selector == null)
1631 throw new ArgumentNullException ("selector");
1633 return source.Select (selector).Min ();
/// <summary>
/// Returns the smallest non-null value of a parallel sequence of nullable
/// <see cref="int"/> values.
/// </summary>
/// <param name="source">The sequence to search.</param>
/// <returns>The smallest value, or <c>null</c> when no element has a value.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static int? Min (this ParallelQuery<int?> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	IComparer<int> comparer = Comparer<int>.Default;

	// A null accumulator means "nothing seen yet"; null elements are skipped rather than
	// replaced by a sentinel, so an all-null input yields null instead of int.MaxValue.
	Func<int?, int?, int?> pick = (best, candidate) => {
		if (!candidate.HasValue)
			return best;
		if (!best.HasValue)
			return candidate;
		return comparer.Compare (candidate.Value, best.Value) < 0 ? candidate : best;
	};

	return source.Aggregate ((int?) null, pick, pick, (best) => best);
}
/// <summary>
/// Returns the smallest non-null value of a parallel sequence of nullable
/// <see cref="long"/> values.
/// </summary>
/// <param name="source">The sequence to search.</param>
/// <returns>The smallest value, or <c>null</c> when no element has a value.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static long? Min (this ParallelQuery<long?> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	IComparer<long> comparer = Comparer<long>.Default;

	// A null accumulator means "nothing seen yet"; null elements are skipped rather than
	// replaced by a sentinel, so an all-null input yields null instead of long.MaxValue.
	Func<long?, long?, long?> pick = (best, candidate) => {
		if (!candidate.HasValue)
			return best;
		if (!best.HasValue)
			return candidate;
		return comparer.Compare (candidate.Value, best.Value) < 0 ? candidate : best;
	};

	return source.Aggregate ((long?) null, pick, pick, (best) => best);
}
/// <summary>
/// Returns the smallest non-null value of a parallel sequence of nullable
/// <see cref="float"/> values.
/// </summary>
/// <param name="source">The sequence to search.</param>
/// <returns>The smallest value, or <c>null</c> when no element has a value.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static float? Min (this ParallelQuery<float?> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	// The default comparer is kept so the ordering of values stays the one the
	// non-nullable overload uses.
	IComparer<float> comparer = Comparer<float>.Default;

	// A null accumulator means "nothing seen yet"; null elements are skipped rather than
	// replaced by a sentinel, so an all-null input yields null instead of float.MaxValue.
	Func<float?, float?, float?> pick = (best, candidate) => {
		if (!candidate.HasValue)
			return best;
		if (!best.HasValue)
			return candidate;
		return comparer.Compare (candidate.Value, best.Value) < 0 ? candidate : best;
	};

	return source.Aggregate ((float?) null, pick, pick, (best) => best);
}
/// <summary>
/// Returns the smallest non-null value of a parallel sequence of nullable
/// <see cref="double"/> values.
/// </summary>
/// <param name="source">The sequence to search.</param>
/// <returns>The smallest value, or <c>null</c> when no element has a value.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static double? Min (this ParallelQuery<double?> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	// The default comparer is kept so the ordering of values stays the one the
	// non-nullable overload uses.
	IComparer<double> comparer = Comparer<double>.Default;

	// A null accumulator means "nothing seen yet"; null elements are skipped rather than
	// replaced by a sentinel, so an all-null input yields null instead of double.MaxValue.
	Func<double?, double?, double?> pick = (best, candidate) => {
		if (!candidate.HasValue)
			return best;
		if (!best.HasValue)
			return candidate;
		return comparer.Compare (candidate.Value, best.Value) < 0 ? candidate : best;
	};

	return source.Aggregate ((double?) null, pick, pick, (best) => best);
}
/// <summary>
/// Returns the smallest non-null value of a parallel sequence of nullable
/// <see cref="decimal"/> values.
/// </summary>
/// <param name="source">The sequence to search.</param>
/// <returns>The smallest value, or <c>null</c> when no element has a value.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static decimal? Min (this ParallelQuery<decimal?> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	IComparer<decimal> comparer = Comparer<decimal>.Default;

	// A null accumulator means "nothing seen yet"; null elements are skipped rather than
	// replaced by a sentinel, so an all-null input yields null instead of decimal.MaxValue.
	Func<decimal?, decimal?, decimal?> pick = (best, candidate) => {
		if (!candidate.HasValue)
			return best;
		if (!best.HasValue)
			return candidate;
		return comparer.Compare (candidate.Value, best.Value) < 0 ? candidate : best;
	};

	return source.Aggregate ((decimal?) null, pick, pick, (best) => best);
}
1676 public static int Min<TSource> (this ParallelQuery<TSource> source, Func<TSource, int> selector)
1679 throw new ArgumentNullException ("source");
1680 if (selector == null)
1681 throw new ArgumentNullException ("selector");
1683 return source.Select (selector).Min ();
1686 public static long Min<TSource> (this ParallelQuery<TSource> source, Func<TSource, long> selector)
1689 throw new ArgumentNullException ("source");
1690 if (selector == null)
1691 throw new ArgumentNullException ("selector");
1693 return source.Select (selector).Min ();
1696 public static float Min<TSource> (this ParallelQuery<TSource> source, Func<TSource, float> selector)
1699 throw new ArgumentNullException ("source");
1700 if (selector == null)
1701 throw new ArgumentNullException ("selector");
1703 return source.Select (selector).Min ();
1706 public static double Min<TSource> (this ParallelQuery<TSource> source, Func<TSource, double> selector)
1709 throw new ArgumentNullException ("source");
1710 if (selector == null)
1711 throw new ArgumentNullException ("selector");
1713 return source.Select (selector).Min ();
1716 public static decimal Min<TSource> (this ParallelQuery<TSource> source, Func<TSource, decimal> selector)
1719 throw new ArgumentNullException ("source");
1720 if (selector == null)
1721 throw new ArgumentNullException ("selector");
1723 return source.Select (selector).Min ();
1726 public static int? Min<TSource> (this ParallelQuery<TSource> source, Func<TSource, int?> selector)
1729 throw new ArgumentNullException ("source");
1730 if (selector == null)
1731 throw new ArgumentNullException ("selector");
1733 return source.Select (selector).Min ();
1736 public static long? Min<TSource> (this ParallelQuery<TSource> source, Func<TSource, long?> selector)
1739 throw new ArgumentNullException ("source");
1740 if (selector == null)
1741 throw new ArgumentNullException ("selector");
1743 return source.Select (selector).Min ();
1746 public static float? Min<TSource> (this ParallelQuery<TSource> source, Func<TSource, float?> selector)
1749 throw new ArgumentNullException ("source");
1750 if (selector == null)
1751 throw new ArgumentNullException ("selector");
1753 return source.Select (selector).Min ();
1756 public static double? Min<TSource> (this ParallelQuery<TSource> source, Func<TSource, double?> selector)
1759 throw new ArgumentNullException ("source");
1760 if (selector == null)
1761 throw new ArgumentNullException ("selector");
1763 return source.Select (selector).Min ();
1766 public static decimal? Min<TSource> (this ParallelQuery<TSource> source, Func<TSource, decimal?> selector)
1769 throw new ArgumentNullException ("source");
1770 if (selector == null)
1771 throw new ArgumentNullException ("selector");
1773 return source.Select (selector).Min ();
1776 public static int Max (this ParallelQuery<int> source)
1778 return BestOrder (source, new BestOrderComparer<int> (Comparer<int>.Default, 1, int.MinValue));
1781 public static long Max (this ParallelQuery<long> source)
1783 return BestOrder (source, new BestOrderComparer<long> (Comparer<long>.Default, 1, long.MinValue));
1786 public static float Max (this ParallelQuery<float> source)
1788 return BestOrder (source, new BestOrderComparer<float> (Comparer<float>.Default, 1, float.MinValue));
1791 public static double Max (this ParallelQuery<double> source)
1793 return BestOrder (source, new BestOrderComparer<double> (Comparer<double>.Default, 1, double.MinValue));
1796 public static decimal Max (this ParallelQuery<decimal> source)
1798 return BestOrder (source, new BestOrderComparer<decimal> (Comparer<decimal>.Default, 1, decimal.MinValue));
/// <summary>
/// Returns the largest element of a parallel sequence according to
/// <see cref="Comparer{T}.Default"/>.
/// </summary>
/// <param name="source">The sequence to search.</param>
/// <returns>
/// The largest non-null element, or <c>default (TSource)</c> when the sequence holds
/// no (non-null) element.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static TSource Max<TSource> (this ParallelQuery<TSource> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	IComparer<TSource> comparer = Comparer<TSource>.Default;

	// The accumulator pairs a "has a value" flag with the best element seen so far.
	// Using a flag instead of default (TSource) as the seed keeps the seed out of the
	// comparisons: a zero seed would otherwise beat a sequence of negative values.
	KeyValuePair<bool, TSource> none = new KeyValuePair<bool, TSource> (false, default (TSource));

	Func<KeyValuePair<bool, TSource>, KeyValuePair<bool, TSource>, KeyValuePair<bool, TSource>> pick = (best, candidate) => {
		if (!candidate.Key)
			return best;
		if (!best.Key)
			return candidate;
		return comparer.Compare (candidate.Value, best.Value) > 0 ? candidate : best;
	};

	// Null elements are wrapped with the flag cleared so they never become the result.
	return source.Aggregate (none,
	                         (best, e) => pick (best, new KeyValuePair<bool, TSource> (e != null, e)),
	                         pick,
	                         (best) => best.Value);
}
1807 public static TResult Max<TSource, TResult> (this ParallelQuery<TSource> source, Func<TSource, TResult> selector)
1810 throw new ArgumentNullException ("source");
1811 if (selector == null)
1812 throw new ArgumentNullException ("selector");
1814 return source.Select (selector).Max ();
/// <summary>
/// Returns the largest non-null value of a parallel sequence of nullable
/// <see cref="int"/> values.
/// </summary>
/// <param name="source">The sequence to search.</param>
/// <returns>The largest value, or <c>null</c> when no element has a value.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static int? Max (this ParallelQuery<int?> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	IComparer<int> comparer = Comparer<int>.Default;

	// A null accumulator means "nothing seen yet"; null elements are skipped rather than
	// replaced by a sentinel, so an all-null input yields null instead of int.MinValue.
	Func<int?, int?, int?> pick = (best, candidate) => {
		if (!candidate.HasValue)
			return best;
		if (!best.HasValue)
			return candidate;
		return comparer.Compare (candidate.Value, best.Value) > 0 ? candidate : best;
	};

	return source.Aggregate ((int?) null, pick, pick, (best) => best);
}
/// <summary>
/// Returns the largest non-null value of a parallel sequence of nullable
/// <see cref="long"/> values.
/// </summary>
/// <param name="source">The sequence to search.</param>
/// <returns>The largest value, or <c>null</c> when no element has a value.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static long? Max (this ParallelQuery<long?> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	IComparer<long> comparer = Comparer<long>.Default;

	// A null accumulator means "nothing seen yet"; null elements are skipped rather than
	// replaced by a sentinel, so an all-null input yields null instead of long.MinValue.
	Func<long?, long?, long?> pick = (best, candidate) => {
		if (!candidate.HasValue)
			return best;
		if (!best.HasValue)
			return candidate;
		return comparer.Compare (candidate.Value, best.Value) > 0 ? candidate : best;
	};

	return source.Aggregate ((long?) null, pick, pick, (best) => best);
}
/// <summary>
/// Returns the largest non-null value of a parallel sequence of nullable
/// <see cref="float"/> values.
/// </summary>
/// <param name="source">The sequence to search.</param>
/// <returns>The largest value, or <c>null</c> when no element has a value.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static float? Max (this ParallelQuery<float?> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	// The default comparer is kept so the ordering of values stays the one the
	// non-nullable overload uses.
	IComparer<float> comparer = Comparer<float>.Default;

	// A null accumulator means "nothing seen yet"; null elements are skipped rather than
	// replaced by a sentinel, so an all-null input yields null instead of float.MinValue.
	Func<float?, float?, float?> pick = (best, candidate) => {
		if (!candidate.HasValue)
			return best;
		if (!best.HasValue)
			return candidate;
		return comparer.Compare (candidate.Value, best.Value) > 0 ? candidate : best;
	};

	return source.Aggregate ((float?) null, pick, pick, (best) => best);
}
/// <summary>
/// Returns the largest non-null value of a parallel sequence of nullable
/// <see cref="double"/> values.
/// </summary>
/// <param name="source">The sequence to search.</param>
/// <returns>The largest value, or <c>null</c> when no element has a value.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static double? Max (this ParallelQuery<double?> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	// The default comparer is kept so the ordering of values stays the one the
	// non-nullable overload uses.
	IComparer<double> comparer = Comparer<double>.Default;

	// A null accumulator means "nothing seen yet"; null elements are skipped rather than
	// replaced by a sentinel, so an all-null input yields null instead of double.MinValue.
	Func<double?, double?, double?> pick = (best, candidate) => {
		if (!candidate.HasValue)
			return best;
		if (!best.HasValue)
			return candidate;
		return comparer.Compare (candidate.Value, best.Value) > 0 ? candidate : best;
	};

	return source.Aggregate ((double?) null, pick, pick, (best) => best);
}
/// <summary>
/// Returns the largest non-null value of a parallel sequence of nullable
/// <see cref="decimal"/> values.
/// </summary>
/// <param name="source">The sequence to search.</param>
/// <returns>The largest value, or <c>null</c> when no element has a value.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static decimal? Max (this ParallelQuery<decimal?> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	IComparer<decimal> comparer = Comparer<decimal>.Default;

	// A null accumulator means "nothing seen yet"; null elements are skipped rather than
	// replaced by a sentinel, so an all-null input yields null instead of decimal.MinValue.
	Func<decimal?, decimal?, decimal?> pick = (best, candidate) => {
		if (!candidate.HasValue)
			return best;
		if (!best.HasValue)
			return candidate;
		return comparer.Compare (candidate.Value, best.Value) > 0 ? candidate : best;
	};

	return source.Aggregate ((decimal?) null, pick, pick, (best) => best);
}
1857 public static int Max<TSource> (this ParallelQuery<TSource> source, Func<TSource, int> selector)
1860 throw new ArgumentNullException ("source");
1861 if (selector == null)
1862 throw new ArgumentNullException ("selector");
1864 return source.Select (selector).Max ();
1867 public static long Max<TSource> (this ParallelQuery<TSource> source, Func<TSource, long> selector)
1870 throw new ArgumentNullException ("source");
1871 if (selector == null)
1872 throw new ArgumentNullException ("selector");
1874 return source.Select (selector).Max ();
1877 public static float Max<TSource> (this ParallelQuery<TSource> source, Func<TSource, float> selector)
1880 throw new ArgumentNullException ("source");
1881 if (selector == null)
1882 throw new ArgumentNullException ("selector");
1884 return source.Select (selector).Max ();
1887 public static double Max<TSource> (this ParallelQuery<TSource> source, Func<TSource, double> selector)
1890 throw new ArgumentNullException ("source");
1891 if (selector == null)
1892 throw new ArgumentNullException ("selector");
1894 return source.Select (selector).Max ();
1897 public static decimal Max<TSource> (this ParallelQuery<TSource> source, Func<TSource, decimal> selector)
1900 throw new ArgumentNullException ("source");
1901 if (selector == null)
1902 throw new ArgumentNullException ("selector");
1904 return source.Select (selector).Max ();
1907 public static int? Max<TSource> (this ParallelQuery<TSource> source, Func<TSource, int?> selector)
1910 throw new ArgumentNullException ("source");
1911 if (selector == null)
1912 throw new ArgumentNullException ("selector");
1914 return source.Select (selector).Max ();
1917 public static long? Max<TSource> (this ParallelQuery<TSource> source, Func<TSource, long?> selector)
1920 throw new ArgumentNullException ("source");
1921 if (selector == null)
1922 throw new ArgumentNullException ("selector");
1924 return source.Select (selector).Max ();
1927 public static float? Max<TSource> (this ParallelQuery<TSource> source, Func<TSource, float?> selector)
1930 throw new ArgumentNullException ("source");
1931 if (selector == null)
1932 throw new ArgumentNullException ("selector");
1934 return source.Select (selector).Max ();
1937 public static double? Max<TSource> (this ParallelQuery<TSource> source, Func<TSource, double?> selector)
1940 throw new ArgumentNullException ("source");
1941 if (selector == null)
1942 throw new ArgumentNullException ("selector");
1944 return source.Select (selector).Max ();
1947 public static decimal? Max<TSource> (this ParallelQuery<TSource> source, Func<TSource, decimal?> selector)
1950 throw new ArgumentNullException ("source");
1951 if (selector == null)
1952 throw new ArgumentNullException ("selector");
1954 return source.Select (selector).Max ();
1958 #region Cast / OfType
1959 public static ParallelQuery<TResult> Cast<TResult> (this ParallelQuery source)
1962 throw new ArgumentNullException ("source");
1964 return source.TypedQuery.Select ((e) => (TResult)e);
1967 public static ParallelQuery<TResult> OfType<TResult> (this ParallelQuery source)
1970 throw new ArgumentNullException ("source");
1972 return source.TypedQuery.Where ((e) => e is TResult).Cast<TResult> ();
1977 public static ParallelQuery<TSource> Reverse<TSource> (this ParallelQuery<TSource> source)
1980 throw new ArgumentNullException ("source");
1982 return new ParallelQuery<TSource> (new QueryReverseNode<TSource> (source));
1986 #region ToArray - ToList - ToDictionary - ToLookup
1987 public static List<TSource> ToList<TSource> (this ParallelQuery<TSource> source)
1990 throw new ArgumentNullException ("source");
1992 if (source.Node.IsOrdered ())
1993 return ToListOrdered (source);
1995 var helper = new ListAggregateHelper<TSource> ();
1996 List<TSource> temp = source.Aggregate (helper.Seed,
1997 helper.Intermediate,
2003 class ListAggregateHelper<TSource>
2005 public List<TSource> Seed ()
2007 return new List<TSource> (50);
2010 public List<TSource> Intermediate (List<TSource> list, TSource e)
2016 public List<TSource> Reducer (List<TSource> list, List<TSource> list2)
2018 list.AddRange (list2);
2022 public List<TSource> Final (List<TSource> list)
2028 internal static List<TSource> ToListOrdered<TSource> (this ParallelQuery<TSource> source)
2030 List<TSource> result = new List<TSource> ();
2032 foreach (TSource element in source)
2033 result.Add (element);
2038 public static TSource[] ToArray<TSource> (this ParallelQuery<TSource> source)
2041 throw new ArgumentNullException ("source");
2043 if (source.Node.IsOrdered ())
2044 return ToListOrdered (source).ToArray ();
2046 var helper = new ArrayAggregateHelper<TSource> ();
2047 ParallelExecuter.ProcessAndAggregate<TSource, List<TSource>> (source.Node,
2049 helper.Intermediate,
2052 return helper.Result;
2055 class ArrayAggregateHelper<TSource>
2059 public TSource[] Result {
2065 internal List<TSource> Seed ()
2067 return new List<TSource> ();
2070 internal List<TSource> Intermediate (List<TSource> list, TSource e)
2076 internal void Final (IList<List<TSource>> list)
2080 for (int i = 0; i < list.Count; i++)
2081 count += list[i].Count;
2083 result = new TSource[count];
2084 int insertIndex = -1;
2086 for (int i = 0; i < list.Count; i++)
2087 for (int j = 0; j < list[i].Count; j++)
2088 result [++insertIndex] = list[i][j];
2092 public static Dictionary<TKey, TSource> ToDictionary<TSource, TKey> (this ParallelQuery<TSource> source,
2093 Func<TSource, TKey> keySelector,
2094 IEqualityComparer<TKey> comparer)
2096 return ToDictionary<TSource, TKey, TSource> (source, keySelector, (e) => e, comparer);
2099 public static Dictionary<TKey, TSource> ToDictionary<TSource, TKey> (this ParallelQuery<TSource> source,
2100 Func<TSource, TKey> keySelector)
2102 return ToDictionary<TSource, TKey, TSource> (source, keySelector, (e) => e, EqualityComparer<TKey>.Default);
2105 public static Dictionary<TKey, TElement> ToDictionary<TSource, TKey, TElement> (this ParallelQuery<TSource> source,
2106 Func<TSource, TKey> keySelector,
2107 Func<TSource, TElement> elementSelector)
2109 return ToDictionary<TSource, TKey, TElement> (source, keySelector, elementSelector, EqualityComparer<TKey>.Default);
2112 public static Dictionary<TKey, TElement> ToDictionary<TSource, TKey, TElement> (this ParallelQuery<TSource> source,
2113 Func<TSource, TKey> keySelector,
2114 Func<TSource, TElement> elementSelector,
2115 IEqualityComparer<TKey> comparer)
2118 throw new ArgumentNullException ("source");
2119 if (keySelector == null)
2120 throw new ArgumentNullException ("keySelector");
2121 if (comparer == null)
2122 comparer = EqualityComparer<TKey>.Default;
2123 if (elementSelector == null)
2124 throw new ArgumentNullException ("elementSelector");
2126 var helper = new DictionaryAggregateHelper<TSource, TKey, TElement> (comparer, keySelector, elementSelector);
2127 return source.Aggregate (helper.Seed,
2128 helper.Intermediate,
2133 class DictionaryAggregateHelper<TSource, TKey, TElement>
2135 IEqualityComparer<TKey> comparer;
2136 Func<TSource, TKey> keySelector;
2137 Func<TSource, TElement> elementSelector;
2139 public DictionaryAggregateHelper (IEqualityComparer<TKey> comparer,
2140 Func<TSource, TKey> keySelector,
2141 Func<TSource, TElement> elementSelector)
2143 this.comparer = comparer;
2144 this.keySelector = keySelector;
2145 this.elementSelector = elementSelector;
2148 public Dictionary<TKey, TElement> Seed ()
2150 return new Dictionary<TKey, TElement> (comparer);
2153 public Dictionary<TKey, TElement> Intermediate (Dictionary<TKey, TElement> d, TSource e)
2155 d.Add (keySelector (e), elementSelector (e));
2159 public Dictionary<TKey, TElement> Reducer (Dictionary<TKey, TElement> d1, Dictionary<TKey, TElement> d2)
2161 foreach (var couple in d2) d1.Add (couple.Key, couple.Value);
2165 public Dictionary<TKey, TElement> Final (Dictionary<TKey, TElement> d)
2171 public static ILookup<TKey, TSource> ToLookup<TSource, TKey> (this ParallelQuery<TSource> source,
2172 Func<TSource, TKey> keySelector)
2174 return ToLookup<TSource, TKey, TSource> (source, keySelector, (e) => e, EqualityComparer<TKey>.Default);
/// <summary>
/// Creates a lookup whose values are the source elements themselves, comparing keys
/// with <paramref name="comparer"/>. Forwards to the four-argument overload.
/// </summary>
/// <param name="source">The parallel query to read elements from.</param>
/// <param name="keySelector">Produces the lookup key for an element.</param>
/// <param name="comparer">Key comparer, passed through unchanged.</param>
/// <returns>The lookup built by the four-argument overload.</returns>
public static ILookup<TKey, TSource> ToLookup<TSource, TKey> (this ParallelQuery<TSource> source,
	Func<TSource, TKey> keySelector,
	IEqualityComparer<TKey> comparer)
{
	// Elements are stored as-is, so the element selector is the identity function.
	Func<TSource, TSource> identity = item => item;

	return ToLookup<TSource, TKey, TSource> (source, keySelector, identity, comparer);
}
/// <summary>
/// Creates a lookup with projected values, using the default equality comparer
/// for TKey. Forwards to the four-argument overload.
/// </summary>
/// <param name="source">The parallel query to read elements from.</param>
/// <param name="keySelector">Produces the lookup key for an element.</param>
/// <param name="elementSelector">Produces the stored value for an element.</param>
/// <returns>The lookup built by the four-argument overload.</returns>
public static ILookup<TKey, TElement> ToLookup<TSource, TKey, TElement> (this ParallelQuery<TSource> source,
	Func<TSource, TKey> keySelector,
	Func<TSource, TElement> elementSelector)
{
	IEqualityComparer<TKey> defaultComparer = EqualityComparer<TKey>.Default;

	return ToLookup<TSource, TKey, TElement> (source, keySelector, elementSelector, defaultComparer);
}
/// <summary>
/// Creates a lookup by running ForAll over the query and adding each element's
/// key/value pair to a ConcurrentLookup.
/// </summary>
/// <param name="source">The parallel query to read elements from.</param>
/// <param name="keySelector">Produces the lookup key for an element.</param>
/// <param name="elementSelector">Produces the stored value for an element.</param>
/// <param name="comparer">Key comparer; the default comparer for TKey is used when null.</param>
/// <returns>The populated lookup.</returns>
/// <exception cref="ArgumentNullException">
/// source, keySelector or elementSelector is null.
/// </exception>
public static ILookup<TKey, TElement> ToLookup<TSource, TKey, TElement> (this ParallelQuery<TSource> source,
	Func<TSource, TKey> keySelector,
	Func<TSource, TElement> elementSelector,
	IEqualityComparer<TKey> comparer)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}
	if (keySelector == null) {
		throw new ArgumentNullException ("keySelector");
	}
	if (elementSelector == null) {
		throw new ArgumentNullException ("elementSelector");
	}

	// A missing comparer is not an error: fall back to the default one for the key type.
	IEqualityComparer<TKey> keyComparer = comparer ?? EqualityComparer<TKey>.Default;
	var result = new ConcurrentLookup<TKey, TElement> (keyComparer);

	source.ForAll (item => result.Add (keySelector (item), elementSelector (item)));

	return result;
}
/// <summary>
/// Unsupported overload that takes a plain IEnumerable as second source. It exists
/// only to carry the obsolete message and always throws.
/// </summary>
/// <param name="first">Unused.</param>
/// <param name="second">Unused.</param>
/// <returns>Never returns.</returns>
/// <exception cref="NotSupportedException">Always thrown.</exception>
[Obsolete ("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather than "
	+ "System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() extension method "
	+ "to convert the right data source to System.Linq.ParallelQuery<T>.")]
public static ParallelQuery<TSource> Concat<TSource> (this ParallelQuery<TSource> first,
	IEnumerable<TSource> second)
{
	throw new NotSupportedException ();
}
/// <summary>
/// Concatenates two parallel queries by wrapping their nodes in a QueryConcatNode.
/// </summary>
/// <param name="first">The query whose node is passed first to the concat node.</param>
/// <param name="second">The query whose node is passed second to the concat node.</param>
/// <returns>A new parallel query over the concat node.</returns>
/// <exception cref="ArgumentNullException">first or second is null.</exception>
public static ParallelQuery<TSource> Concat<TSource> (this ParallelQuery<TSource> first, ParallelQuery<TSource> second)
{
	// Validate up front (same style as Zip) so callers get an ArgumentNullException
	// naming the offending argument instead of a NullReferenceException on .Node.
	if (first == null)
		throw new ArgumentNullException ("first");
	if (second == null)
		throw new ArgumentNullException ("second");

	return new ParallelQuery<TSource> (new QueryConcatNode<TSource> (first.Node, second.Node));
}
2228 #region DefaultIfEmpty
/// <summary>
/// Forwards to the two-argument overload with default (TSource) as the fallback value.
/// </summary>
/// <param name="source">The parallel query to wrap.</param>
/// <returns>The query returned by the two-argument overload.</returns>
public static ParallelQuery<TSource> DefaultIfEmpty<TSource> (this ParallelQuery<TSource> source)
{
	TSource fallback = default (TSource);

	return source.DefaultIfEmpty (fallback);
}
/// <summary>
/// Wraps the query's node in a QueryDefaultEmptyNode carrying <paramref name="defaultValue"/>.
/// </summary>
/// <param name="source">The parallel query to wrap.</param>
/// <param name="defaultValue">The value handed to the QueryDefaultEmptyNode.</param>
/// <returns>A new parallel query over the default-if-empty node.</returns>
/// <exception cref="ArgumentNullException">source is null.</exception>
public static ParallelQuery<TSource> DefaultIfEmpty<TSource> (this ParallelQuery<TSource> source, TSource defaultValue)
{
	// Fail with a descriptive ArgumentNullException rather than a
	// NullReferenceException when dereferencing source.Node below.
	if (source == null)
		throw new ArgumentNullException ("source");

	return new ParallelQuery<TSource> (new QueryDefaultEmptyNode<TSource> (source.Node, defaultValue));
}
/// <summary>
/// Returns the first element yielded by the query's enumerator.
/// </summary>
/// <remarks>
/// The query is run with a private CancellationTokenSource attached through
/// WithImplementerToken; once the first element has been read that source is
/// cancelled, since no further element is needed.
/// </remarks>
/// <param name="source">The parallel query to read from.</param>
/// <returns>The first element produced by the enumerator.</returns>
/// <exception cref="ArgumentNullException">source is null.</exception>
/// <exception cref="InvalidOperationException">The query yields no element.</exception>
public static TSource First<TSource> (this ParallelQuery<TSource> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	CancellationTokenSource src = new CancellationTokenSource ();
	IEnumerator<TSource> enumerator = source.WithImplementerToken (src).GetEnumerator ();

	try {
		if (enumerator == null || !enumerator.MoveNext ())
			throw new InvalidOperationException ("source contains no element");

		TSource result = enumerator.Current;
		// Only one element is wanted: signal the rest of the query to stop.
		src.Cancel ();

		return result;
	} finally {
		// Release the enumerator on every path, including the empty-source
		// exception and failures raised by MoveNext.
		if (enumerator != null)
			enumerator.Dispose ();
	}
}
/// <summary>
/// Returns the first element of the query after filtering it with Where.
/// </summary>
/// <param name="source">The parallel query to read from.</param>
/// <param name="predicate">The filter passed to Where.</param>
/// <returns>The result of First on the filtered query.</returns>
public static TSource First<TSource> (this ParallelQuery<TSource> source, Func<TSource, bool> predicate)
{
	var matching = source.Where (predicate);

	return matching.First ();
}
/// <summary>
/// Returns the first element of the query after applying DefaultIfEmpty to it.
/// </summary>
/// <param name="source">The parallel query to read from.</param>
/// <returns>The result of First on the DefaultIfEmpty query.</returns>
public static TSource FirstOrDefault<TSource> (this ParallelQuery<TSource> source)
{
	var neverEmpty = source.DefaultIfEmpty ();

	return neverEmpty.First ();
}
/// <summary>
/// Filters the query with Where, then returns FirstOrDefault of the filtered query.
/// </summary>
/// <param name="source">The parallel query to read from.</param>
/// <param name="predicate">The filter passed to Where.</param>
/// <returns>The result of FirstOrDefault on the filtered query.</returns>
public static TSource FirstOrDefault<TSource> (this ParallelQuery<TSource> source, Func<TSource, bool> predicate)
{
	var matching = source.Where (predicate);

	return matching.FirstOrDefault ();
}
/// <summary>
/// Returns the last element of the query, obtained as the first element of the
/// reversed query.
/// </summary>
/// <param name="source">The parallel query to read from.</param>
/// <returns>The result of First on the reversed query.</returns>
public static TSource Last<TSource> (this ParallelQuery<TSource> source)
{
	var reversed = source.Reverse ();

	return reversed.First ();
}
/// <summary>
/// Returns the last element matching <paramref name="predicate"/>, obtained as the
/// first matching element of the reversed query.
/// </summary>
/// <param name="source">The parallel query to read from.</param>
/// <param name="predicate">The condition passed to First.</param>
/// <returns>The result of First (predicate) on the reversed query.</returns>
public static TSource Last<TSource> (this ParallelQuery<TSource> source, Func<TSource, bool> predicate)
{
	var reversed = source.Reverse ();

	return reversed.First (predicate);
}
/// <summary>
/// Returns FirstOrDefault of the reversed query.
/// </summary>
/// <param name="source">The parallel query to read from.</param>
/// <returns>The result of FirstOrDefault on the reversed query.</returns>
public static TSource LastOrDefault<TSource> (this ParallelQuery<TSource> source)
{
	var reversed = source.Reverse ();

	return reversed.FirstOrDefault ();
}
/// <summary>
/// Returns FirstOrDefault (predicate) of the reversed query.
/// </summary>
/// <param name="source">The parallel query to read from.</param>
/// <param name="predicate">The condition passed to FirstOrDefault.</param>
/// <returns>The result of FirstOrDefault (predicate) on the reversed query.</returns>
public static TSource LastOrDefault<TSource> (this ParallelQuery<TSource> source, Func<TSource, bool> predicate)
{
	var reversed = source.Reverse ();

	return reversed.FirstOrDefault (predicate);
}
/// <summary>
/// Builds a parallel query over a QueryZipNode created from the result selector and
/// the nodes of both input queries.
/// </summary>
/// <param name="first">The query whose node is passed first to the zip node.</param>
/// <param name="second">The query whose node is passed second to the zip node.</param>
/// <param name="resultSelector">The function handed to the zip node.</param>
/// <returns>A new parallel query over the zip node.</returns>
/// <exception cref="ArgumentNullException">
/// first, second or resultSelector is null.
/// </exception>
public static ParallelQuery<TResult> Zip<TFirst, TSecond, TResult> (this ParallelQuery<TFirst> first,
	ParallelQuery<TSecond> second,
	Func<TFirst, TSecond, TResult> resultSelector)
{
	if (first == null) {
		throw new ArgumentNullException ("first");
	}
	if (second == null) {
		throw new ArgumentNullException ("second");
	}
	if (resultSelector == null) {
		throw new ArgumentNullException ("resultSelector");
	}

	var zipNode = new QueryZipNode<TFirst, TSecond, TResult> (resultSelector, first.Node, second.Node);

	return new ParallelQuery<TResult> (zipNode);
}
/// <summary>
/// Unsupported overload that takes a plain IEnumerable as second source. It exists
/// only to carry the obsolete message and always throws.
/// </summary>
/// <param name="first">Unused.</param>
/// <param name="second">Unused.</param>
/// <param name="resultSelector">Unused.</param>
/// <returns>Never returns.</returns>
/// <exception cref="NotSupportedException">Always thrown.</exception>
[Obsolete ("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
	+ "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
	+ "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
public static ParallelQuery<TResult> Zip<TFirst, TSecond, TResult> (this ParallelQuery<TFirst> first,
	IEnumerable<TSecond> second,
	Func<TFirst, TSecond, TResult> resultSelector)
{
	throw new NotSupportedException ();
}
2324 public T Apply (T input)