2 // ParallelEnumerable.cs
5 // Jérémie "Garuma" Laval <jeremie.laval@gmail.com>
7 // Copyright (c) 2010 Jérémie "Garuma" Laval
9 // Permission is hereby granted, free of charge, to any person obtaining a copy
10 // of this software and associated documentation files (the "Software"), to deal
11 // in the Software without restriction, including without limitation the rights
12 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13 // copies of the Software, and to permit persons to whom the Software is
14 // furnished to do so, subject to the following conditions:
16 // The above copyright notice and this permission notice shall be included in
17 // all copies or substantial portions of the Software.
19 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
29 using System.Threading;
30 using System.Collections;
31 using System.Collections.Generic;
32 using System.Collections.Concurrent;
33 using System.Linq.Parallel;
34 using System.Linq.Parallel.QueryNodes;
38 public static class ParallelEnumerable
40 #region Range & Repeat
/// <summary>
/// Wraps a RangeList built from <paramref name="start"/> and <paramref name="count"/> in a parallel query.
/// </summary>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="count"/> is negative, or start + count - 1 exceeds Int32.MaxValue.
/// </exception>
public static ParallelQuery<int> Range (int start, int count)
{
	// Reject negative counts first so the bound check below only sees count >= 0.
	if (count < 0)
		throw new ArgumentOutOfRangeException ("count", "count is less than 0");

	// Do the arithmetic in 64 bits: "int.MaxValue - start" wraps around for a
	// negative start in an unchecked context and would reject valid ranges.
	if ((long) start + count - 1 > int.MaxValue)
		throw new ArgumentOutOfRangeException ("count", "start + count - 1 is larger than Int32.MaxValue");

	return (new RangeList (start, count)).AsParallel ();
}
/// <summary>
/// Wraps a RepeatList built from <paramref name="element"/> and <paramref name="count"/> in a parallel query.
/// </summary>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is negative.</exception>
public static ParallelQuery<TResult> Repeat<TResult> (TResult element, int count)
{
	if (count < 0) {
		throw new ArgumentOutOfRangeException ("count", "count is less than 0");
	}

	var repeated = new RepeatList<TResult> (element, count);
	return repeated.AsParallel ();
}
61 public static ParallelQuery<TResult> Empty<TResult> ()
63 return Repeat<TResult> (default (TResult), 0);
/// <summary>
/// Turns a sequence into a parallel query, reusing the instance when it already is one.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static ParallelQuery<TSource> AsParallel<TSource> (this IEnumerable<TSource> source)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}

	// AsParallel may be applied to something that is already a ParallelQuery;
	// in that case hand the same instance back instead of wrapping it again.
	ParallelQuery<TSource> existing = source as ParallelQuery<TSource>;
	if (existing != null) {
		return existing;
	}

	return new ParallelQuery<TSource> (new QueryStartNode<TSource> (source));
}
/// <summary>
/// Starts a parallel query whose elements are supplied by a partitioner.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static ParallelQuery<TSource> AsParallel<TSource> (this Partitioner<TSource> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	// Unlike the IEnumerable overloads there is no "already a ParallelQuery" shortcut:
	// a Partitioner<TSource> instance cannot also be a ParallelQuery<TSource> (both are
	// classes), so the former "source as ParallelQuery<TSource>" test always yielded null.
	return new ParallelQuery<TSource> (new QueryStartNode<TSource> (source));
}
/// <summary>
/// Turns a non-generic sequence into a parallel query, reusing the instance when it already is one.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static ParallelQuery AsParallel (this IEnumerable source)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}

	// An argument that is already a ParallelQuery is returned untouched.
	ParallelQuery existing = source as ParallelQuery;
	if (existing != null) {
		return existing;
	}

	// Otherwise expose the elements as objects and start a new query from them.
	IEnumerable<object> boxed = source.Cast<object> ();
	return new ParallelQuery<object> (new QueryStartNode<object> (boxed));
}
107 public static IEnumerable<TSource> AsEnumerable<TSource> (this ParallelQuery<TSource> source)
110 throw new ArgumentNullException ("source");
112 return source.AsSequential ();
/// <summary>
/// Returns the sequential view produced by the query's node (Node.GetSequential).
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static IEnumerable<TSource> AsSequential<TSource> (this ParallelQuery<TSource> source)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}

	var node = source.Node;
	return node.GetSequential ();
}
124 #region AsOrdered / AsUnordered
/// <summary>
/// Appends a QueryAsOrderedNode to the query.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static ParallelQuery<TSource> AsOrdered<TSource> (this ParallelQuery<TSource> source)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}

	var orderedNode = new QueryAsOrderedNode<TSource> (source.Node);
	return new ParallelQuery<TSource> (orderedNode);
}
/// <summary>
/// Appends a QueryAsUnorderedNode to the query.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static ParallelQuery<TSource> AsUnordered<TSource> (this ParallelQuery<TSource> source)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}

	var unorderedNode = new QueryAsUnorderedNode<TSource> (source.Node);
	return new ParallelQuery<TSource> (unorderedNode);
}
141 public static ParallelQuery AsOrdered (this ParallelQuery source)
144 throw new ArgumentNullException ("source");
146 return source.TypedQuery.AsOrdered ();
/// <summary>
/// Appends a ParallelExecutionModeNode carrying <paramref name="executionMode"/> to the query.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static ParallelQuery<TSource> WithExecutionMode<TSource> (this ParallelQuery<TSource> source,
	ParallelExecutionMode executionMode)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}

	var modeNode = new ParallelExecutionModeNode<TSource> (executionMode, source.Node);
	return new ParallelQuery<TSource> (modeNode);
}
/// <summary>
/// Appends a CancellationTokenNode carrying <paramref name="cancellationToken"/> to the query.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static ParallelQuery<TSource> WithCancellation<TSource> (this ParallelQuery<TSource> source,
	CancellationToken cancellationToken)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}

	var tokenNode = new CancellationTokenNode<TSource> (cancellationToken, source.Node);
	return new ParallelQuery<TSource> (tokenNode);
}
/// <summary>
/// Appends a ParallelMergeOptionsNode carrying <paramref name="mergeOptions"/> to the query.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static ParallelQuery<TSource> WithMergeOptions<TSource> (this ParallelQuery<TSource> source,
	ParallelMergeOptions mergeOptions)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}

	var mergeNode = new ParallelMergeOptionsNode<TSource> (mergeOptions, source.Node);
	return new ParallelQuery<TSource> (mergeNode);
}
/// <summary>
/// Appends a DegreeOfParallelismNode carrying <paramref name="degreeOfParallelism"/> to the query.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="degreeOfParallelism"/> is less than 1 or greater than 63.
/// </exception>
public static ParallelQuery<TSource> WithDegreeOfParallelism<TSource> (this ParallelQuery<TSource> source,
	int degreeOfParallelism)
{
	// Validate source first, like every other operator in this class.
	if (source == null)
		throw new ArgumentNullException ("source");

	// ArgumentOutOfRangeException derives from ArgumentException, so callers that
	// catch the previously thrown ArgumentException keep working.
	if (degreeOfParallelism < 1 || degreeOfParallelism > 63)
		throw new ArgumentOutOfRangeException ("degreeOfParallelism", "degreeOfParallelism is less than 1 or greater than 63");

	return new ParallelQuery<TSource> (new DegreeOfParallelismNode<TSource> (degreeOfParallelism, source.Node));
}
189 internal static ParallelQuery<TSource> WithImplementerToken<TSource> (this ParallelQuery<TSource> source,
190 CancellationTokenSource token)
192 return new ParallelQuery<TSource> (new ImplementerTokenNode<TSource> (token, source.Node));
/// <summary>
/// Appends a QuerySelectNode that applies <paramref name="selector"/> to the query.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
public static ParallelQuery<TResult> Select<TSource, TResult> (this ParallelQuery<TSource> source, Func<TSource, TResult> selector)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}
	if (selector == null) {
		throw new ArgumentNullException ("selector");
	}

	var projection = new QuerySelectNode<TResult, TSource> (source.Node, selector);
	return new ParallelQuery<TResult> (projection);
}
/// <summary>
/// Appends a QuerySelectNode that applies the index-aware <paramref name="selector"/> to the query.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
public static ParallelQuery<TResult> Select<TSource, TResult> (this ParallelQuery<TSource> source, Func<TSource, int, TResult> selector)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}
	if (selector == null) {
		throw new ArgumentNullException ("selector");
	}

	var indexedProjection = new QuerySelectNode<TResult, TSource> (source.Node, selector);
	return new ParallelQuery<TResult> (indexedProjection);
}
219 public static ParallelQuery<TResult> SelectMany<TSource, TResult> (this ParallelQuery<TSource> source,
220 Func<TSource, IEnumerable<TResult>> selector)
222 return source.SelectMany (selector, (s, e) => e);
225 public static ParallelQuery<TResult> SelectMany<TSource, TResult> (this ParallelQuery<TSource> source,
226 Func<TSource, int, IEnumerable<TResult>> selector)
228 return source.SelectMany (selector, (s, e) => e);
231 public static ParallelQuery<TResult> SelectMany<TSource, TCollection, TResult> (this ParallelQuery<TSource> source,
232 Func<TSource, IEnumerable<TCollection>> collectionSelector,
233 Func<TSource, TCollection, TResult> resultSelector)
235 return new ParallelQuery<TResult> (new QuerySelectManyNode<TSource, TCollection, TResult> (source.Node,
240 public static ParallelQuery<TResult> SelectMany<TSource, TCollection, TResult> (this ParallelQuery<TSource> source,
241 Func<TSource, int, IEnumerable<TCollection>> collectionSelector,
242 Func<TSource, TCollection, TResult> resultSelector)
244 return new ParallelQuery<TResult> (new QuerySelectManyNode<TSource, TCollection, TResult> (source.Node,
/// <summary>
/// Appends a QueryWhereNode that filters the query with <paramref name="predicate"/>.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
public static ParallelQuery<TSource> Where<TSource> (this ParallelQuery<TSource> source, Func<TSource, bool> predicate)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}
	if (predicate == null) {
		throw new ArgumentNullException ("predicate");
	}

	var filter = new QueryWhereNode<TSource> (source.Node, predicate);
	return new ParallelQuery<TSource> (filter);
}
/// <summary>
/// Appends a QueryWhereNode that filters the query with the index-aware <paramref name="predicate"/>.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
public static ParallelQuery<TSource> Where<TSource> (this ParallelQuery<TSource> source, Func<TSource, int, bool> predicate)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}
	if (predicate == null) {
		throw new ArgumentNullException ("predicate");
	}

	var indexedFilter = new QueryWhereNode<TSource> (source.Node, predicate);
	return new ParallelQuery<TSource> (indexedFilter);
}
273 public static TSource Aggregate<TSource> (this ParallelQuery<TSource> source, Func<TSource, TSource, TSource> func)
276 throw new ArgumentNullException ("source");
278 throw new ArgumentNullException ("func");
280 return source.Aggregate<TSource, TSource, TSource> ((Func<TSource>)null,
286 public static TAccumulate Aggregate<TSource, TAccumulate> (this ParallelQuery<TSource> source,
288 Func<TAccumulate, TSource, TAccumulate> func)
291 throw new ArgumentNullException ("source");
293 throw new ArgumentNullException ("func");
295 return source.Aggregate (seed, func, (e) => e);
/// <summary>
/// Folds the query into a single value: starts from <paramref name="seed"/>, applies
/// <paramref name="func"/> to each enumerated element in turn, then maps the final
/// accumulator through <paramref name="resultSelector"/>.
/// </summary>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/>, <paramref name="func"/> or <paramref name="resultSelector"/> is null.
/// </exception>
public static TResult Aggregate<TSource, TAccumulate, TResult> (this ParallelQuery<TSource> source,
	TAccumulate seed,
	Func<TAccumulate, TSource, TAccumulate> func,
	Func<TAccumulate, TResult> resultSelector)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}
	if (func == null) {
		throw new ArgumentNullException ("func");
	}
	if (resultSelector == null) {
		throw new ArgumentNullException ("resultSelector");
	}

	// This overload has no combiner, so the fold is a plain loop over the query's enumerator.
	TAccumulate running = seed;
	foreach (TSource item in source) {
		running = func (running, item);
	}

	TResult projected = resultSelector (running);
	return projected;
}
/// <summary>
/// Aggregates with a fixed seed by forwarding to the seed-factory overload with a
/// factory that always returns <paramref name="seed"/>.
/// </summary>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or any of the delegate arguments is null.
/// </exception>
public static TResult Aggregate<TSource, TAccumulate, TResult> (this ParallelQuery<TSource> source,
	TAccumulate seed,
	Func<TAccumulate, TSource, TAccumulate> updateAccumulatorFunc,
	Func<TAccumulate, TAccumulate, TAccumulate> combineAccumulatorsFunc,
	Func<TAccumulate, TResult> resultSelector)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}
	if (updateAccumulatorFunc == null) {
		throw new ArgumentNullException ("updateAccumulatorFunc");
	}
	if (combineAccumulatorsFunc == null) {
		throw new ArgumentNullException ("combineAccumulatorsFunc");
	}
	if (resultSelector == null) {
		throw new ArgumentNullException ("resultSelector");
	}

	// The lambda (not a value) as first argument selects the seedFactory overload.
	return Aggregate (source, () => seed, updateAccumulatorFunc, combineAccumulatorsFunc, resultSelector);
}
336 public static TResult Aggregate<TSource, TAccumulate, TResult> (this ParallelQuery<TSource> source,
337 Func<TAccumulate> seedFactory,
338 Func<TAccumulate, TSource, TAccumulate> updateAccumulatorFunc,
339 Func<TAccumulate, TAccumulate, TAccumulate> combineAccumulatorsFunc,
340 Func<TAccumulate, TResult> resultSelector)
343 throw new ArgumentNullException ("source");
344 if (seedFactory == null)
345 throw new ArgumentNullException ("seedFactory");
346 if (updateAccumulatorFunc == null)
347 throw new ArgumentNullException ("updateAccumulatorFunc");
348 if (combineAccumulatorsFunc == null)
349 throw new ArgumentNullException ("combineAccumulatorsFunc");
350 if (resultSelector == null)
351 throw new ArgumentNullException ("resultSelector");
353 TAccumulate accumulator = default (TAccumulate);
355 ParallelExecuter.ProcessAndAggregate<TSource, TAccumulate> (source.Node, seedFactory, updateAccumulatorFunc, (list) => {
356 accumulator = list [0];
357 for (int i = 1; i < list.Count; i++)
358 accumulator = combineAccumulatorsFunc (accumulator, list[i]);
361 return resultSelector (accumulator);;
/// <summary>
/// Runs <paramref name="action"/> on every element by handing the query's node to
/// ParallelExecuter.ProcessAndBlock.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="action"/> is null.</exception>
public static void ForAll<TSource> (this ParallelQuery<TSource> source, Action<TSource> action)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}
	if (action == null) {
		throw new ArgumentNullException ("action");
	}

	// The executer's callback receives a second argument that this operator ignores.
	ParallelExecuter.ProcessAndBlock (source.Node, (element, unused) => action (element));
}
/// <summary>
/// Appends a QueryOrderByNode that orders elements by key with the comparison reversed.
/// A null <paramref name="comparer"/> falls back to Comparer&lt;TKey&gt;.Default.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> is null.</exception>
public static OrderedParallelQuery<TSource> OrderByDescending<TSource, TKey> (this ParallelQuery<TSource> source,
	Func<TSource, TKey> keySelector,
	IComparer<TKey> comparer)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (keySelector == null)
		throw new ArgumentNullException ("keySelector");
	if (comparer == null)
		comparer = Comparer<TKey>.Default;

	// Reverse the order by swapping the operands rather than negating the result:
	// -int.MinValue is still int.MinValue, so negation fails to flip the sign for
	// a comparer that returns int.MinValue.
	Comparison<TSource> comparison = (e1, e2) => {
		TKey k1 = keySelector (e1);
		TKey k2 = keySelector (e2);
		return comparer.Compare (k2, k1);
	};

	return new OrderedParallelQuery<TSource> (new QueryOrderByNode<TSource> (source.Node, comparison));
}
394 public static OrderedParallelQuery<TSource> OrderByDescending<TSource, TKey> (this ParallelQuery<TSource> source,
395 Func<TSource, TKey> keySelector)
397 return OrderByDescending (source, keySelector, Comparer<TKey>.Default);
400 public static OrderedParallelQuery<TSource> OrderBy<TSource, TKey> (this ParallelQuery<TSource> source,
401 Func<TSource, TKey> keySelector)
403 return OrderBy (source, keySelector, Comparer<TKey>.Default);
/// <summary>
/// Appends a QueryOrderByNode that orders elements by the key extracted with
/// <paramref name="keySelector"/>. A null <paramref name="comparer"/> falls back to
/// Comparer&lt;TKey&gt;.Default.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> is null.</exception>
public static OrderedParallelQuery<TSource> OrderBy<TSource, TKey> (this ParallelQuery<TSource> source,
	Func<TSource, TKey> keySelector,
	IComparer<TKey> comparer)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}
	if (keySelector == null) {
		throw new ArgumentNullException ("keySelector");
	}

	IComparer<TKey> keyComparer = comparer ?? Comparer<TKey>.Default;

	Comparison<TSource> byKey = (left, right) => {
		TKey leftKey = keySelector (left);
		TKey rightKey = keySelector (right);
		return keyComparer.Compare (leftKey, rightKey);
	};

	var orderNode = new QueryOrderByNode<TSource> (source.Node, byKey);
	return new OrderedParallelQuery<TSource> (orderNode);
}
424 public static OrderedParallelQuery<TSource> ThenBy<TSource, TKey> (this OrderedParallelQuery<TSource> source,
425 Func<TSource, TKey> keySelector)
427 return ThenBy (source, keySelector, Comparer<TKey>.Default);
/// <summary>
/// Appends a further QueryOrderByNode, keyed by <paramref name="keySelector"/>, to an
/// already ordered query. A null <paramref name="comparer"/> falls back to
/// Comparer&lt;TKey&gt;.Default.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> is null.</exception>
public static OrderedParallelQuery<TSource> ThenBy<TSource, TKey> (this OrderedParallelQuery<TSource> source,
	Func<TSource, TKey> keySelector,
	IComparer<TKey> comparer)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}
	if (keySelector == null) {
		throw new ArgumentNullException ("keySelector");
	}

	IComparer<TKey> keyComparer = comparer ?? Comparer<TKey>.Default;

	Comparison<TSource> byKey = (left, right) => {
		TKey leftKey = keySelector (left);
		TKey rightKey = keySelector (right);
		return keyComparer.Compare (leftKey, rightKey);
	};

	var orderNode = new QueryOrderByNode<TSource> (source.Node, byKey);
	return new OrderedParallelQuery<TSource> (orderNode);
}
446 public static OrderedParallelQuery<TSource> ThenByDescending<TSource, TKey> (this OrderedParallelQuery<TSource> source,
447 Func<TSource, TKey> keySelector)
449 return ThenByDescending (source, keySelector, Comparer<TKey>.Default);
/// <summary>
/// Appends a further QueryOrderByNode to an already ordered query, comparing keys in
/// reverse. A null <paramref name="comparer"/> falls back to Comparer&lt;TKey&gt;.Default.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> is null.</exception>
public static OrderedParallelQuery<TSource> ThenByDescending<TSource, TKey> (this OrderedParallelQuery<TSource> source,
	Func<TSource, TKey> keySelector,
	IComparer<TKey> comparer)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (keySelector == null)
		throw new ArgumentNullException ("keySelector");
	if (comparer == null)
		comparer = Comparer<TKey>.Default;

	// Swap the operands instead of negating the result: -int.MinValue overflows
	// back to int.MinValue, which would leave the order un-reversed.
	Comparison<TSource> comparison = (e1, e2) => {
		TKey k1 = keySelector (e1);
		TKey k2 = keySelector (e2);
		return comparer.Compare (k2, k1);
	};

	return new OrderedParallelQuery<TSource> (new QueryOrderByNode<TSource> (source.Node, comparison));
}
470 public static bool All<TSource> (this ParallelQuery<TSource> source, Func<TSource, bool> predicate)
473 throw new ArgumentNullException ("source");
474 if (predicate == null)
475 throw new ArgumentNullException ("predicate");
477 CancellationTokenSource src = new CancellationTokenSource ();
478 ParallelQuery<TSource> innerQuery = source.WithImplementerToken (src);
482 innerQuery.ForAll ((e) => {
483 if (!predicate (e)) {
488 } catch (OperationCanceledException e) {
489 if (e.CancellationToken != src.Token)
498 public static bool Any<TSource> (this ParallelQuery<TSource> source)
500 return Any<TSource> (source, (_) => true);
/// <summary>
/// Reports whether at least one element satisfies <paramref name="predicate"/>,
/// computed as the negation of "All elements fail the predicate".
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
public static bool Any<TSource> (this ParallelQuery<TSource> source, Func<TSource, bool> predicate)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}
	if (predicate == null) {
		throw new ArgumentNullException ("predicate");
	}

	bool noneMatch = All (source, (item) => !predicate (item));
	return !noneMatch;
}
515 public static bool Contains<TSource> (this ParallelQuery<TSource> source, TSource value)
517 return Contains<TSource> (source, value, EqualityComparer<TSource>.Default);
/// <summary>
/// Reports whether any element of the query equals <paramref name="value"/> according to
/// <paramref name="comparer"/>. A null comparer falls back to EqualityComparer&lt;TSource&gt;.Default.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static bool Contains<TSource> (this ParallelQuery<TSource> source, TSource value, IEqualityComparer<TSource> comparer)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (comparer == null)
		comparer = EqualityComparer<TSource>.Default;

	// Compare each element with the value. The one-argument comparer.Equals (value)
	// used before bound to object.Equals and tested the comparer itself against
	// the value, never looking at the element.
	return Any<TSource> (source, (e) => comparer.Equals (e, value));
}
531 #region SequenceEqual
/// <summary>
/// Compares two parallel queries using EqualityComparer&lt;TSource&gt;.Default by
/// forwarding to the comparer-taking overload.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="first"/> or <paramref name="second"/> is null.</exception>
public static bool SequenceEqual<TSource> (this ParallelQuery<TSource> first,
	ParallelQuery<TSource> second)
{
	if (first == null) {
		throw new ArgumentNullException ("first");
	}
	if (second == null) {
		throw new ArgumentNullException ("second");
	}

	return SequenceEqual (first, second, EqualityComparer<TSource>.Default);
}
543 public static bool SequenceEqual<TSource> (this ParallelQuery<TSource> first,
544 ParallelQuery<TSource> second,
545 IEqualityComparer<TSource> comparer)
548 throw new ArgumentNullException ("first");
550 throw new ArgumentNullException ("second");
551 if (comparer == null)
552 comparer = EqualityComparer<TSource>.Default;
554 var source = new CancellationTokenSource ();
555 var zip = new QueryZipNode<TSource, TSource, bool> (comparer.Equals, first.Node, second.Node) { Strict = true };
556 var innerQuery = new ParallelQuery<bool> (zip).WithImplementerToken (source);
561 innerQuery.ForAll (value => {
567 } catch (AggregateException ex) {
568 if (ex.InnerException is QueryZipException)
572 } catch (OperationCanceledException e) {
573 if (e.CancellationToken != source.Token)
580 [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
581 + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
582 + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
583 public static bool SequenceEqual<TSource> (this ParallelQuery<TSource> first, IEnumerable<TSource> second)
585 throw new NotSupportedException ();
588 [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
589 + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
590 + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
591 public static bool SequenceEqual<TSource> (this ParallelQuery<TSource> first,
592 IEnumerable<TSource> second,
593 IEqualityComparer<TSource> comparer)
595 throw new NotSupportedException ();
601 public static ParallelQuery<IGrouping<TKey, TSource>> GroupBy<TSource, TKey> (this ParallelQuery<TSource> source,
602 Func<TSource, TKey> keySelector)
604 return source.GroupBy (keySelector, EqualityComparer<TKey>.Default);
607 public static ParallelQuery<IGrouping<TKey, TSource>> GroupBy<TSource, TKey> (this ParallelQuery<TSource> source,
608 Func<TSource, TKey> keySelector,
609 IEqualityComparer<TKey> comparer)
611 return source.GroupBy (keySelector, new Identity<TSource> ().Apply, comparer);
614 public static ParallelQuery<IGrouping<TKey, TElement>> GroupBy<TSource, TKey, TElement> (this ParallelQuery<TSource> source,
615 Func<TSource, TKey> keySelector,
616 Func<TSource, TElement> elementSelector)
618 return source.GroupBy (keySelector, elementSelector, EqualityComparer<TKey>.Default);
/// <summary>
/// Groups elements by key and projects each group through <paramref name="resultSelector"/>,
/// which receives the group's key and its elements.
/// </summary>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/>, <paramref name="keySelector"/> or <paramref name="resultSelector"/> is null.
/// </exception>
public static ParallelQuery<TResult> GroupBy<TSource, TKey, TResult> (this ParallelQuery<TSource> source,
	Func<TSource, TKey> keySelector,
	Func<TKey, IEnumerable<TSource>, TResult> resultSelector)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (keySelector == null)
		throw new ArgumentNullException ("keySelector");
	// Fail at the call site: without this check a null resultSelector was only
	// dereferenced later, inside the projection lambda, when the query ran.
	if (resultSelector == null)
		throw new ArgumentNullException ("resultSelector");

	return source.GroupBy (keySelector)
		.Select ((g) => resultSelector (g.Key, (IEnumerable<TSource>)g));
}
/// <summary>
/// Appends a QueryGroupByNode built from the key selector, element selector and key
/// comparer. A null <paramref name="comparer"/> falls back to EqualityComparer&lt;TKey&gt;.Default.
/// </summary>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/>, <paramref name="keySelector"/> or <paramref name="elementSelector"/> is null.
/// </exception>
public static ParallelQuery<IGrouping<TKey, TElement>> GroupBy<TSource, TKey, TElement> (this ParallelQuery<TSource> source,
	Func<TSource, TKey> keySelector,
	Func<TSource, TElement> elementSelector,
	IEqualityComparer<TKey> comparer)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}
	if (keySelector == null) {
		throw new ArgumentNullException ("keySelector");
	}
	if (elementSelector == null) {
		throw new ArgumentNullException ("elementSelector");
	}

	IEqualityComparer<TKey> keyComparer = comparer ?? EqualityComparer<TKey>.Default;

	var groupNode = new QueryGroupByNode<TSource, TKey, TElement> (source.Node, keySelector, elementSelector, keyComparer);
	return new ParallelQuery<IGrouping<TKey, TElement>> (groupNode);
}
/// <summary>
/// Groups projected elements by key and maps each group through
/// <paramref name="resultSelector"/>, which receives the group's key and its elements.
/// </summary>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or any of the selector arguments is null.
/// </exception>
public static ParallelQuery<TResult> GroupBy<TSource, TKey, TElement, TResult> (this ParallelQuery<TSource> source,
	Func<TSource, TKey> keySelector,
	Func<TSource, TElement> elementSelector,
	Func<TKey, IEnumerable<TElement>, TResult> resultSelector)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (keySelector == null)
		throw new ArgumentNullException ("keySelector");
	if (elementSelector == null)
		throw new ArgumentNullException ("elementSelector");
	// Fail at the call site: without this check a null resultSelector was only
	// dereferenced later, inside the projection lambda, when the query ran.
	if (resultSelector == null)
		throw new ArgumentNullException ("resultSelector");

	return source.GroupBy (keySelector, elementSelector)
		.Select ((g) => resultSelector (g.Key, (IEnumerable<TElement>)g));
}
/// <summary>
/// Groups elements by key using <paramref name="comparer"/> and maps each group through
/// <paramref name="resultSelector"/>, which receives the group's key and its elements.
/// </summary>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/>, <paramref name="keySelector"/> or <paramref name="resultSelector"/> is null.
/// </exception>
public static ParallelQuery<TResult> GroupBy<TSource, TKey, TResult> (this ParallelQuery<TSource> source,
	Func<TSource, TKey> keySelector,
	Func<TKey, IEnumerable<TSource>, TResult> resultSelector,
	IEqualityComparer<TKey> comparer)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (keySelector == null)
		throw new ArgumentNullException ("keySelector");
	// Fail at the call site: without this check a null resultSelector was only
	// dereferenced later, inside the projection lambda, when the query ran.
	if (resultSelector == null)
		throw new ArgumentNullException ("resultSelector");

	return source.GroupBy (keySelector, comparer)
		.Select ((g) => resultSelector (g.Key, (IEnumerable<TSource>)g));
}
/// <summary>
/// Groups projected elements by key using <paramref name="comparer"/> and maps each group
/// through <paramref name="resultSelector"/>, which receives the group's key and its elements.
/// </summary>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or any of the selector arguments is null.
/// </exception>
public static ParallelQuery<TResult> GroupBy<TSource, TKey, TElement, TResult> (this ParallelQuery<TSource> source,
	Func<TSource, TKey> keySelector,
	Func<TSource, TElement> elementSelector,
	Func<TKey, IEnumerable<TElement>, TResult> resultSelector,
	IEqualityComparer<TKey> comparer)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (keySelector == null)
		throw new ArgumentNullException ("keySelector");
	if (elementSelector == null)
		throw new ArgumentNullException ("elementSelector");
	// Fail at the call site: without this check a null resultSelector was only
	// dereferenced later, inside the projection lambda, when the query ran.
	if (resultSelector == null)
		throw new ArgumentNullException ("resultSelector");

	return source.GroupBy (keySelector, elementSelector, comparer)
		.Select ((g) => resultSelector (g.Key, (IEnumerable<TElement>)g));
}
676 public static ParallelQuery<TResult> GroupJoin<TOuter, TInner, TKey, TResult> (this ParallelQuery<TOuter> outer,
677 ParallelQuery<TInner> inner,
678 Func<TOuter, TKey> outerKeySelector,
679 Func<TInner, TKey> innerKeySelector,
680 Func<TOuter, IEnumerable<TInner>, TResult> resultSelector)
682 return outer.GroupJoin (inner, outerKeySelector, innerKeySelector, resultSelector, EqualityComparer<TKey>.Default);
/// <summary>
/// Correlates outer elements with groups of inner elements sharing the same key.
/// Implemented as a Join between <paramref name="outer"/> and the inner sequence grouped by key.
/// A null <paramref name="comparer"/> falls back to EqualityComparer&lt;TKey&gt;.Default.
/// </summary>
/// <exception cref="ArgumentNullException">
/// <paramref name="outer"/>, <paramref name="inner"/> or any of the selector arguments is null.
/// </exception>
public static ParallelQuery<TResult> GroupJoin<TOuter, TInner, TKey, TResult> (this ParallelQuery<TOuter> outer,
	ParallelQuery<TInner> inner,
	Func<TOuter, TKey> outerKeySelector,
	Func<TInner, TKey> innerKeySelector,
	Func<TOuter, IEnumerable<TInner>, TResult> resultSelector,
	IEqualityComparer<TKey> comparer)
{
	if (outer == null)
		throw new ArgumentNullException ("outer");
	if (inner == null)
		throw new ArgumentNullException ("inner");
	if (outerKeySelector == null)
		throw new ArgumentNullException ("outerKeySelector");
	if (innerKeySelector == null)
		throw new ArgumentNullException ("innerKeySelector");
	if (resultSelector == null)
		throw new ArgumentNullException ("resultSelector");
	if (comparer == null)
		comparer = EqualityComparer<TKey>.Default;

	// Group the inner side with the SAME comparer the join uses. Grouping with the
	// default comparer split keys that the caller's comparer treats as equal into
	// separate groups, so one outer element could match several partial groups.
	var innerGroups = inner.GroupBy (innerKeySelector, (e) => e, comparer);

	// NOTE(review): because this is built on Join, outer elements without any matching
	// inner group presumably produce no result at all — confirm against the expected
	// GroupJoin contract (an empty inner sequence for unmatched outer elements).
	return outer.Join (innerGroups, outerKeySelector, (e) => e.Key, (e1, e2) => resultSelector (e1, e2), comparer);
}
695 [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
696 + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
697 + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
698 public static ParallelQuery<TResult> GroupJoin<TOuter, TInner, TKey, TResult> (this ParallelQuery<TOuter> outer,
699 IEnumerable<TInner> inner,
700 Func<TOuter, TKey> outerKeySelector,
701 Func<TInner, TKey> innerKeySelector,
702 Func<TOuter, IEnumerable<TInner>, TResult> resultSelector)
704 throw new NotSupportedException ();
707 [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
708 + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
709 + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
710 public static ParallelQuery<TResult> GroupJoin<TOuter, TInner, TKey, TResult> (this ParallelQuery<TOuter> outer,
711 IEnumerable<TInner> inner,
712 Func<TOuter, TKey> outerKeySelector,
713 Func<TInner, TKey> innerKeySelector,
714 Func<TOuter, IEnumerable<TInner>, TResult> resultSelector,
715 IEqualityComparer<TKey> comparer)
717 throw new NotSupportedException ();
722 public static TSource ElementAt<TSource> (this ParallelQuery<TSource> source, int index)
725 throw new ArgumentNullException ("source");
727 throw new ArgumentOutOfRangeException ("index");
730 return source.First ();
731 } catch (InvalidOperationException) {
732 throw new ArgumentOutOfRangeException ("index");
736 TSource result = default (TSource);
738 ParallelQuery<TSource> innerQuery = source.Where ((e, i) => i == index);
741 result = innerQuery.First ();
742 } catch (InvalidOperationException) {
743 throw new ArgumentOutOfRangeException ("index");
/// <summary>
/// Returns the element at <paramref name="index"/>, or default (TSource) when ElementAt
/// reports the index as out of range.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static TSource ElementAtOrDefault<TSource> (this ParallelQuery<TSource> source, int index)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}

	TSource found;
	try {
		found = ElementAt (source, index);
	} catch (ArgumentOutOfRangeException) {
		// ElementAt signals a missing element with this exception; map it to the default value.
		found = default (TSource);
	}

	return found;
}
763 public static ParallelQuery<TSource> Intersect<TSource> (this ParallelQuery<TSource> first,
764 ParallelQuery<TSource> second)
766 return Intersect<TSource> (first, second, EqualityComparer<TSource>.Default);
/// <summary>
/// Appends a QuerySetNode configured with SetInclusionDefaults.Intersect over both queries.
/// A null <paramref name="comparer"/> falls back to EqualityComparer&lt;TSource&gt;.Default.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="first"/> or <paramref name="second"/> is null.</exception>
public static ParallelQuery<TSource> Intersect<TSource> (this ParallelQuery<TSource> first,
	ParallelQuery<TSource> second,
	IEqualityComparer<TSource> comparer)
{
	if (first == null) {
		throw new ArgumentNullException ("first");
	}
	if (second == null) {
		throw new ArgumentNullException ("second");
	}

	IEqualityComparer<TSource> equality = comparer ?? EqualityComparer<TSource>.Default;

	var setNode = new QuerySetNode<TSource> (SetInclusionDefaults.Intersect, equality, first.Node, second.Node);
	return new ParallelQuery<TSource> (setNode);
}
783 [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
784 + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
785 + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
786 public static ParallelQuery<TSource> Intersect<TSource> (this ParallelQuery<TSource> first, IEnumerable<TSource> second)
788 throw new NotSupportedException ();
791 [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
792 + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
793 + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
794 public static ParallelQuery<TSource> Intersect<TSource> (this ParallelQuery<TSource> first,
795 IEnumerable<TSource> second,
796 IEqualityComparer<TSource> comparer)
798 throw new NotSupportedException ();
803 public static ParallelQuery<TResult> Join<TOuter, TInner, TKey, TResult> (this ParallelQuery<TOuter> outer,
804 ParallelQuery<TInner> inner,
805 Func<TOuter, TKey> outerKeySelector,
806 Func<TInner, TKey> innerKeySelector,
807 Func<TOuter, TInner, TResult> resultSelector)
809 return outer.Join (inner, outerKeySelector, innerKeySelector, resultSelector, EqualityComparer<TKey>.Default);
/// <summary>
/// Appends a QueryJoinNode that correlates <paramref name="outer"/> and <paramref name="inner"/>
/// elements by key. A null <paramref name="comparer"/> falls back to EqualityComparer&lt;TKey&gt;.Default.
/// </summary>
/// <exception cref="ArgumentNullException">
/// <paramref name="outer"/>, <paramref name="inner"/> or any of the selector arguments is null.
/// </exception>
public static ParallelQuery<TResult> Join<TOuter, TInner, TKey, TResult> (this ParallelQuery<TOuter> outer,
	ParallelQuery<TInner> inner,
	Func<TOuter, TKey> outerKeySelector,
	Func<TInner, TKey> innerKeySelector,
	Func<TOuter, TInner, TResult> resultSelector,
	IEqualityComparer<TKey> comparer)
{
	// Validate up front, like the other binary operators in this class; previously a
	// null outer/inner surfaced as a NullReferenceException on the ".Node" access.
	if (outer == null)
		throw new ArgumentNullException ("outer");
	if (inner == null)
		throw new ArgumentNullException ("inner");
	if (outerKeySelector == null)
		throw new ArgumentNullException ("outerKeySelector");
	if (innerKeySelector == null)
		throw new ArgumentNullException ("innerKeySelector");
	if (resultSelector == null)
		throw new ArgumentNullException ("resultSelector");
	if (comparer == null)
		comparer = EqualityComparer<TKey>.Default;

	return new ParallelQuery<TResult> (new QueryJoinNode<TOuter, TInner, TKey, TResult> (outer.Node, inner.Node, outerKeySelector, innerKeySelector, resultSelector, comparer));
}
822 [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
823 + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
824 + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
825 public static ParallelQuery<TResult> Join<TOuter, TInner, TKey, TResult> (this ParallelQuery<TOuter> outer,
826 IEnumerable<TInner> inner,
827 Func<TOuter, TKey> outerKeySelector,
828 Func<TInner, TKey> innerKeySelector,
829 Func<TOuter, TInner, TResult> resultSelector)
831 throw new NotSupportedException ();
834 [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
835 + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
836 + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
837 public static ParallelQuery<TResult> Join<TOuter, TInner, TKey, TResult> (this ParallelQuery<TOuter> outer,
838 IEnumerable<TInner> inner,
839 Func<TOuter, TKey> outerKeySelector,
840 Func<TInner, TKey> innerKeySelector,
841 Func<TOuter, TInner, TResult> resultSelector,
842 IEqualityComparer<TKey> comparer)
844 throw new NotSupportedException ();
849 public static ParallelQuery<TSource> Except<TSource> (this ParallelQuery<TSource> first,
850 ParallelQuery<TSource> second)
852 return Except<TSource> (first, second, EqualityComparer<TSource>.Default);
/// <summary>
/// Appends a QuerySetNode configured with SetInclusionDefaults.Except over both queries.
/// A null <paramref name="comparer"/> falls back to EqualityComparer&lt;TSource&gt;.Default.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="first"/> or <paramref name="second"/> is null.</exception>
public static ParallelQuery<TSource> Except<TSource> (this ParallelQuery<TSource> first,
	ParallelQuery<TSource> second,
	IEqualityComparer<TSource> comparer)
{
	if (first == null) {
		throw new ArgumentNullException ("first");
	}
	if (second == null) {
		throw new ArgumentNullException ("second");
	}

	IEqualityComparer<TSource> equality = comparer ?? EqualityComparer<TSource>.Default;

	var setNode = new QuerySetNode<TSource> (SetInclusionDefaults.Except, equality, first.Node, second.Node);
	return new ParallelQuery<TSource> (setNode);
}
870 [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
871 + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
872 + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
873 public static ParallelQuery<TSource> Except<TSource> (this ParallelQuery<TSource> first,
874 IEnumerable<TSource> second)
876 throw new NotSupportedException ();
879 [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
880 + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
881 + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
882 public static ParallelQuery<TSource> Except<TSource> (this ParallelQuery<TSource> first,
883 IEnumerable<TSource> second,
884 IEqualityComparer<TSource> comparer)
886 throw new NotSupportedException ();
891 public static ParallelQuery<TSource> Distinct<TSource> (this ParallelQuery<TSource> source)
893 return Distinct<TSource> (source, EqualityComparer<TSource>.Default);
896 public static ParallelQuery<TSource> Distinct<TSource> (this ParallelQuery<TSource> source, IEqualityComparer<TSource> comparer)
899 throw new ArgumentNullException ("source");
900 if (comparer == null)
901 comparer = EqualityComparer<TSource>.Default;
903 return new ParallelQuery<TSource> (new QuerySetNode<TSource> (SetInclusionDefaults.Distinct, comparer,
909 [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
910 + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
911 + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
912 public static ParallelQuery<TSource> Union<TSource> (this ParallelQuery<TSource> first,
913 IEnumerable<TSource> second)
915 throw new NotSupportedException ();
918 [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
919 + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
920 + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
921 public static ParallelQuery<TSource> Union<TSource>(this ParallelQuery<TSource> first,
922 IEnumerable<TSource> second,
923 IEqualityComparer<TSource> comparer)
925 throw new NotSupportedException ();
928 public static ParallelQuery<TSource> Union<TSource> (this ParallelQuery<TSource> first,
929 ParallelQuery<TSource> second)
931 return first.Union (second, EqualityComparer<TSource>.Default);
/// <summary>
/// Appends a QuerySetNode configured with SetInclusionDefaults.Union over both queries.
/// A null <paramref name="comparer"/> falls back to EqualityComparer&lt;TSource&gt;.Default.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="first"/> or <paramref name="second"/> is null.</exception>
public static ParallelQuery<TSource> Union<TSource> (this ParallelQuery<TSource> first,
	ParallelQuery<TSource> second,
	IEqualityComparer<TSource> comparer)
{
	if (first == null) {
		throw new ArgumentNullException ("first");
	}
	if (second == null) {
		throw new ArgumentNullException ("second");
	}

	IEqualityComparer<TSource> equality = comparer ?? EqualityComparer<TSource>.Default;

	var setNode = new QuerySetNode<TSource> (SetInclusionDefaults.Union, equality, first.Node, second.Node);
	return new ParallelQuery<TSource> (setNode);
}
// Wraps the source node in a QueryHeadWorkerNode constructed with the requested count.
// Throws ArgumentNullException when source is null.
public static ParallelQuery<TSource> Take<TSource> (this ParallelQuery<TSource> source, int count)
{
    if (source == null)
        throw new ArgumentNullException ("source");

    var headNode = new QueryHeadWorkerNode<TSource> (source.Node, count);
    return new ParallelQuery<TSource> (headNode);
}
// TakeWhile with an element-only predicate. The predicate is adapted to the
// (element, index) shape expected by QueryHeadWorkerNode; the index is ignored and
// the node is constructed with its boolean flag set to false.
// Throws ArgumentNullException when source or predicate is null.
public static ParallelQuery<TSource> TakeWhile<TSource> (this ParallelQuery<TSource> source,
                                                         Func<TSource, bool> predicate)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (predicate == null)
        throw new ArgumentNullException ("predicate");

    var headNode = new QueryHeadWorkerNode<TSource> (source.Node, (item, unusedIndex) => predicate (item), false);
    return new ParallelQuery<TSource> (headNode);
}
// TakeWhile with an (element, index) predicate. The predicate is handed unchanged to a
// QueryHeadWorkerNode constructed with its boolean flag set to true.
// Throws ArgumentNullException when source or predicate is null.
public static ParallelQuery<TSource> TakeWhile<TSource> (this ParallelQuery<TSource> source,
                                                         Func<TSource, int, bool> predicate)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (predicate == null)
        throw new ArgumentNullException ("predicate");

    var headNode = new QueryHeadWorkerNode<TSource> (source.Node, predicate, true);
    return new ParallelQuery<TSource> (headNode);
}
// Skips elements by filtering with Where.
//  - Ordered source: keeps elements whose index is >= count.
//  - Unordered source: atomically decrements the captured count for each element and
//    keeps an element once the counter has dropped below zero.
// NOTE(review): the unordered filter mutates the captured parameter, so the remaining
// budget is shared by every evaluation of the returned query.
// Throws ArgumentNullException when source is null.
public static ParallelQuery<TSource> Skip<TSource> (this ParallelQuery<TSource> source, int count)
{
    if (source == null)
        throw new ArgumentNullException ("source");

    if (source.Node.IsOrdered ())
        return source.Where ((item, index) => index >= count);

    return source.Where ((item) => count < 0 || Interlocked.Decrement (ref count) < 0);
}
// SkipWhile with an element-only predicate.
//  - Ordered source: delegates to the indexed SkipWhile overload, ignoring the index.
//  - Unordered source: degenerates to a Where that keeps elements failing the predicate.
// Throws ArgumentNullException when source or predicate is null.
public static ParallelQuery<TSource> SkipWhile<TSource> (this ParallelQuery<TSource> source,
                                                         Func<TSource, bool> predicate)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (predicate == null)
        throw new ArgumentNullException ("predicate");

    if (source.Node.IsOrdered ())
        return source.SkipWhile ((item, index) => predicate (item));

    return source.Where ((item) => !predicate (item));
}
// SkipWhile with an (element, index) predicate, implemented as an indexed Where.
// indexCache starts at int.MaxValue and is overwritten with the index of an element
// for which the predicate returned false; every element whose index is >= the cached
// value is kept without consulting the predicate again.
// Throws ArgumentNullException when source or predicate is null.
public static ParallelQuery<TSource> SkipWhile<TSource> (this ParallelQuery<TSource> source,
                                                         Func<TSource, int, bool> predicate)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (predicate == null)
        throw new ArgumentNullException ("predicate");

    int indexCache = int.MaxValue;

    return source.Where ((item, index) => {
        // Already past a recorded failing index: keep.
        if (index >= indexCache)
            return true;
        // Predicate still holds: skip.
        if (predicate (item, index))
            return false;
        // Predicate failed here: record the index and keep the element.
        indexCache = index;
        return true;
    });
}
// Shared implementation of Single / SingleOrDefault.
// Enumerates source and returns its only element. When the source is empty and at
// least one fallback value was passed in init, init[0] is returned instead.
// Throws InvalidOperationException when more than one element is found, or when the
// source is empty and no fallback was supplied.
static TSource SingleInternal<TSource> (this ParallelQuery<TSource> source, params TSource[] init)
{
    bool found = false;
    TSource single = default (TSource);

    foreach (TSource item in source) {
        if (found)
            throw new InvalidOperationException ("The input sequence contains more than one element.");

        found = true;
        single = item;
    }

    if (found)
        return single;

    if (init.Length == 0)
        throw new InvalidOperationException ("The input sequence is empty.");

    return init[0];
}
// Returns the only element of source via SingleInternal, passing no fallback value.
// Throws ArgumentNullException when source is null.
public static TSource Single<TSource> (this ParallelQuery<TSource> source)
{
    if (source == null)
        throw new ArgumentNullException ("source");

    return source.SingleInternal<TSource> ();
}
// Filters source with predicate and applies Single to the filtered query.
// Throws ArgumentNullException when source or predicate is null.
public static TSource Single<TSource> (this ParallelQuery<TSource> source,
                                       Func<TSource, bool> predicate)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (predicate == null)
        throw new ArgumentNullException ("predicate");

    ParallelQuery<TSource> matching = source.Where (predicate);
    return matching.Single ();
}
// Returns the only element of source via SingleInternal, supplying default (TSource)
// as the fallback used for an empty source.
// Throws ArgumentNullException when source is null.
public static TSource SingleOrDefault<TSource> (this ParallelQuery<TSource> source)
{
    if (source == null)
        throw new ArgumentNullException ("source");

    return source.SingleInternal<TSource> (default (TSource));
}
// Filters source with predicate and applies SingleOrDefault to the filtered query.
// Throws ArgumentNullException when source or predicate is null.
public static TSource SingleOrDefault<TSource> (this ParallelQuery<TSource> source, Func<TSource, bool> predicate)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (predicate == null)
        throw new ArgumentNullException ("predicate");

    ParallelQuery<TSource> matching = source.Where (predicate);
    return matching.SingleOrDefault ();
}
// Counts the elements with Aggregate: each accumulator starts at 0, is incremented per
// element, and the accumulators are summed.
// Throws ArgumentNullException when source is null.
public static int Count<TSource> (this ParallelQuery<TSource> source)
{
    if (source == null)
        throw new ArgumentNullException ("source");

    return source.Aggregate<TSource, int, int> (() => 0,
                                                (partial, item) => partial + 1,
                                                (left, right) => left + right,
                                                (total) => total);
}
// Counts the elements that satisfy predicate by filtering with Where and calling Count.
// Throws ArgumentNullException when source or predicate is null.
public static int Count<TSource> (this ParallelQuery<TSource> source, Func<TSource, bool> predicate)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (predicate == null)
        throw new ArgumentNullException ("predicate");

    ParallelQuery<TSource> matching = source.Where (predicate);
    return matching.Count ();
}
// 64-bit element count computed with Aggregate: each accumulator starts at 0, is
// incremented per element, and the accumulators are summed.
// Throws ArgumentNullException when source is null.
public static long LongCount<TSource> (this ParallelQuery<TSource> source)
{
    if (source == null)
        throw new ArgumentNullException ("source");

    return source.Aggregate<TSource, long, long> (() => 0L,
                                                  (partial, item) => partial + 1,
                                                  (left, right) => left + right,
                                                  (total) => total);
}
// 64-bit count of the elements that satisfy predicate (Where followed by LongCount).
// Throws ArgumentNullException when source or predicate is null.
public static long LongCount<TSource> (this ParallelQuery<TSource> source, Func<TSource, bool> predicate)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (predicate == null)
        throw new ArgumentNullException ("predicate");

    ParallelQuery<TSource> matching = source.Where (predicate);
    return matching.LongCount ();
}
// Average of a sequence of int values.
// Each accumulator is a two-slot array: [0] = running sum, [1] = element count.
// The sum is kept in a long so that adding many large ints cannot wrap around the
// way an int accumulator would.
// Throws ArgumentNullException when source is null.
public static double Average (this ParallelQuery<int> source)
{
    if (source == null)
        throw new ArgumentNullException ("source");

    return source.Aggregate (() => new long[2],
                             (acc, e) => { acc[0] += e; acc[1]++; return acc; },
                             (acc1, acc2) => { acc1[0] += acc2[0]; acc1[1] += acc2[1]; return acc1; },
                             (acc) => acc[0] / ((double)acc[1]));
}
// Average of a sequence of long values.
// Each accumulator is a two-slot array: [0] = running sum, [1] = element count.
// Throws ArgumentNullException when source is null.
public static double Average (this ParallelQuery<long> source)
{
    if (source == null)
        throw new ArgumentNullException ("source");

    return source.Aggregate (() => new long[2],
                             (totals, value) => {
                                 totals[0] += value;
                                 totals[1]++;
                                 return totals;
                             },
                             (left, right) => {
                                 left[0] += right[0];
                                 left[1] += right[1];
                                 return left;
                             },
                             (totals) => totals[0] / ((double)totals[1]));
}
// Average of a sequence of decimal values.
// Each accumulator is a two-slot array: [0] = running sum, [1] = element count.
// Throws ArgumentNullException when source is null.
public static decimal Average (this ParallelQuery<decimal> source)
{
    if (source == null)
        throw new ArgumentNullException ("source");

    return source.Aggregate (() => new decimal[2],
                             (totals, value) => {
                                 totals[0] += value;
                                 totals[1]++;
                                 return totals;
                             },
                             (left, right) => {
                                 left[0] += right[0];
                                 left[1] += right[1];
                                 return left;
                             },
                             (totals) => totals[0] / totals[1]);
}
// Average of a sequence of double values.
// Each accumulator is a two-slot array: [0] = running sum, [1] = element count.
// Throws ArgumentNullException when source is null.
public static double Average (this ParallelQuery<double> source)
{
    if (source == null)
        throw new ArgumentNullException ("source");

    return source.Aggregate (() => new double[2],
                             (totals, value) => {
                                 totals[0] += value;
                                 totals[1]++;
                                 return totals;
                             },
                             (left, right) => {
                                 left[0] += right[0];
                                 left[1] += right[1];
                                 return left;
                             },
                             (totals) => totals[0] / ((double)totals[1]));
}
// Average of a sequence of float values.
// Each accumulator is a two-slot array: [0] = running sum, [1] = element count.
// Both slots are doubles: a float counter stops incrementing at 2^24 (16777216f + 1
// == 16777216f), which would corrupt the average of large inputs, and a double sum
// loses less precision. The quotient is narrowed back to float for the caller.
// Throws ArgumentNullException when source is null.
public static float Average (this ParallelQuery<float> source)
{
    if (source == null)
        throw new ArgumentNullException ("source");

    return source.Aggregate (() => new double[2],
                             (acc, e) => { acc[0] += e; acc[1]++; return acc; },
                             (acc1, acc2) => { acc1[0] += acc2[0]; acc1[1] += acc2[1]; return acc1; },
                             (acc) => (float)(acc[0] / acc[1]));
}
1184 #region More Average
// Nullable Average overloads.
// Null elements are skipped entirely: they contribute neither to the sum nor to the
// count (substituting 0 for them would drag the average toward zero). When no
// non-null element exists the result is null.
// Accumulator layout in every overload: [0] = running sum, [1] = non-null count.
// Each overload throws ArgumentNullException when source is null.

// Average of the non-null int values; the sum is accumulated in a long.
public static double? Average (this ParallelQuery<int?> source)
{
    if (source == null)
        throw new ArgumentNullException ("source");

    return source.Aggregate (() => new long[2],
                             (acc, e) => { if (e.HasValue) { acc[0] += e.Value; acc[1]++; } return acc; },
                             (acc1, acc2) => { acc1[0] += acc2[0]; acc1[1] += acc2[1]; return acc1; },
                             (acc) => acc[1] == 0 ? (double?) null : acc[0] / ((double)acc[1]));
}

// Average of the non-null long values.
public static double? Average (this ParallelQuery<long?> source)
{
    if (source == null)
        throw new ArgumentNullException ("source");

    return source.Aggregate (() => new long[2],
                             (acc, e) => { if (e.HasValue) { acc[0] += e.Value; acc[1]++; } return acc; },
                             (acc1, acc2) => { acc1[0] += acc2[0]; acc1[1] += acc2[1]; return acc1; },
                             (acc) => acc[1] == 0 ? (double?) null : acc[0] / ((double)acc[1]));
}

// Average of the non-null decimal values.
public static decimal? Average (this ParallelQuery<decimal?> source)
{
    if (source == null)
        throw new ArgumentNullException ("source");

    return source.Aggregate (() => new decimal[2],
                             (acc, e) => { if (e.HasValue) { acc[0] += e.Value; acc[1]++; } return acc; },
                             (acc1, acc2) => { acc1[0] += acc2[0]; acc1[1] += acc2[1]; return acc1; },
                             (acc) => acc[1] == 0 ? (decimal?) null : acc[0] / acc[1]);
}

// Average of the non-null double values.
public static double? Average (this ParallelQuery<double?> source)
{
    if (source == null)
        throw new ArgumentNullException ("source");

    return source.Aggregate (() => new double[2],
                             (acc, e) => { if (e.HasValue) { acc[0] += e.Value; acc[1]++; } return acc; },
                             (acc1, acc2) => { acc1[0] += acc2[0]; acc1[1] += acc2[1]; return acc1; },
                             (acc) => acc[1] == 0 ? (double?) null : acc[0] / acc[1]);
}

// Average of the non-null float values; accumulated in doubles so the counter stays
// exact, then narrowed back to float.
public static float? Average (this ParallelQuery<float?> source)
{
    if (source == null)
        throw new ArgumentNullException ("source");

    return source.Aggregate (() => new double[2],
                             (acc, e) => { if (e.HasValue) { acc[0] += e.Value; acc[1]++; } return acc; },
                             (acc1, acc2) => { acc1[0] += acc2[0]; acc1[1] += acc2[1]; return acc1; },
                             (acc) => acc[1] == 0 ? (float?) null : (float)(acc[0] / acc[1]));
}
// Selector-based Average overloads. Each one validates its arguments (throwing
// ArgumentNullException for a null source or selector), projects the source with
// Select, and calls the Average overload matching the projected element type.

// Average over int projections.
public static double Average<TSource> (this ParallelQuery<TSource> source, Func<TSource, int> selector)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (selector == null)
        throw new ArgumentNullException ("selector");

    ParallelQuery<int> projected = source.Select (selector);
    return projected.Average ();
}

// Average over long projections.
public static double Average<TSource> (this ParallelQuery<TSource> source, Func<TSource, long> selector)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (selector == null)
        throw new ArgumentNullException ("selector");

    ParallelQuery<long> projected = source.Select (selector);
    return projected.Average ();
}

// Average over float projections.
public static float Average<TSource> (this ParallelQuery<TSource> source, Func<TSource, float> selector)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (selector == null)
        throw new ArgumentNullException ("selector");

    ParallelQuery<float> projected = source.Select (selector);
    return projected.Average ();
}

// Average over double projections.
public static double Average<TSource> (this ParallelQuery<TSource> source, Func<TSource, double> selector)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (selector == null)
        throw new ArgumentNullException ("selector");

    ParallelQuery<double> projected = source.Select (selector);
    return projected.Average ();
}

// Average over decimal projections.
public static decimal Average<TSource> (this ParallelQuery<TSource> source, Func<TSource, decimal> selector)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (selector == null)
        throw new ArgumentNullException ("selector");

    ParallelQuery<decimal> projected = source.Select (selector);
    return projected.Average ();
}

// Average over nullable int projections.
public static double? Average<TSource> (this ParallelQuery<TSource> source, Func<TSource, int?> selector)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (selector == null)
        throw new ArgumentNullException ("selector");

    ParallelQuery<int?> projected = source.Select (selector);
    return projected.Average ();
}

// Average over nullable long projections.
public static double? Average<TSource> (this ParallelQuery<TSource> source, Func<TSource, long?> selector)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (selector == null)
        throw new ArgumentNullException ("selector");

    ParallelQuery<long?> projected = source.Select (selector);
    return projected.Average ();
}

// Average over nullable float projections.
public static float? Average<TSource> (this ParallelQuery<TSource> source, Func<TSource, float?> selector)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (selector == null)
        throw new ArgumentNullException ("selector");

    ParallelQuery<float?> projected = source.Select (selector);
    return projected.Average ();
}

// Average over nullable double projections.
public static double? Average<TSource> (this ParallelQuery<TSource> source, Func<TSource, double?> selector)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (selector == null)
        throw new ArgumentNullException ("selector");

    ParallelQuery<double?> projected = source.Select (selector);
    return projected.Average ();
}

// Average over nullable decimal projections.
public static decimal? Average<TSource> (this ParallelQuery<TSource> source, Func<TSource, decimal?> selector)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (selector == null)
        throw new ArgumentNullException ("selector");

    ParallelQuery<decimal?> projected = source.Select (selector);
    return projected.Average ();
}
// Numeric Sum overloads. Each one folds the source with Aggregate starting from a
// zero seed of the element type, adding elements into a partial sum and then adding
// the partial sums together. Each throws ArgumentNullException when source is null.

// Sum of int values.
public static int Sum (this ParallelQuery<int> source)
{
    if (source == null)
        throw new ArgumentNullException ("source");

    return source.Aggregate (0,
                             (partial, value) => partial + value,
                             (left, right) => left + right,
                             (total) => total);
}

// Sum of long values.
public static long Sum (this ParallelQuery<long> source)
{
    if (source == null)
        throw new ArgumentNullException ("source");

    return source.Aggregate (0L,
                             (partial, value) => partial + value,
                             (left, right) => left + right,
                             (total) => total);
}

// Sum of float values.
public static float Sum (this ParallelQuery<float> source)
{
    if (source == null)
        throw new ArgumentNullException ("source");

    return source.Aggregate (0.0f,
                             (partial, value) => partial + value,
                             (left, right) => left + right,
                             (total) => total);
}

// Sum of double values.
public static double Sum (this ParallelQuery<double> source)
{
    if (source == null)
        throw new ArgumentNullException ("source");

    return source.Aggregate (0.0,
                             (partial, value) => partial + value,
                             (left, right) => left + right,
                             (total) => total);
}

// Sum of decimal values.
public static decimal Sum (this ParallelQuery<decimal> source)
{
    if (source == null)
        throw new ArgumentNullException ("source");

    return source.Aggregate (0m,
                             (partial, value) => partial + value,
                             (left, right) => left + right,
                             (total) => total);
}
// Sum of nullable int values: null elements are treated as 0, then the non-nullable
// Sum overload is applied.
// Throws ArgumentNullException when source is null (guard added so this overload
// validates its argument the same way as the other nullable Sum overloads).
public static int? Sum (this ParallelQuery<int?> source)
{
    if (source == null)
        throw new ArgumentNullException ("source");

    return source.Select ((e) => e.HasValue ? e.Value : 0).Sum ();
}
// Nullable Sum overloads. Null elements are mapped to zero (GetValueOrDefault) and
// the matching non-nullable Sum overload is applied to the projected query.
// Each throws ArgumentNullException when source is null.

// Sum of nullable long values.
public static long? Sum (this ParallelQuery<long?> source)
{
    if (source == null)
        throw new ArgumentNullException ("source");

    ParallelQuery<long> values = source.Select ((item) => item.GetValueOrDefault ());
    return values.Sum ();
}

// Sum of nullable float values.
public static float? Sum (this ParallelQuery<float?> source)
{
    if (source == null)
        throw new ArgumentNullException ("source");

    ParallelQuery<float> values = source.Select ((item) => item.GetValueOrDefault ());
    return values.Sum ();
}

// Sum of nullable double values.
public static double? Sum (this ParallelQuery<double?> source)
{
    if (source == null)
        throw new ArgumentNullException ("source");

    ParallelQuery<double> values = source.Select ((item) => item.GetValueOrDefault ());
    return values.Sum ();
}

// Sum of nullable decimal values.
public static decimal? Sum (this ParallelQuery<decimal?> source)
{
    if (source == null)
        throw new ArgumentNullException ("source");

    ParallelQuery<decimal> values = source.Select ((item) => item.GetValueOrDefault ());
    return values.Sum ();
}
// Selector-based Sum overloads. Each one validates its arguments (throwing
// ArgumentNullException for a null source or selector), projects the source with
// Select, and calls the Sum overload matching the projected element type.

// Sum over int projections.
public static int Sum<TSource> (this ParallelQuery<TSource> source, Func<TSource, int> selector)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (selector == null)
        throw new ArgumentNullException ("selector");

    ParallelQuery<int> projected = source.Select (selector);
    return projected.Sum ();
}

// Sum over long projections.
public static long Sum<TSource> (this ParallelQuery<TSource> source, Func<TSource, long> selector)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (selector == null)
        throw new ArgumentNullException ("selector");

    ParallelQuery<long> projected = source.Select (selector);
    return projected.Sum ();
}

// Sum over decimal projections.
public static decimal Sum<TSource> (this ParallelQuery<TSource> source, Func<TSource, decimal> selector)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (selector == null)
        throw new ArgumentNullException ("selector");

    ParallelQuery<decimal> projected = source.Select (selector);
    return projected.Sum ();
}

// Sum over float projections.
public static float Sum<TSource> (this ParallelQuery<TSource> source, Func<TSource, float> selector)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (selector == null)
        throw new ArgumentNullException ("selector");

    ParallelQuery<float> projected = source.Select (selector);
    return projected.Sum ();
}

// Sum over double projections.
public static double Sum<TSource> (this ParallelQuery<TSource> source, Func<TSource, double> selector)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (selector == null)
        throw new ArgumentNullException ("selector");

    ParallelQuery<double> projected = source.Select (selector);
    return projected.Sum ();
}

// Sum over nullable int projections.
public static int? Sum<TSource> (this ParallelQuery<TSource> source, Func<TSource, int?> selector)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (selector == null)
        throw new ArgumentNullException ("selector");

    ParallelQuery<int?> projected = source.Select (selector);
    return projected.Sum ();
}

// Sum over nullable long projections.
public static long? Sum<TSource> (this ParallelQuery<TSource> source, Func<TSource, long?> selector)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (selector == null)
        throw new ArgumentNullException ("selector");

    ParallelQuery<long?> projected = source.Select (selector);
    return projected.Sum ();
}

// Sum over nullable decimal projections.
public static decimal? Sum<TSource> (this ParallelQuery<TSource> source, Func<TSource, decimal?> selector)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (selector == null)
        throw new ArgumentNullException ("selector");

    ParallelQuery<decimal?> projected = source.Select (selector);
    return projected.Sum ();
}

// Sum over nullable float projections.
public static float? Sum<TSource> (this ParallelQuery<TSource> source, Func<TSource, float?> selector)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (selector == null)
        throw new ArgumentNullException ("selector");

    ParallelQuery<float?> projected = source.Select (selector);
    return projected.Sum ();
}

// Sum over nullable double projections.
public static double? Sum<TSource> (this ParallelQuery<TSource> source, Func<TSource, double?> selector)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (selector == null)
        throw new ArgumentNullException ("selector");

    ParallelQuery<double?> projected = source.Select (selector);
    return projected.Sum ();
}
// Shared reduction behind the numeric Min / Max overloads.
// Starting every accumulator at seed, keeps whichever of (current, candidate) the
// bestSelector prefers - it returns true when its first argument should win - and
// applies the same rule when merging two accumulators.
// Throws ArgumentNullException when source is null.
static T BestOrder<T> (ParallelQuery<T> source, Func<T, T, bool> bestSelector, T seed)
{
    if (source == null)
        throw new ArgumentNullException ("source");

    return source.Aggregate (() => seed,
                             (current, candidate) => bestSelector (current, candidate) ? current : candidate,
                             (left, right) => bestSelector (left, right) ? left : right,
                             (winner) => winner);
}
// Numeric Min overloads. Each delegates to BestOrder with a "strictly less than"
// preference and the type's MaxValue as the starting value.

// Minimum of int values.
public static int Min (this ParallelQuery<int> source)
{
    return BestOrder (source, (a, b) => a < b, int.MaxValue);
}

// Minimum of long values.
public static long Min (this ParallelQuery<long> source)
{
    return BestOrder (source, (a, b) => a < b, long.MaxValue);
}

// Minimum of float values.
public static float Min (this ParallelQuery<float> source)
{
    return BestOrder (source, (a, b) => a < b, float.MaxValue);
}

// Minimum of double values.
public static double Min (this ParallelQuery<double> source)
{
    return BestOrder (source, (a, b) => a < b, double.MaxValue);
}

// Minimum of decimal values.
public static decimal Min (this ParallelQuery<decimal> source)
{
    return BestOrder (source, (a, b) => a < b, decimal.MaxValue);
}
// Generic Min using Comparer<TSource>.Default.
// The accumulator is a (hasValue, value) pair, so no artificial seed value takes part
// in the comparison. Seeding the reduction with default (TSource) is wrong: for
// reference types the null seed compares below every element and always wins, and
// for value types the default can beat every real element.
// Null elements are skipped. When the source has no (non-null) element the result is
// default (TSource), which is what the seed-based reduction yielded for an empty source.
// Throws ArgumentNullException when source is null.
public static TSource Min<TSource> (this ParallelQuery<TSource> source)
{
    if (source == null)
        throw new ArgumentNullException ("source");

    IComparer<TSource> comparer = Comparer<TSource>.Default;

    return source.Aggregate (() => new KeyValuePair<bool, TSource> (false, default (TSource)),
                             // Key = "an element has been seen", Value = smallest element so far.
                             (best, e) => (e != null && (!best.Key || comparer.Compare (e, best.Value) < 0))
                                 ? new KeyValuePair<bool, TSource> (true, e)
                                 : best,
                             (left, right) => (right.Key && (!left.Key || comparer.Compare (right.Value, left.Value) < 0))
                                 ? right
                                 : left,
                             (best) => best.Value);
}
// Projects each element with selector and returns the minimum of the projected values
// via the generic Min overload.
// Throws ArgumentNullException when source or selector is null.
public static TResult Min<TSource, TResult> (this ParallelQuery<TSource> source, Func<TSource, TResult> selector)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (selector == null)
        throw new ArgumentNullException ("selector");

    ParallelQuery<TResult> projected = source.Select (selector);
    return projected.Min ();
}
// Nullable Min overloads.
// Null elements are ignored and the accumulator itself is nullable, starting at null.
// A source that is empty or holds only nulls therefore yields null rather than a
// MaxValue sentinel, and a genuine MaxValue element is reported correctly.
// Each overload throws ArgumentNullException when source is null.

// Minimum of the non-null int values, or null when there is none.
public static int? Min (this ParallelQuery<int?> source)
{
    if (source == null)
        throw new ArgumentNullException ("source");

    return source.Aggregate (() => (int?) null,
                             (best, e) => (e.HasValue && (!best.HasValue || e.Value < best.Value)) ? e : best,
                             (left, right) => (right.HasValue && (!left.HasValue || right.Value < left.Value)) ? right : left,
                             (best) => best);
}

// Minimum of the non-null long values, or null when there is none.
public static long? Min (this ParallelQuery<long?> source)
{
    if (source == null)
        throw new ArgumentNullException ("source");

    return source.Aggregate (() => (long?) null,
                             (best, e) => (e.HasValue && (!best.HasValue || e.Value < best.Value)) ? e : best,
                             (left, right) => (right.HasValue && (!left.HasValue || right.Value < left.Value)) ? right : left,
                             (best) => best);
}

// Minimum of the non-null float values, or null when there is none.
public static float? Min (this ParallelQuery<float?> source)
{
    if (source == null)
        throw new ArgumentNullException ("source");

    return source.Aggregate (() => (float?) null,
                             (best, e) => (e.HasValue && (!best.HasValue || e.Value < best.Value)) ? e : best,
                             (left, right) => (right.HasValue && (!left.HasValue || right.Value < left.Value)) ? right : left,
                             (best) => best);
}

// Minimum of the non-null double values, or null when there is none.
public static double? Min (this ParallelQuery<double?> source)
{
    if (source == null)
        throw new ArgumentNullException ("source");

    return source.Aggregate (() => (double?) null,
                             (best, e) => (e.HasValue && (!best.HasValue || e.Value < best.Value)) ? e : best,
                             (left, right) => (right.HasValue && (!left.HasValue || right.Value < left.Value)) ? right : left,
                             (best) => best);
}

// Minimum of the non-null decimal values, or null when there is none.
public static decimal? Min (this ParallelQuery<decimal?> source)
{
    if (source == null)
        throw new ArgumentNullException ("source");

    return source.Aggregate (() => (decimal?) null,
                             (best, e) => (e.HasValue && (!best.HasValue || e.Value < best.Value)) ? e : best,
                             (left, right) => (right.HasValue && (!left.HasValue || right.Value < left.Value)) ? right : left,
                             (best) => best);
}
// Selector-based Min overloads. Each one validates its arguments (throwing
// ArgumentNullException for a null source or selector), projects the source with
// Select, and calls the Min overload matching the projected element type.

// Minimum over int projections.
public static int Min<TSource> (this ParallelQuery<TSource> source, Func<TSource, int> selector)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (selector == null)
        throw new ArgumentNullException ("selector");

    ParallelQuery<int> projected = source.Select (selector);
    return projected.Min ();
}

// Minimum over long projections.
public static long Min<TSource> (this ParallelQuery<TSource> source, Func<TSource, long> selector)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (selector == null)
        throw new ArgumentNullException ("selector");

    ParallelQuery<long> projected = source.Select (selector);
    return projected.Min ();
}

// Minimum over float projections.
public static float Min<TSource> (this ParallelQuery<TSource> source, Func<TSource, float> selector)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (selector == null)
        throw new ArgumentNullException ("selector");

    ParallelQuery<float> projected = source.Select (selector);
    return projected.Min ();
}

// Minimum over double projections.
public static double Min<TSource> (this ParallelQuery<TSource> source, Func<TSource, double> selector)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (selector == null)
        throw new ArgumentNullException ("selector");

    ParallelQuery<double> projected = source.Select (selector);
    return projected.Min ();
}

// Minimum over decimal projections.
public static decimal Min<TSource> (this ParallelQuery<TSource> source, Func<TSource, decimal> selector)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (selector == null)
        throw new ArgumentNullException ("selector");

    ParallelQuery<decimal> projected = source.Select (selector);
    return projected.Min ();
}

// Minimum over nullable int projections.
public static int? Min<TSource> (this ParallelQuery<TSource> source, Func<TSource, int?> selector)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (selector == null)
        throw new ArgumentNullException ("selector");

    ParallelQuery<int?> projected = source.Select (selector);
    return projected.Min ();
}

// Minimum over nullable long projections.
public static long? Min<TSource> (this ParallelQuery<TSource> source, Func<TSource, long?> selector)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (selector == null)
        throw new ArgumentNullException ("selector");

    ParallelQuery<long?> projected = source.Select (selector);
    return projected.Min ();
}

// Minimum over nullable float projections.
public static float? Min<TSource> (this ParallelQuery<TSource> source, Func<TSource, float?> selector)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (selector == null)
        throw new ArgumentNullException ("selector");

    ParallelQuery<float?> projected = source.Select (selector);
    return projected.Min ();
}

// Minimum over nullable double projections.
public static double? Min<TSource> (this ParallelQuery<TSource> source, Func<TSource, double?> selector)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (selector == null)
        throw new ArgumentNullException ("selector");

    ParallelQuery<double?> projected = source.Select (selector);
    return projected.Min ();
}

// Minimum over nullable decimal projections.
public static decimal? Min<TSource> (this ParallelQuery<TSource> source, Func<TSource, decimal?> selector)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (selector == null)
        throw new ArgumentNullException ("selector");

    ParallelQuery<decimal?> projected = source.Select (selector);
    return projected.Min ();
}
// Numeric Max overloads. Each delegates to BestOrder with a "strictly greater than"
// preference and the type's MinValue as the starting value.

// Maximum of int values.
public static int Max (this ParallelQuery<int> source)
{
    return BestOrder (source, (a, b) => a > b, int.MinValue);
}

// Maximum of long values.
public static long Max (this ParallelQuery<long> source)
{
    return BestOrder (source, (a, b) => a > b, long.MinValue);
}

// Maximum of float values.
public static float Max (this ParallelQuery<float> source)
{
    return BestOrder (source, (a, b) => a > b, float.MinValue);
}

// Maximum of double values.
public static double Max (this ParallelQuery<double> source)
{
    return BestOrder (source, (a, b) => a > b, double.MinValue);
}

// Maximum of decimal values.
public static decimal Max (this ParallelQuery<decimal> source)
{
    return BestOrder (source, (a, b) => a > b, decimal.MinValue);
}
// Generic Max using Comparer<TSource>.Default.
// The accumulator is a (hasValue, value) pair, so no artificial seed value takes part
// in the comparison. Seeding the reduction with default (TSource) is wrong for value
// types whose elements all compare below the default (e.g. negative numbers of a type
// without a dedicated overload): the seed would be returned instead of a real element.
// Null elements are skipped. When the source has no (non-null) element the result is
// default (TSource), which is what the seed-based reduction yielded for an empty source.
// Throws ArgumentNullException when source is null.
public static TSource Max<TSource> (this ParallelQuery<TSource> source)
{
    if (source == null)
        throw new ArgumentNullException ("source");

    IComparer<TSource> comparer = Comparer<TSource>.Default;

    return source.Aggregate (() => new KeyValuePair<bool, TSource> (false, default (TSource)),
                             // Key = "an element has been seen", Value = largest element so far.
                             (best, e) => (e != null && (!best.Key || comparer.Compare (e, best.Value) > 0))
                                 ? new KeyValuePair<bool, TSource> (true, e)
                                 : best,
                             (left, right) => (right.Key && (!left.Key || comparer.Compare (right.Value, left.Value) > 0))
                                 ? right
                                 : left,
                             (best) => best.Value);
}
// Projects each element with selector and returns the maximum of the projected values
// via the generic Max overload.
// Throws ArgumentNullException when source or selector is null.
public static TResult Max<TSource, TResult> (this ParallelQuery<TSource> source, Func<TSource, TResult> selector)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (selector == null)
        throw new ArgumentNullException ("selector");

    ParallelQuery<TResult> projected = source.Select (selector);
    return projected.Max ();
}
// Nullable Max overloads.
// Null elements are ignored and the accumulator itself is nullable, starting at null.
// A source that is empty or holds only nulls therefore yields null rather than a
// MinValue sentinel, and a genuine MinValue element is reported correctly.
// Each overload throws ArgumentNullException when source is null.

// Maximum of the non-null int values, or null when there is none.
public static int? Max (this ParallelQuery<int?> source)
{
    if (source == null)
        throw new ArgumentNullException ("source");

    return source.Aggregate (() => (int?) null,
                             (best, e) => (e.HasValue && (!best.HasValue || e.Value > best.Value)) ? e : best,
                             (left, right) => (right.HasValue && (!left.HasValue || right.Value > left.Value)) ? right : left,
                             (best) => best);
}

// Maximum of the non-null long values, or null when there is none.
public static long? Max (this ParallelQuery<long?> source)
{
    if (source == null)
        throw new ArgumentNullException ("source");

    return source.Aggregate (() => (long?) null,
                             (best, e) => (e.HasValue && (!best.HasValue || e.Value > best.Value)) ? e : best,
                             (left, right) => (right.HasValue && (!left.HasValue || right.Value > left.Value)) ? right : left,
                             (best) => best);
}

// Maximum of the non-null float values, or null when there is none.
public static float? Max (this ParallelQuery<float?> source)
{
    if (source == null)
        throw new ArgumentNullException ("source");

    return source.Aggregate (() => (float?) null,
                             (best, e) => (e.HasValue && (!best.HasValue || e.Value > best.Value)) ? e : best,
                             (left, right) => (right.HasValue && (!left.HasValue || right.Value > left.Value)) ? right : left,
                             (best) => best);
}

// Maximum of the non-null double values, or null when there is none.
public static double? Max (this ParallelQuery<double?> source)
{
    if (source == null)
        throw new ArgumentNullException ("source");

    return source.Aggregate (() => (double?) null,
                             (best, e) => (e.HasValue && (!best.HasValue || e.Value > best.Value)) ? e : best,
                             (left, right) => (right.HasValue && (!left.HasValue || right.Value > left.Value)) ? right : left,
                             (best) => best);
}

// Maximum of the non-null decimal values, or null when there is none.
public static decimal? Max (this ParallelQuery<decimal?> source)
{
    if (source == null)
        throw new ArgumentNullException ("source");

    return source.Aggregate (() => (decimal?) null,
                             (best, e) => (e.HasValue && (!best.HasValue || e.Value > best.Value)) ? e : best,
                             (left, right) => (right.HasValue && (!left.HasValue || right.Value > left.Value)) ? right : left,
                             (best) => best);
}
// Selector-based Max overloads. Each one validates its arguments (throwing
// ArgumentNullException for a null source or selector), projects the source with
// Select, and calls the Max overload matching the projected element type.

// Maximum over int projections.
public static int Max<TSource> (this ParallelQuery<TSource> source, Func<TSource, int> selector)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (selector == null)
        throw new ArgumentNullException ("selector");

    ParallelQuery<int> projected = source.Select (selector);
    return projected.Max ();
}

// Maximum over long projections.
public static long Max<TSource> (this ParallelQuery<TSource> source, Func<TSource, long> selector)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (selector == null)
        throw new ArgumentNullException ("selector");

    ParallelQuery<long> projected = source.Select (selector);
    return projected.Max ();
}

// Maximum over float projections.
public static float Max<TSource> (this ParallelQuery<TSource> source, Func<TSource, float> selector)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (selector == null)
        throw new ArgumentNullException ("selector");

    ParallelQuery<float> projected = source.Select (selector);
    return projected.Max ();
}

// Maximum over double projections.
public static double Max<TSource> (this ParallelQuery<TSource> source, Func<TSource, double> selector)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (selector == null)
        throw new ArgumentNullException ("selector");

    ParallelQuery<double> projected = source.Select (selector);
    return projected.Max ();
}

// Maximum over decimal projections.
public static decimal Max<TSource> (this ParallelQuery<TSource> source, Func<TSource, decimal> selector)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (selector == null)
        throw new ArgumentNullException ("selector");

    ParallelQuery<decimal> projected = source.Select (selector);
    return projected.Max ();
}

// Maximum over nullable int projections.
public static int? Max<TSource> (this ParallelQuery<TSource> source, Func<TSource, int?> selector)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (selector == null)
        throw new ArgumentNullException ("selector");

    ParallelQuery<int?> projected = source.Select (selector);
    return projected.Max ();
}

// Maximum over nullable long projections.
public static long? Max<TSource> (this ParallelQuery<TSource> source, Func<TSource, long?> selector)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (selector == null)
        throw new ArgumentNullException ("selector");

    ParallelQuery<long?> projected = source.Select (selector);
    return projected.Max ();
}

// Maximum over nullable float projections.
public static float? Max<TSource> (this ParallelQuery<TSource> source, Func<TSource, float?> selector)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (selector == null)
        throw new ArgumentNullException ("selector");

    ParallelQuery<float?> projected = source.Select (selector);
    return projected.Max ();
}

// Maximum over nullable double projections.
public static double? Max<TSource> (this ParallelQuery<TSource> source, Func<TSource, double?> selector)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (selector == null)
        throw new ArgumentNullException ("selector");

    ParallelQuery<double?> projected = source.Select (selector);
    return projected.Max ();
}

// Maximum over nullable decimal projections.
public static decimal? Max<TSource> (this ParallelQuery<TSource> source, Func<TSource, decimal?> selector)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (selector == null)
        throw new ArgumentNullException ("selector");

    ParallelQuery<decimal?> projected = source.Select (selector);
    return projected.Max ();
}
1885 #region Cast / OfType
// Converts an untyped ParallelQuery by casting every element of its TypedQuery to
// TResult inside a Select projection.
// Throws ArgumentNullException when source is null.
public static ParallelQuery<TResult> Cast<TResult> (this ParallelQuery source)
{
    if (source == null)
        throw new ArgumentNullException ("source");

    return source.TypedQuery.Select ((item) => (TResult)item);
}
// Keeps only the elements of the untyped query that are TResult instances (an `is`
// test applied through Where), then casts the survivors with Cast.
// Throws ArgumentNullException when source is null.
public static ParallelQuery<TResult> OfType<TResult> (this ParallelQuery source)
{
    if (source == null)
        throw new ArgumentNullException ("source");

    var matching = source.TypedQuery.Where ((item) => item is TResult);
    return matching.Cast<TResult> ();
}
// Wraps the source query in a QueryReverseNode and returns a query over that node.
// Throws ArgumentNullException when source is null.
public static ParallelQuery<TSource> Reverse<TSource> (this ParallelQuery<TSource> source)
{
    if (source == null)
        throw new ArgumentNullException ("source");

    var reverseNode = new QueryReverseNode<TSource> (source);
    return new ParallelQuery<TSource> (reverseNode);
}
1913 #region ToArray - ToList - ToDictionary - ToLookup
// Materializes the query into a List.
//  - Ordered query: enumerated sequentially through ToListOrdered.
//  - Otherwise: folded with Aggregate using the callbacks of a ListAggregateHelper.
// Throws ArgumentNullException when source is null.
public static List<TSource> ToList<TSource> (this ParallelQuery<TSource> source)
{
    if (source == null)
        throw new ArgumentNullException ("source");

    if (source.Node.IsOrdered ())
        return ToListOrdered (source);

    var helper = new ListAggregateHelper<TSource> ();

    return source.Aggregate (helper.Seed,
                             helper.Intermediate,
                             helper.Reducer,
                             helper.Final);
}
// Callback bundle used by ToList when folding a query into a List with Aggregate.
class ListAggregateHelper<TSource>
{
    // Creates a fresh accumulator list with an initial capacity of 50.
    public List<TSource> Seed ()
    {
        return new List<TSource> (50);
    }

    // Appends one element to the accumulator and hands the same list back.
    public List<TSource> Intermediate (List<TSource> list, TSource e)
    {
        list.Add (e);

        return list;
    }

    // Merges the second accumulator into the first and returns the first.
    public List<TSource> Reducer (List<TSource> list, List<TSource> list2)
    {
        list.AddRange (list2);

        return list;
    }

    // The merged list is already the final result.
    public List<TSource> Final (List<TSource> list)
    {
        return list;
    }
}
// Sequentially enumerates the query and copies every element, in enumeration order,
// into a new List.
internal static List<TSource> ToListOrdered<TSource> (this ParallelQuery<TSource> source)
{
    List<TSource> collected = new List<TSource> ();

    using (IEnumerator<TSource> cursor = source.GetEnumerator ()) {
        while (cursor.MoveNext ())
            collected.Add (cursor.Current);
    }

    return collected;
}
// Materializes the query into an array.
//  - Ordered query: collected sequentially through ToListOrdered, then copied out.
//  - Otherwise: ParallelExecuter.ProcessAndAggregate drives the callbacks of an
//    ArrayAggregateHelper, whose Result holds the array once processing is done.
// Throws ArgumentNullException when source is null.
public static TSource[] ToArray<TSource> (this ParallelQuery<TSource> source)
{
    if (source == null)
        throw new ArgumentNullException ("source");

    if (source.Node.IsOrdered ()) {
        List<TSource> ordered = ToListOrdered (source);
        return ordered.ToArray ();
    }

    var helper = new ArrayAggregateHelper<TSource> ();
    ParallelExecuter.ProcessAndAggregate<TSource, List<TSource>> (source.Node,
                                                                  helper.Seed,
                                                                  helper.Intermediate,
                                                                  helper.Final);

    return helper.Result;
}
// Callback bundle used by ToArray: elements are gathered into per-accumulator lists
// and Final concatenates those lists into a single array exposed through Result.
class ArrayAggregateHelper<TSource>
{
    TSource[] result;

    // The array built by Final.
    public TSource[] Result {
        get {
            return result;
        }
    }

    // Creates a fresh, empty accumulator list.
    internal List<TSource> Seed ()
    {
        return new List<TSource> ();
    }

    // Appends one element to the accumulator and hands the same list back.
    internal List<TSource> Intermediate (List<TSource> list, TSource e)
    {
        list.Add (e);

        return list;
    }

    // Sizes the result array to the combined length of all accumulators, then copies
    // each accumulator into it back to back, in the order the lists are given.
    internal void Final (IList<List<TSource>> list)
    {
        int total = 0;
        foreach (List<TSource> chunk in list)
            total += chunk.Count;

        result = new TSource[total];

        int offset = 0;
        foreach (List<TSource> chunk in list) {
            chunk.CopyTo (result, offset);
            offset += chunk.Count;
        }
    }
}
// ToDictionary with a key selector and key comparer; each element is its own value.
// Forwards to the four-argument overload with an identity element selector.
public static Dictionary<TKey, TSource> ToDictionary<TSource, TKey> (this ParallelQuery<TSource> source,
                                                                     Func<TSource, TKey> keySelector,
                                                                     IEqualityComparer<TKey> comparer)
{
    return ToDictionary<TSource, TKey, TSource> (source, keySelector, (item) => item, comparer);
}
// ToDictionary with only a key selector; each element is its own value and keys are
// compared with EqualityComparer<TKey>.Default. Forwards to the four-argument overload.
public static Dictionary<TKey, TSource> ToDictionary<TSource, TKey> (this ParallelQuery<TSource> source,
                                                                     Func<TSource, TKey> keySelector)
{
    IEqualityComparer<TKey> defaultComparer = EqualityComparer<TKey>.Default;
    return ToDictionary<TSource, TKey, TSource> (source, keySelector, (item) => item, defaultComparer);
}
// ToDictionary with key and element selectors; keys are compared with
// EqualityComparer<TKey>.Default. Forwards to the four-argument overload.
public static Dictionary<TKey, TElement> ToDictionary<TSource, TKey, TElement> (this ParallelQuery<TSource> source,
                                                                                Func<TSource, TKey> keySelector,
                                                                                Func<TSource, TElement> elementSelector)
{
    IEqualityComparer<TKey> defaultComparer = EqualityComparer<TKey>.Default;
    return ToDictionary<TSource, TKey, TElement> (source, keySelector, elementSelector, defaultComparer);
}
// Core ToDictionary: folds the query with Aggregate using the callbacks of a
// DictionaryAggregateHelper built from the comparer and the two selectors.
// Throws ArgumentNullException when source, keySelector or elementSelector is null;
// a null comparer falls back to EqualityComparer<TKey>.Default.
public static Dictionary<TKey, TElement> ToDictionary<TSource, TKey, TElement> (this ParallelQuery<TSource> source,
                                                                                Func<TSource, TKey> keySelector,
                                                                                Func<TSource, TElement> elementSelector,
                                                                                IEqualityComparer<TKey> comparer)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (keySelector == null)
        throw new ArgumentNullException ("keySelector");

    IEqualityComparer<TKey> effectiveComparer = comparer ?? EqualityComparer<TKey>.Default;

    if (elementSelector == null)
        throw new ArgumentNullException ("elementSelector");

    var helper = new DictionaryAggregateHelper<TSource, TKey, TElement> (effectiveComparer, keySelector, elementSelector);

    return source.Aggregate (helper.Seed,
                             helper.Intermediate,
                             helper.Reducer,
                             helper.Final);
}
// Callback bundle used by ToDictionary when folding a query into a Dictionary with
// Aggregate. Holds the key comparer and the key / element selectors.
class DictionaryAggregateHelper<TSource, TKey, TElement>
{
    IEqualityComparer<TKey> comparer;
    Func<TSource, TKey> keySelector;
    Func<TSource, TElement> elementSelector;

    public DictionaryAggregateHelper (IEqualityComparer<TKey> comparer,
                                      Func<TSource, TKey> keySelector,
                                      Func<TSource, TElement> elementSelector)
    {
        this.elementSelector = elementSelector;
        this.keySelector = keySelector;
        this.comparer = comparer;
    }

    // Creates a fresh, empty accumulator dictionary that uses the configured comparer.
    public Dictionary<TKey, TElement> Seed ()
    {
        return new Dictionary<TKey, TElement> (comparer);
    }

    // Adds the key/value pair derived from one element (Dictionary.Add, so a duplicate
    // key raises) and hands the same dictionary back.
    public Dictionary<TKey, TElement> Intermediate (Dictionary<TKey, TElement> d, TSource e)
    {
        TKey key = keySelector (e);
        d.Add (key, elementSelector (e));

        return d;
    }

    // Copies every entry of the second accumulator into the first and returns the first.
    public Dictionary<TKey, TElement> Reducer (Dictionary<TKey, TElement> d1, Dictionary<TKey, TElement> d2)
    {
        foreach (KeyValuePair<TKey, TElement> entry in d2) {
            d1.Add (entry.Key, entry.Value);
        }

        return d1;
    }

    // The merged dictionary is already the final result.
    public Dictionary<TKey, TElement> Final (Dictionary<TKey, TElement> d)
    {
        return d;
    }
}
// ToLookup with only a key selector; each element is its own value and keys are
// compared with EqualityComparer<TKey>.Default. Forwards to the four-argument overload.
public static ILookup<TKey, TSource> ToLookup<TSource, TKey> (this ParallelQuery<TSource> source,
                                                              Func<TSource, TKey> keySelector)
{
    IEqualityComparer<TKey> defaultComparer = EqualityComparer<TKey>.Default;
    return ToLookup<TSource, TKey, TSource> (source, keySelector, (item) => item, defaultComparer);
}
// ToLookup with a key selector and key comparer; each element is its own value.
// Forwards to the four-argument overload with an identity element selector.
public static ILookup<TKey, TSource> ToLookup<TSource, TKey> (this ParallelQuery<TSource> source,
                                                              Func<TSource, TKey> keySelector,
                                                              IEqualityComparer<TKey> comparer)
{
    return ToLookup<TSource, TKey, TSource> (source, keySelector, (item) => item, comparer);
}
// ToLookup with key and element selectors; keys are compared with
// EqualityComparer<TKey>.Default. Forwards to the four-argument overload.
public static ILookup<TKey, TElement> ToLookup<TSource, TKey, TElement> (this ParallelQuery<TSource> source,
                                                                         Func<TSource, TKey> keySelector,
                                                                         Func<TSource, TElement> elementSelector)
{
    IEqualityComparer<TKey> defaultComparer = EqualityComparer<TKey>.Default;
    return ToLookup<TSource, TKey, TElement> (source, keySelector, elementSelector, defaultComparer);
}
// Core ToLookup: creates a ConcurrentLookup with the comparer and fills it from a
// ForAll pass that adds (keySelector (e), elementSelector (e)) for every element.
// Throws ArgumentNullException when source, keySelector or elementSelector is null;
// a null comparer falls back to EqualityComparer<TKey>.Default.
public static ILookup<TKey, TElement> ToLookup<TSource, TKey, TElement> (this ParallelQuery<TSource> source,
                                                                         Func<TSource, TKey> keySelector,
                                                                         Func<TSource, TElement> elementSelector,
                                                                         IEqualityComparer<TKey> comparer)
{
    if (source == null)
        throw new ArgumentNullException ("source");
    if (keySelector == null)
        throw new ArgumentNullException ("keySelector");

    IEqualityComparer<TKey> effectiveComparer = comparer ?? EqualityComparer<TKey>.Default;

    if (elementSelector == null)
        throw new ArgumentNullException ("elementSelector");

    var lookup = new ConcurrentLookup<TKey, TElement> (effectiveComparer);
    source.ForAll ((item) => lookup.Add (keySelector (item), elementSelector (item)));

    return lookup;
}
/// <summary>
/// Placeholder overload taking a plain <c>IEnumerable</c> as the second operand.
/// It is marked obsolete and always throws; the second operand must be a parallel query.
/// </summary>
/// <exception cref="NotSupportedException">Always thrown.</exception>
[Obsolete ("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather than "
           + "System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() extension method "
           + "to convert the right data source to System.Linq.ParallelQuery<T>.")]
public static ParallelQuery<TSource> Concat<TSource> (this ParallelQuery<TSource> first,
                                                      IEnumerable<TSource> second)
{
	// Intentionally unsupported: see the obsolete message above.
	throw new NotSupportedException ();
}
/// <summary>
/// Concatenates two parallel queries by wrapping their nodes in a <c>QueryConcatNode</c>.
/// </summary>
/// <param name="first">Query supplying the first operand node.</param>
/// <param name="second">Query supplying the second operand node.</param>
/// <returns>A new query built on the concatenation node.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="first"/> or <paramref name="second"/> is null.
/// </exception>
public static ParallelQuery<TSource> Concat<TSource> (this ParallelQuery<TSource> first, ParallelQuery<TSource> second)
{
	// Validate before touching .Node so that callers get ArgumentNullException
	// (as Zip does for its operands) instead of a NullReferenceException.
	if (first == null)
		throw new ArgumentNullException ("first");
	if (second == null)
		throw new ArgumentNullException ("second");

	return new ParallelQuery<TSource> (new QueryConcatNode<TSource> (first.Node, second.Node));
}
2155 #region DefaultIfEmpty
/// <summary>
/// Returns the query produced by the two-argument overload, using
/// <c>default (TSource)</c> as the fallback value.
/// </summary>
/// <param name="source">Query to wrap.</param>
/// <returns>The query produced by the two-argument overload.</returns>
public static ParallelQuery<TSource> DefaultIfEmpty<TSource> (this ParallelQuery<TSource> source)
{
	TSource fallback = default (TSource);
	return source.DefaultIfEmpty (fallback);
}
/// <summary>
/// Wraps the query's node in a <c>QueryDefaultEmptyNode</c> carrying
/// <paramref name="defaultValue"/>.
/// </summary>
/// <param name="source">Query to wrap.</param>
/// <param name="defaultValue">Value handed to the default-if-empty node.</param>
/// <returns>A new query built on the default-if-empty node.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static ParallelQuery<TSource> DefaultIfEmpty<TSource> (this ParallelQuery<TSource> source, TSource defaultValue)
{
	// Validate before touching .Node so that callers get ArgumentNullException
	// instead of a NullReferenceException.
	if (source == null)
		throw new ArgumentNullException ("source");

	return new ParallelQuery<TSource> (new QueryDefaultEmptyNode<TSource> (source.Node, defaultValue));
}
/// <summary>
/// Returns the first element yielded by the query's enumerator.
/// </summary>
/// <param name="source">Query to read from.</param>
/// <returns>The first element produced by the enumerator.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
/// <exception cref="InvalidOperationException">The query yields no element.</exception>
public static TSource First<TSource> (this ParallelQuery<TSource> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	// The token source is attached to the query through WithImplementerToken.
	CancellationTokenSource src = new CancellationTokenSource ();
	IEnumerator<TSource> enumerator = source.WithImplementerToken (src).GetEnumerator ();

	if (enumerator == null)
		throw new InvalidOperationException ("source contains no element");

	try {
		if (!enumerator.MoveNext ())
			throw new InvalidOperationException ("source contains no element");

		TSource result = enumerator.Current;

		// NOTE(review): presumably this stops the rest of the query once one element
		// has been read; confirm that the original cancels here, before Dispose.
		src.Cancel ();

		return result;
	} finally {
		// Dispose on every path, including the empty-sequence throw above,
		// so the enumerator is never leaked.
		enumerator.Dispose ();
	}
}
/// <summary>
/// Returns the first element of the query filtered by <paramref name="predicate"/>.
/// </summary>
/// <param name="source">Query to read from.</param>
/// <param name="predicate">Filter applied through <c>Where</c> before taking the first element.</param>
/// <returns>The result of <c>First</c> on the filtered query.</returns>
public static TSource First<TSource> (this ParallelQuery<TSource> source, Func<TSource, bool> predicate)
{
	var matches = source.Where (predicate);
	return matches.First ();
}
/// <summary>
/// Returns the first element of the query after passing it through
/// <c>DefaultIfEmpty</c>.
/// </summary>
/// <param name="source">Query to read from.</param>
/// <returns>The result of <c>First</c> on the default-if-empty query.</returns>
public static TSource FirstOrDefault<TSource> (this ParallelQuery<TSource> source)
{
	var withFallback = source.DefaultIfEmpty ();
	return withFallback.First ();
}
/// <summary>
/// Returns <c>FirstOrDefault</c> of the query filtered by <paramref name="predicate"/>.
/// </summary>
/// <param name="source">Query to read from.</param>
/// <param name="predicate">Filter applied through <c>Where</c> first.</param>
/// <returns>The result of <c>FirstOrDefault</c> on the filtered query.</returns>
public static TSource FirstOrDefault<TSource> (this ParallelQuery<TSource> source, Func<TSource, bool> predicate)
{
	var matches = source.Where (predicate);
	return matches.FirstOrDefault ();
}
/// <summary>
/// Returns the last element of the query, implemented as <c>First</c> on the reversed query.
/// </summary>
/// <param name="source">Query to read from.</param>
/// <returns>The result of <c>First</c> on the reversed query.</returns>
public static TSource Last<TSource> (this ParallelQuery<TSource> source)
{
	var reversed = source.Reverse ();
	return reversed.First ();
}
/// <summary>
/// Returns the last element matching <paramref name="predicate"/>, implemented as
/// <c>First (predicate)</c> on the reversed query.
/// </summary>
/// <param name="source">Query to read from.</param>
/// <param name="predicate">Filter passed to <c>First</c>.</param>
/// <returns>The result of <c>First (predicate)</c> on the reversed query.</returns>
public static TSource Last<TSource> (this ParallelQuery<TSource> source, Func<TSource, bool> predicate)
{
	var reversed = source.Reverse ();
	return reversed.First (predicate);
}
/// <summary>
/// Implemented as <c>FirstOrDefault</c> on the reversed query.
/// </summary>
/// <param name="source">Query to read from.</param>
/// <returns>The result of <c>FirstOrDefault</c> on the reversed query.</returns>
public static TSource LastOrDefault<TSource> (this ParallelQuery<TSource> source)
{
	var reversed = source.Reverse ();
	return reversed.FirstOrDefault ();
}
/// <summary>
/// Implemented as <c>FirstOrDefault (predicate)</c> on the reversed query.
/// </summary>
/// <param name="source">Query to read from.</param>
/// <param name="predicate">Filter passed to <c>FirstOrDefault</c>.</param>
/// <returns>The result of <c>FirstOrDefault (predicate)</c> on the reversed query.</returns>
public static TSource LastOrDefault<TSource> (this ParallelQuery<TSource> source, Func<TSource, bool> predicate)
{
	var reversed = source.Reverse ();
	return reversed.FirstOrDefault (predicate);
}
/// <summary>
/// Combines two parallel queries by wrapping their nodes and
/// <paramref name="resultSelector"/> in a <c>QueryZipNode</c>.
/// </summary>
/// <param name="first">Query supplying the first operand node.</param>
/// <param name="second">Query supplying the second operand node.</param>
/// <param name="resultSelector">Function handed to the zip node to build each result.</param>
/// <returns>A new query built on the zip node.</returns>
/// <exception cref="ArgumentNullException">Any argument is null.</exception>
public static ParallelQuery<TResult> Zip<TFirst, TSecond, TResult> (this ParallelQuery<TFirst> first,
                                                                    ParallelQuery<TSecond> second,
                                                                    Func<TFirst, TSecond, TResult> resultSelector)
{
	// Arguments are validated in declaration order.
	if (first == null) {
		throw new ArgumentNullException ("first");
	}
	if (second == null) {
		throw new ArgumentNullException ("second");
	}
	if (resultSelector == null) {
		throw new ArgumentNullException ("resultSelector");
	}

	var zipNode = new QueryZipNode<TFirst, TSecond, TResult> (resultSelector, first.Node, second.Node);
	return new ParallelQuery<TResult> (zipNode);
}
/// <summary>
/// Placeholder overload taking a plain <c>IEnumerable</c> as the second operand.
/// It is marked obsolete and always throws; the second operand must be a parallel query.
/// </summary>
/// <exception cref="NotSupportedException">Always thrown.</exception>
[Obsolete ("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
           + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
           + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
public static ParallelQuery<TResult> Zip<TFirst, TSecond, TResult> (this ParallelQuery<TFirst> first,
                                                                    IEnumerable<TSecond> second,
                                                                    Func<TFirst, TSecond, TResult> resultSelector)
{
	// Intentionally unsupported: see the obsolete message above.
	throw new NotSupportedException ();
}
2251 public T Apply (T input)