3 // ParallelEnumerable.cs
6 // Jérémie "Garuma" Laval <jeremie.laval@gmail.com>
8 // Copyright (c) 2010 Jérémie "Garuma" Laval
10 // Permission is hereby granted, free of charge, to any person obtaining a copy
11 // of this software and associated documentation files (the "Software"), to deal
12 // in the Software without restriction, including without limitation the rights
13 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 // copies of the Software, and to permit persons to whom the Software is
15 // furnished to do so, subject to the following conditions:
17 // The above copyright notice and this permission notice shall be included in
18 // all copies or substantial portions of the Software.
20 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
29 using System.Threading;
30 using System.Collections;
31 using System.Collections.Generic;
32 using System.Collections.Concurrent;
36 public static class ParallelEnumerable
38 #region Range & Repeat
/// <summary>
/// Creates a parallel query producing <paramref name="count"/> consecutive integers,
/// the first of which is <paramref name="start"/>.
/// </summary>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="count"/> is negative, or start + count - 1 exceeds Int32.MaxValue.
/// </exception>
public static ParallelQuery<int> Range (int start, int count)
{
	if (count < 0)
		throw new ArgumentOutOfRangeException ("count", "count is less than 0");

	// Compute the last value in 64-bit arithmetic. The former test
	// "int.MaxValue - start < count" overflowed for negative starts and also
	// rejected the valid range whose last value is exactly Int32.MaxValue.
	if ((long) start + count - 1 > int.MaxValue)
		throw new ArgumentOutOfRangeException ("count", "start + count - 1 is larger than Int32.MaxValue");

	return (new RangeList (start, count)).AsParallel ();
}
/// <summary>
/// Creates a parallel query that yields <paramref name="obj"/> exactly
/// <paramref name="count"/> times.
/// </summary>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is negative.</exception>
public static ParallelQuery<TResult> Repeat<TResult> (TResult obj, int count)
{
	if (count >= 0) {
		RepeatList<TResult> repeated = new RepeatList<TResult> (obj, count);
		return repeated.AsParallel ();
	}

	throw new ArgumentOutOfRangeException ("count", "count is less than 0");
}
/// <summary>
/// Returns a parallel query with no elements, built as a zero-length repetition
/// of the default value of <typeparamref name="TResult"/>.
/// </summary>
public static ParallelQuery<TResult> Empty<TResult> ()
{
	TResult filler = default (TResult);
	return Repeat<TResult> (filler, 0);
}
66 public static ParallelQuery<TSource> AsParallel<TSource> (this IEnumerable<TSource> source)
69 throw new ArgumentNullException ("source");
71 return new ParallelQuery<TSource> (new QueryStartNode<TSource> (source));
74 public static ParallelQuery<TSource> AsParallel<TSource> (this Partitioner<TSource> source)
77 throw new ArgumentNullException ("source");
79 return new ParallelQuery<TSource> (new QueryStartNode<TSource> (source));
82 public static ParallelQuery AsParallel (this IEnumerable source)
85 throw new ArgumentNullException ("source");
87 return new ParallelQuery<object> (new QueryStartNode<object> (source.Cast<object> ()));
90 public static IEnumerable<TSource> AsEnumerable<TSource> (this ParallelQuery<TSource> source)
93 throw new ArgumentNullException ("source");
95 return source.AsSequential ();
98 public static IEnumerable<TSource> AsSequential<TSource> ( this ParallelQuery<TSource> source)
101 throw new ArgumentNullException ("source");
103 return source.Node.GetSequential ();
107 #region AsOrdered / AsUnordered
108 public static ParallelQuery<TSource> AsOrdered<TSource> (this ParallelQuery<TSource> source)
111 throw new ArgumentNullException ("source");
113 return new ParallelQuery<TSource> (new QueryAsOrderedNode<TSource> (source.Node));
116 public static ParallelQuery<TSource> AsUnordered<TSource> (this ParallelQuery<TSource> source)
119 throw new ArgumentNullException ("source");
121 return new ParallelQuery<TSource> (new QueryAsUnorderedNode<TSource> (source.Node));
124 public static ParallelQuery AsOrdered (this ParallelQuery source)
127 throw new ArgumentNullException ("source");
129 return source.TypedQuery.AsOrdered ();
134 public static ParallelQuery<TSource> WithExecutionMode<TSource> (this ParallelQuery<TSource> source,
135 ParallelExecutionMode executionMode)
138 throw new ArgumentNullException ("source");
140 return new ParallelQuery<TSource> (new ParallelExecutionModeNode<TSource> (executionMode, source.Node));
143 public static ParallelQuery<TSource> WithCancellation<TSource> (this ParallelQuery<TSource> source,
144 CancellationToken cancellationToken)
147 throw new ArgumentNullException ("source");
149 return new ParallelQuery<TSource> (new CancellationTokenNode<TSource> (cancellationToken, source.Node));
152 public static ParallelQuery<TSource> WithMergeOptions<TSource> (this ParallelQuery<TSource> source,
153 ParallelMergeOptions mergeOptions)
156 throw new ArgumentNullException ("source");
158 return new ParallelQuery<TSource> (new ParallelMergeOptionsNode<TSource> (mergeOptions, source.Node));
/// <summary>
/// Wraps the query in a node that records the requested degree of parallelism.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
/// <exception cref="ArgumentException">
/// <paramref name="degreeParallelism"/> is less than 1 or greater than 63.
/// </exception>
public static ParallelQuery<TSource> WithDegreeOfParallelism<TSource> (this ParallelQuery<TSource> source,
                                                                       int degreeParallelism)
{
	// Validate the receiver first, like every other operator in this class.
	if (source == null)
		throw new ArgumentNullException ("source");
	if (degreeParallelism < 1 || degreeParallelism > 63)
		throw new ArgumentException ("degreeOfParallelism is less than 1 or greater than 63", "degreeParallelism");

	return new ParallelQuery<TSource> (new DegreeOfParallelismNode<TSource> (degreeParallelism, source.Node));
}
172 internal static ParallelQuery<TSource> WithImplementerToken<TSource> (this ParallelQuery<TSource> source,
173 CancellationTokenSource token)
175 return new ParallelQuery<TSource> (new ImplementerTokenNode<TSource> (token, source.Node));
180 public static ParallelQuery<TResult> Select<TSource, TResult> (this ParallelQuery<TSource> source, Func<TSource, TResult> selector)
183 throw new ArgumentNullException ("source");
184 if (selector == null)
185 throw new ArgumentNullException ("selector");
187 return new ParallelQuery<TResult> (new QuerySelectNode<TResult, TSource> (source.Node, selector));
190 public static ParallelQuery<TResult> Select<TSource, TResult> (this ParallelQuery<TSource> source, Func<TSource, int, TResult> selector)
193 throw new ArgumentNullException ("source");
194 if (selector == null)
195 throw new ArgumentNullException ("selector");
197 return new ParallelQuery<TResult> (new QuerySelectNode<TResult, TSource> (source.Node, selector));
/// <summary>
/// Projects every element to a sequence and flattens the results, by forwarding to
/// the overload that takes a result selector with a selector returning the inner element.
/// </summary>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="selector"/> is null.
/// </exception>
public static ParallelQuery<TResult> SelectMany<TSource, TResult> (this ParallelQuery<TSource> source,
                                                                   Func<TSource, IEnumerable<TResult>> selector)
{
	// Fail with a named argument instead of letting a null slip down the call chain.
	if (source == null)
		throw new ArgumentNullException ("source");
	if (selector == null)
		throw new ArgumentNullException ("selector");

	return source.SelectMany (selector, (s, e) => e);
}
/// <summary>
/// Index-aware variant: projects every element (together with its position) to a
/// sequence and flattens the results, by forwarding to the overload that takes a
/// result selector with a selector returning the inner element.
/// </summary>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="selector"/> is null.
/// </exception>
public static ParallelQuery<TResult> SelectMany<TSource, TResult> (this ParallelQuery<TSource> source,
                                                                   Func<TSource, int, IEnumerable<TResult>> selector)
{
	// Fail with a named argument instead of letting a null slip down the call chain.
	if (source == null)
		throw new ArgumentNullException ("source");
	if (selector == null)
		throw new ArgumentNullException ("selector");

	return source.SelectMany (selector, (s, e) => e);
}
214 public static ParallelQuery<TResult> SelectMany<TSource, TCollection, TResult> (this ParallelQuery<TSource> source,
215 Func<TSource, IEnumerable<TCollection>> collectionSelector,
216 Func<TSource, TCollection, TResult> resultSelector)
218 return new ParallelQuery<TResult> (new QuerySelectManyNode<TSource, TCollection, TResult> (source.Node,
223 public static ParallelQuery<TResult> SelectMany<TSource, TCollection, TResult> (this ParallelQuery<TSource> source,
224 Func<TSource, int, IEnumerable<TCollection>> collectionSelector,
225 Func<TSource, TCollection, TResult> resultSelector)
227 return new ParallelQuery<TResult> (new QuerySelectManyNode<TSource, TCollection, TResult> (source.Node,
234 public static ParallelQuery<TSource> Where<TSource> (this ParallelQuery<TSource> source, Func<TSource, bool> predicate)
237 throw new ArgumentNullException ("source");
238 if (predicate == null)
239 throw new ArgumentNullException ("predicate");
241 return new ParallelQuery<TSource> (new QueryWhereNode<TSource> (source.Node, predicate));
244 public static ParallelQuery<TSource> Where<TSource> (this ParallelQuery<TSource> source, Func<TSource, int, bool> predicate)
247 throw new ArgumentNullException ("source");
248 if (predicate == null)
249 throw new ArgumentNullException ("predicate");
251 return new ParallelQuery<TSource> (new QueryWhereNode<TSource> (source.Node, predicate));
/// <summary>
/// Folds the query with <paramref name="func"/>, using the first enumerated element
/// as the initial accumulator value.
/// </summary>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="func"/> is null.
/// </exception>
/// <exception cref="InvalidOperationException">The query produced no elements.</exception>
public static TSource Aggregate<TSource> (this ParallelQuery<TSource> source, Func<TSource, TSource, TSource> func)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (func == null)
		throw new ArgumentNullException ("func");

	// This used to forward "(Func<TSource>) null" to the seed-factory overload,
	// which rejects a null seedFunc, so every call ended in ArgumentNullException.
	// Fold the enumerated results directly instead, seeding with the first element.
	bool seeded = false;
	TSource accumulator = default (TSource);

	foreach (TSource value in source) {
		if (seeded) {
			accumulator = func (accumulator, value);
		} else {
			accumulator = value;
			seeded = true;
		}
	}

	if (!seeded)
		throw new InvalidOperationException ("The input sequence is empty.");

	return accumulator;
}
269 public static TAccumulate Aggregate<TSource, TAccumulate> (this ParallelQuery<TSource> source,
271 Func<TAccumulate, TSource, TAccumulate> func)
274 throw new ArgumentNullException ("source");
276 throw new ArgumentNullException ("func");
278 return source.Aggregate (seed, func, (e) => e);
/// <summary>
/// Folds the enumerated query into a single accumulator starting from
/// <paramref name="seed"/>, then maps the final accumulator through
/// <paramref name="resultSelector"/>.
/// </summary>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/>, <paramref name="func"/> or
/// <paramref name="resultSelector"/> is null.
/// </exception>
public static TResult Aggregate<TSource, TAccumulate, TResult> (this ParallelQuery<TSource> source,
                                                                TAccumulate seed,
                                                                Func<TAccumulate, TSource, TAccumulate> func,
                                                                Func<TAccumulate, TResult> resultSelector)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}
	if (func == null) {
		throw new ArgumentNullException ("func");
	}
	if (resultSelector == null) {
		throw new ArgumentNullException ("resultSelector");
	}

	TAccumulate running = seed;
	foreach (TSource item in source) {
		running = func (running, item);
	}

	TResult projected = resultSelector (running);
	return projected;
}
301 public static TResult Aggregate<TSource, TAccumulate, TResult> (this ParallelQuery<TSource> source,
303 Func<TAccumulate, TSource, TAccumulate> updateAccumulatorFunc,
304 Func<TAccumulate, TAccumulate, TAccumulate> combineAccumulatorsFunc,
305 Func<TAccumulate, TResult> resultSelector)
308 throw new ArgumentNullException ("source");
309 if (updateAccumulatorFunc == null)
310 throw new ArgumentNullException ("updateAccumulatorFunc");
311 if (combineAccumulatorsFunc == null)
312 throw new ArgumentNullException ("combineAccumulatorsFunc");
313 if (resultSelector == null)
314 throw new ArgumentNullException ("resultSelector");
316 return source.Aggregate (() => seed, updateAccumulatorFunc, combineAccumulatorsFunc, resultSelector);
319 public static TResult Aggregate<TSource, TAccumulate, TResult> (this ParallelQuery<TSource> source,
320 Func<TAccumulate> seedFunc,
321 Func<TAccumulate, TSource, TAccumulate> updateAccumulatorFunc,
322 Func<TAccumulate, TAccumulate, TAccumulate> combineAccumulatorsFunc,
323 Func<TAccumulate, TResult> resultSelector)
326 throw new ArgumentNullException ("source");
327 if (seedFunc == null)
328 throw new ArgumentNullException ("seedFunc");
329 if (updateAccumulatorFunc == null)
330 throw new ArgumentNullException ("updateAccumulatorFunc");
331 if (combineAccumulatorsFunc == null)
332 throw new ArgumentNullException ("combineAccumulatorsFunc");
333 if (resultSelector == null)
334 throw new ArgumentNullException ("resultSelector");
336 TAccumulate accumulator = default (TAccumulate);
338 ParallelExecuter.ProcessAndAggregate<TSource, TAccumulate> (source.Node, seedFunc, updateAccumulatorFunc, (list) => {
339 accumulator = list [0];
340 for (int i = 1; i < list.Count; i++)
341 accumulator = combineAccumulatorsFunc (accumulator, list[i]);
344 return resultSelector (accumulator);;
349 public static void ForAll<TSource> (this ParallelQuery<TSource> source, Action<TSource> action)
352 throw new ArgumentNullException ("source");
354 throw new ArgumentNullException ("action");
356 ParallelExecuter.ProcessAndBlock (source.Node, action);
/// <summary>
/// Orders the query by the selected key in descending order according to
/// <paramref name="comparer"/>.
/// </summary>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/>, <paramref name="keySelector"/> or
/// <paramref name="comparer"/> is null.
/// </exception>
public static OrderedParallelQuery<TSource> OrderByDescending<TSource, TKey> (this ParallelQuery<TSource> source,
                                                                              Func<TSource, TKey> keySelector,
                                                                              IComparer<TKey> comparer)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (keySelector == null)
		throw new ArgumentNullException ("keySelector");
	if (comparer == null)
		throw new ArgumentNullException ("comparer");

	// Reverse the order by swapping the operands rather than negating the result:
	// a comparer may return int.MinValue, whose negation is still int.MinValue.
	Comparison<TSource> comparison = (e1, e2) => comparer.Compare (keySelector (e2), keySelector (e1));

	return new OrderedParallelQuery<TSource> (new QueryOrderByNode<TSource> (source.Node, comparison));
}
377 public static OrderedParallelQuery<TSource> OrderByDescending<TSource, TKey> (this ParallelQuery<TSource> source,
378 Func<TSource, TKey> keySelector)
380 return OrderByDescending (source, keySelector, Comparer<TKey>.Default);
383 public static OrderedParallelQuery<TSource> OrderBy<TSource, TKey> (this ParallelQuery<TSource> source,
384 Func<TSource, TKey> keySelector)
386 return OrderBy (source, keySelector, Comparer<TKey>.Default);
389 public static OrderedParallelQuery<TSource> OrderBy<TSource, TKey> (this ParallelQuery<TSource> source,
390 Func<TSource, TKey> keySelector,
391 IComparer<TKey> comparer)
394 throw new ArgumentNullException ("source");
395 if (keySelector == null)
396 throw new ArgumentNullException ("keySelector");
397 if (comparer == null)
398 throw new ArgumentNullException ("comparer");
400 Comparison<TSource> comparison = (e1, e2) => comparer.Compare (keySelector (e1), keySelector (e2));
402 return new OrderedParallelQuery<TSource> (new QueryOrderByNode<TSource> (source.Node, comparison));
407 public static OrderedParallelQuery<TSource> ThenBy<TSource, TKey> (this OrderedParallelQuery<TSource> source,
408 Func<TSource, TKey> keySelector)
410 return ThenBy (source, keySelector, Comparer<TKey>.Default);
413 public static OrderedParallelQuery<TSource> ThenBy<TSource, TKey> (this OrderedParallelQuery<TSource> source,
414 Func<TSource, TKey> keySelector,
415 IComparer<TKey> comparer)
418 throw new ArgumentNullException ("source");
419 if (keySelector == null)
420 throw new ArgumentNullException ("keySelector");
421 if (comparer == null)
422 throw new ArgumentNullException ("comparer");
424 Comparison<TSource> comparison = (e1, e2) => comparer.Compare (keySelector (e1), keySelector (e2));
426 return new OrderedParallelQuery<TSource> (new QueryOrderByNode<TSource> (source.Node, comparison));
429 public static OrderedParallelQuery<TSource> ThenByDescending<TSource, TKey> (this OrderedParallelQuery<TSource> source,
430 Func<TSource, TKey> keySelector)
432 return ThenByDescending (source, keySelector, Comparer<TKey>.Default);
/// <summary>
/// Adds a descending ordering on the selected key, compared with
/// <paramref name="comparer"/>, to an already ordered query.
/// </summary>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/>, <paramref name="keySelector"/> or
/// <paramref name="comparer"/> is null.
/// </exception>
public static OrderedParallelQuery<TSource> ThenByDescending<TSource, TKey> (this OrderedParallelQuery<TSource> source,
                                                                             Func<TSource, TKey> keySelector,
                                                                             IComparer<TKey> comparer)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (keySelector == null)
		throw new ArgumentNullException ("keySelector");
	if (comparer == null)
		throw new ArgumentNullException ("comparer");

	// Reverse the order by swapping the operands rather than negating the result:
	// a comparer may return int.MinValue, whose negation is still int.MinValue.
	Comparison<TSource> comparison = (e1, e2) => comparer.Compare (keySelector (e2), keySelector (e1));

	return new OrderedParallelQuery<TSource> (new QueryOrderByNode<TSource> (source.Node, comparison));
}
453 public static bool All<TSource> (this ParallelQuery<TSource> source, Func<TSource, bool> predicate)
456 throw new ArgumentNullException ("source");
457 if (predicate == null)
458 throw new ArgumentNullException ("predicate");
460 CancellationTokenSource src = new CancellationTokenSource ();
461 ParallelQuery<TSource> innerQuery = source.WithImplementerToken (src);
464 innerQuery.ForAll ((e) => {
465 if (!predicate (e)) {
/// <summary>
/// Reports whether the query yields at least one element, by running the
/// predicate overload with a predicate that accepts everything.
/// </summary>
public static bool Any<TSource> (this ParallelQuery<TSource> source)
{
	Func<TSource, bool> acceptAll = (ignored) => true;
	return Any<TSource> (source, acceptAll);
}
/// <summary>
/// Reports whether at least one element satisfies <paramref name="predicate"/>.
/// Expressed through <c>All</c>: some element matches exactly when not every
/// element fails the predicate.
/// </summary>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="predicate"/> is null.
/// </exception>
public static bool Any<TSource> (this ParallelQuery<TSource> source, Func<TSource, bool> predicate)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}
	if (predicate == null) {
		throw new ArgumentNullException ("predicate");
	}

	bool everyElementFails = source.All ((item) => !predicate (item));
	return !everyElementFails;
}
/// <summary>
/// Reports whether the query contains <paramref name="value"/>, compared with the
/// default equality comparer of <typeparamref name="TSource"/>.
/// </summary>
public static bool Contains<TSource> (this ParallelQuery<TSource> source, TSource value)
{
	IEqualityComparer<TSource> defaultComparer = EqualityComparer<TSource>.Default;
	return Contains<TSource> (source, value, defaultComparer);
}
/// <summary>
/// Reports whether any element of the query equals <paramref name="value"/>
/// according to <paramref name="comparer"/>.
/// </summary>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="comparer"/> is null.
/// </exception>
public static bool Contains<TSource> (this ParallelQuery<TSource> source, TSource value, IEqualityComparer<TSource> comparer)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (comparer == null)
		throw new ArgumentNullException ("comparer");

	// Compare each element with the searched value. The one-argument call
	// "comparer.Equals (value)" resolved to Object.Equals and compared the
	// comparer object itself with value, ignoring the element entirely.
	return Any<TSource> (source, (e) => comparer.Equals (e, value));
}
509 #region SequenceEqual
510 public static bool SequenceEqual<TSource> (this ParallelQuery<TSource> first,
511 ParallelQuery<TSource> second)
514 throw new ArgumentNullException ("first");
516 throw new ArgumentNullException ("second");
518 return first.SequenceEqual (second, EqualityComparer<TSource>.Default);
521 public static bool SequenceEqual<TSource> (this ParallelQuery<TSource> first,
522 ParallelQuery<TSource> second,
523 IEqualityComparer<TSource> comparer)
526 throw new ArgumentNullException ("first");
528 throw new ArgumentNullException ("second");
529 if (comparer == null)
530 throw new ArgumentNullException ("comparer");
532 CancellationTokenSource source = new CancellationTokenSource ();
533 ParallelQuery<bool> innerQuery
534 = first.Zip (second, (e1, e2) => comparer.Equals (e1, e2)).Where ((e) => !e).WithImplementerToken (source);
538 innerQuery.ForAll ((value) => {
546 [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
547 + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
548 + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
549 public static bool SequenceEqual<TSource> (this ParallelQuery<TSource> first, IEnumerable<TSource> second)
551 throw new NotSupportedException ();
554 [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
555 + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
556 + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
557 public static bool SequenceEqual<TSource> (this ParallelQuery<TSource> first,
558 IEnumerable<TSource> second,
559 IEqualityComparer<TSource> comparer)
561 throw new NotSupportedException ();
567 public static ParallelQuery<IGrouping<TKey, TSource>> GroupBy<TSource, TKey> ( this ParallelQuery<TSource> source,
568 Func<TSource, TKey> keySelector)
570 return source.GroupBy (keySelector, EqualityComparer<TKey>.Default);
573 public static ParallelQuery<IGrouping<TKey, TSource>> GroupBy<TSource, TKey> ( this ParallelQuery<TSource> source,
574 Func<TSource, TKey> keySelector,
575 IEqualityComparer<TKey> comparer)
577 return source.GroupBy (keySelector, (e) => e, comparer);
580 public static ParallelQuery<IGrouping<TKey, TElement>> GroupBy<TSource, TKey, TElement> (this ParallelQuery<TSource> source,
581 Func<TSource, TKey> keySelector,
582 Func<TSource, TElement> elementSelector)
584 return source.GroupBy (keySelector, elementSelector, EqualityComparer<TKey>.Default);
587 public static ParallelQuery<TResult> GroupBy<TSource, TKey, TResult> (this ParallelQuery<TSource> source,
588 Func<TSource, TKey> keySelector,
589 Func<TKey, IEnumerable<TSource>, TResult> resultSelector)
591 return source.GroupBy (keySelector)
592 .Select ((g) => resultSelector (g.Key, (IEnumerable<TSource>)g));
595 public static ParallelQuery<IGrouping<TKey, TElement>> GroupBy<TSource, TKey, TElement> (this ParallelQuery<TSource> source,
596 Func<TSource, TKey> keySelector,
597 Func<TSource, TElement> elementSelector,
598 IEqualityComparer<TKey> comparer)
600 throw new NotImplementedException ();
603 public static ParallelQuery<TResult> GroupBy<TSource, TKey, TElement, TResult> (this ParallelQuery<TSource> source,
604 Func<TSource, TKey> keySelector,
605 Func<TSource, TElement> elementSelector,
606 Func<TKey, IEnumerable<TElement>, TResult> resultSelector)
608 return source.GroupBy (keySelector, elementSelector)
609 .Select ((g) => resultSelector (g.Key, (IEnumerable<TElement>)g));
612 public static ParallelQuery<TResult> GroupBy<TSource, TKey, TResult> ( this ParallelQuery<TSource> source,
613 Func<TSource, TKey> keySelector,
614 Func<TKey, IEnumerable<TSource>, TResult> resultSelector,
615 IEqualityComparer<TKey> comparer)
617 return source.GroupBy (keySelector, comparer)
618 .Select ((g) => resultSelector (g.Key, (IEnumerable<TSource>)g));
621 public static ParallelQuery<TResult> GroupBy<TSource, TKey, TElement, TResult> (this ParallelQuery<TSource> source,
622 Func<TSource, TKey> keySelector,
623 Func<TSource, TElement> elementSelector,
624 Func<TKey, IEnumerable<TElement>, TResult> resultSelector,
625 IEqualityComparer<TKey> comparer)
627 return source.GroupBy (keySelector, elementSelector, comparer)
628 .Select ((g) => resultSelector (g.Key, (IEnumerable<TElement>)g));
633 public static ParallelQuery<TResult> GroupJoin<TOuter, TInner, TKey, TResult> (this ParallelQuery<TOuter> outer,
634 ParallelQuery<TInner> inner,
635 Func<TOuter, TKey> outerKeySelector,
636 Func<TInner, TKey> innerKeySelector,
637 Func<TOuter, IEnumerable<TInner>, TResult> resultSelector)
639 return outer.GroupJoin (inner, outerKeySelector, innerKeySelector, resultSelector, EqualityComparer<TKey>.Default);
642 public static ParallelQuery<TResult> GroupJoin<TOuter, TInner, TKey, TResult> (this ParallelQuery<TOuter> outer,
643 ParallelQuery<TInner> inner,
644 Func<TOuter, TKey> outerKeySelector,
645 Func<TInner, TKey> innerKeySelector,
646 Func<TOuter, IEnumerable<TInner>, TResult> resultSelector,
647 IEqualityComparer<TKey> comparer)
649 throw new NotImplementedException ();
652 [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
653 + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
654 + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
655 public static ParallelQuery<TResult> GroupJoin<TOuter, TInner, TKey, TResult> (this ParallelQuery<TOuter> outer,
656 IEnumerable<TInner> inner,
657 Func<TOuter, TKey> outerKeySelector,
658 Func<TInner, TKey> innerKeySelector,
659 Func<TOuter, IEnumerable<TInner>, TResult> resultSelector)
661 throw new NotSupportedException ();
// Obsolete overload kept only so that passing a plain IEnumerable as the inner
// source is flagged at compile time; it is never meant to run. It throws
// NotSupportedException like the other obsolete IEnumerable overloads in this class.
[ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
                   + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
                   + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
public static ParallelQuery<TResult> GroupJoin<TOuter, TInner, TKey, TResult> (this ParallelQuery<TOuter> outer,
                                                                               IEnumerable<TInner> inner,
                                                                               Func<TOuter, TKey> outerKeySelector,
                                                                               Func<TInner, TKey> innerKeySelector,
                                                                               Func<TOuter, IEnumerable<TInner>, TResult> resultSelector,
                                                                               IEqualityComparer<TKey> comparer)
{
	throw new NotSupportedException ();
}
679 public static TSource ElementAt<TSource> ( this ParallelQuery<TSource> source, int index)
682 throw new ArgumentNullException ("source");
684 throw new ArgumentOutOfRangeException ("index");
687 return source.First ();
688 } catch (InvalidOperationException) {
689 throw new ArgumentOutOfRangeException ("index");
693 TSource result = default (TSource);
695 ParallelQuery<TSource> innerQuery = source.Where ((e, i) => i == index);
698 result = innerQuery.First ();
699 } catch (InvalidOperationException) {
700 throw new ArgumentOutOfRangeException ("index");
/// <summary>
/// Returns the element at <paramref name="index"/>, or the default value of
/// <typeparamref name="TSource"/> when <c>ElementAt</c> reports the index as out of range.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static TSource ElementAtOrDefault<TSource> (this ParallelQuery<TSource> source, int index)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}

	TSource found;
	try {
		found = source.ElementAt (index);
	} catch (ArgumentOutOfRangeException) {
		// Out-of-range lookups fall back to the default value.
		found = default (TSource);
	}

	return found;
}
720 public static ParallelQuery<TSource> Intersect<TSource> (this ParallelQuery<TSource> first,
721 ParallelQuery<TSource> second)
723 return Intersect<TSource> (first, second, EqualityComparer<TSource>.Default);
726 public static ParallelQuery<TSource> Intersect<TSource> (this ParallelQuery<TSource> first,
727 ParallelQuery<TSource> second,
728 IEqualityComparer<TSource> comparer)
731 throw new ArgumentNullException ("first");
733 throw new ArgumentNullException ("second");
734 if (comparer == null)
735 throw new ArgumentNullException ("comparer");
737 return new ParallelQuery<TSource> (new QuerySetNode<TSource> (SetInclusionDefaults.Intersect, comparer, first.Node, second.Node));
740 [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
741 + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
742 + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
743 public static ParallelQuery<TSource> Intersect<TSource> (this ParallelQuery<TSource> first, IEnumerable<TSource> second)
745 throw new NotSupportedException ();
748 [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
749 + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
750 + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
751 public static ParallelQuery<TSource> Intersect<TSource> (this ParallelQuery<TSource> first,
752 IEnumerable<TSource> second,
753 IEqualityComparer<TSource> comparer)
755 throw new NotSupportedException ();
760 public static ParallelQuery<TResult> Join<TOuter, TInner, TKey, TResult> ( this ParallelQuery<TOuter> outer,
761 ParallelQuery<TInner> inner,
762 Func<TOuter, TKey> outerKeySelector,
763 Func<TInner, TKey> innerKeySelector,
764 Func<TOuter, TInner, TResult> resultSelector)
766 return outer.Join (inner, outerKeySelector, innerKeySelector, resultSelector, EqualityComparer<TKey>.Default);
769 public static ParallelQuery<TResult> Join<TOuter, TInner, TKey, TResult> ( this ParallelQuery<TOuter> outer,
770 ParallelQuery<TInner> inner,
771 Func<TOuter, TKey> outerKeySelector,
772 Func<TInner, TKey> innerKeySelector,
773 Func<TOuter, TInner, TResult> resultSelector,
774 IEqualityComparer<TKey> comparer)
776 throw new NotImplementedException ();
779 [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
780 + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
781 + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
782 public static ParallelQuery<TResult> Join<TOuter, TInner, TKey, TResult> ( this ParallelQuery<TOuter> outer,
783 IEnumerable<TInner> inner,
784 Func<TOuter, TKey> outerKeySelector,
785 Func<TInner, TKey> innerKeySelector,
786 Func<TOuter, TInner, TResult> resultSelector)
788 throw new NotSupportedException ();
791 [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
792 + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
793 + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
794 public static ParallelQuery<TResult> Join<TOuter, TInner, TKey, TResult> (this ParallelQuery<TOuter> outer,
795 IEnumerable<TInner> inner,
796 Func<TOuter, TKey> outerKeySelector,
797 Func<TInner, TKey> innerKeySelector,
798 Func<TOuter, TInner, TResult> resultSelector,
799 IEqualityComparer<TKey> comparer)
801 throw new NotSupportedException ();
806 public static ParallelQuery<TSource> Except<TSource> ( this ParallelQuery<TSource> first,
807 ParallelQuery<TSource> second)
809 return Except<TSource> (first, second, EqualityComparer<TSource>.Default);
812 public static ParallelQuery<TSource> Except<TSource> ( this ParallelQuery<TSource> first,
813 ParallelQuery<TSource> second,
814 IEqualityComparer<TSource> comparer)
817 throw new ArgumentNullException ("first");
819 throw new ArgumentNullException ("second");
820 if (comparer == null)
821 throw new ArgumentNullException ("comparer");
823 return new ParallelQuery<TSource> (new QuerySetNode<TSource> (SetInclusionDefaults.Except,
824 comparer, first.Node, second.Node));
827 [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
828 + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
829 + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
830 public static ParallelQuery<TSource> Except<TSource> (this ParallelQuery<TSource> first,
831 IEnumerable<TSource> second)
833 throw new NotSupportedException ();
836 [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
837 + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
838 + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
839 public static ParallelQuery<TSource> Except<TSource> ( this ParallelQuery<TSource> first,
840 IEnumerable<TSource> second,
841 IEqualityComparer<TSource> comparer)
843 throw new NotSupportedException ();
848 public static ParallelQuery<TSource> Distinct<TSource> (this ParallelQuery<TSource> source)
850 return Distinct<TSource> (source, EqualityComparer<TSource>.Default);
853 public static ParallelQuery<TSource> Distinct<TSource> (this ParallelQuery<TSource> source, IEqualityComparer<TSource> comparer)
856 throw new ArgumentNullException ("source");
857 if (comparer == null)
858 throw new ArgumentNullException ("comparer");
860 return new ParallelQuery<TSource> (new QuerySetNode<TSource> (SetInclusionDefaults.Distinct, comparer,
866 [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
867 + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
868 + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
869 public static ParallelQuery<TSource> Union<TSource> (this ParallelQuery<TSource> first,
870 IEnumerable<TSource> second)
872 throw new NotSupportedException ();
875 [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
876 + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
877 + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
878 public static ParallelQuery<TSource> Union<TSource>(this ParallelQuery<TSource> first,
879 IEnumerable<TSource> second,
880 IEqualityComparer<TSource> comparer)
882 throw new NotSupportedException ();
885 public static ParallelQuery<TSource> Union<TSource> (this ParallelQuery<TSource> first,
886 ParallelQuery<TSource> second)
888 return first.Union (second, EqualityComparer<TSource>.Default);
891 public static ParallelQuery<TSource> Union<TSource> (this ParallelQuery<TSource> first,
892 ParallelQuery<TSource> second,
893 IEqualityComparer<TSource> comparer)
896 throw new ArgumentNullException ("first");
898 throw new ArgumentNullException ("second");
899 if (comparer == null)
900 throw new ArgumentNullException ("comparer");
902 return new ParallelQuery<TSource> (new QuerySetNode<TSource> (SetInclusionDefaults.Union, comparer, first.Node, second.Node));
907 // TODO : introduce some early break up here, use ImplementerToken
/// <summary>
/// Keeps the elements whose position is below <paramref name="count"/>, expressed
/// as an index-aware <c>Where</c> filter.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static ParallelQuery<TSource> Take<TSource> (this ParallelQuery<TSource> source, int count)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}

	Func<TSource, int, bool> withinCount = (item, position) => position < count;
	return source.Where (withinCount);
}
/// <summary>
/// Returns the leading elements of the query for as long as
/// <paramref name="predicate"/> holds and stops at the first element that fails it.
/// </summary>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="predicate"/> is null.
/// </exception>
public static ParallelQuery<TSource> TakeWhile<TSource> (this ParallelQuery<TSource> source,
                                                         Func<TSource, bool> predicate)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (predicate == null)
		throw new ArgumentNullException ("predicate");

	// A plain Where filter would also keep matching elements that come after the
	// first failure. Cutting at the first failure needs a single ordered pass, so
	// the prefix is taken on the sequential view and then handed back to the
	// parallel pipeline, marked ordered so the prefix keeps its sequence.
	// NOTE(review): whether this stays deferred depends on how the sequential view
	// is produced by the query node - confirm.
	return Enumerable.TakeWhile (source.AsSequential (), predicate).AsParallel ().AsOrdered ();
}
/// <summary>
/// Index-aware variant: returns the leading elements of the query for as long as
/// <paramref name="predicate"/> holds for the element and its position, and stops
/// at the first element that fails it.
/// </summary>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="predicate"/> is null.
/// </exception>
public static ParallelQuery<TSource> TakeWhile<TSource> (this ParallelQuery<TSource> source,
                                                         Func<TSource, int, bool> predicate)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (predicate == null)
		throw new ArgumentNullException ("predicate");

	// A plain Where filter would also keep matching elements that come after the
	// first failure. Cutting at the first failure needs a single ordered pass, so
	// the prefix is taken on the sequential view and then handed back to the
	// parallel pipeline, marked ordered so the prefix keeps its sequence.
	// NOTE(review): whether this stays deferred depends on how the sequential view
	// is produced by the query node - confirm.
	return Enumerable.TakeWhile (source.AsSequential (), predicate).AsParallel ().AsOrdered ();
}
/// <summary>
/// Drops the elements whose position is below <paramref name="count"/>, expressed
/// as an index-aware <c>Where</c> filter.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static ParallelQuery<TSource> Skip<TSource> (this ParallelQuery<TSource> source, int count)
{
	if (source == null) {
		throw new ArgumentNullException ("source");
	}

	Func<TSource, int, bool> pastCount = (item, position) => position >= count;
	return source.Where (pastCount);
}
/// <summary>
/// Skips the leading elements of the query for as long as
/// <paramref name="predicate"/> holds and returns everything from the first
/// failing element onwards.
/// </summary>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="predicate"/> is null.
/// </exception>
public static ParallelQuery<TSource> SkipWhile<TSource> (this ParallelQuery<TSource> source,
                                                         Func<TSource, bool> predicate)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (predicate == null)
		throw new ArgumentNullException ("predicate");

	// A negated Where filter would also drop matching elements that come after
	// the first failure. Skipping only the leading run needs a single ordered
	// pass, so it is done on the sequential view and the remainder is handed back
	// to the parallel pipeline, marked ordered so it keeps its sequence.
	// NOTE(review): whether this stays deferred depends on how the sequential view
	// is produced by the query node - confirm.
	return Enumerable.SkipWhile (source.AsSequential (), predicate).AsParallel ().AsOrdered ();
}
959 public static ParallelQuery<TSource> SkipWhile<TSource> (this ParallelQuery<TSource> source,
960 Func<TSource, int, bool> predicate)
963 throw new ArgumentNullException ("source");
964 if (predicate == null)
965 throw new ArgumentNullException ("predicate");
967 return source.Where ((e, i) => !predicate (e, i));
// Helper called by Single (with no extra argument) and by SingleOrDefault
// (with default (TSource) as the single init value).
// Enumerates source and throws InvalidOperationException ("more than one
// element") from inside the loop; when no element was seen and init is not
// empty a fallback branch is taken, otherwise InvalidOperationException
// ("sequence is empty") is thrown.
// NOTE(review): the loop body and the fallback branch body are not visible
// here; presumably the fallback returns init[0] - confirm.
972 static TSource SingleInternal<TSource> (this ParallelQuery<TSource> source, params TSource[] init)
974 TSource result = default(TSource);
975 bool hasValue = false;
977 foreach (TSource element in source) {
979 throw new InvalidOperationException ("The input sequence contains more than one element.");
985 if (!hasValue && init.Length != 0) {
991 throw new InvalidOperationException ("The input sequence is empty.");
// Returns the only element of the query; the empty and multi-element cases
// are handled by SingleInternal.
// Throws ArgumentNullException when source is null.
public static TSource Single<TSource> (this ParallelQuery<TSource> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	return source.SingleInternal<TSource> ();
}

// Returns the only element of the query that satisfies predicate.
// Throws ArgumentNullException when source or predicate is null.
public static TSource Single<TSource> (this ParallelQuery<TSource> source,
                                       Func<TSource, bool> predicate)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (predicate == null)
		throw new ArgumentNullException ("predicate");

	ParallelQuery<TSource> matches = source.Where (predicate);

	return matches.Single ();
}
// Like Single, but hands default (TSource) to SingleInternal as the value to
// fall back on. Throws ArgumentNullException when source is null.
public static TSource SingleOrDefault<TSource> (this ParallelQuery<TSource> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	TSource fallback = default (TSource);

	return source.SingleInternal<TSource> (fallback);
}

// Applies predicate as a filter, then behaves like the overload above.
// Throws ArgumentNullException when source or predicate is null.
public static TSource SingleOrDefault<TSource> (this ParallelQuery<TSource> source, Func<TSource, bool> predicate)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (predicate == null)
		throw new ArgumentNullException ("predicate");

	ParallelQuery<TSource> matches = source.Where (predicate);

	return matches.SingleOrDefault ();
}
// Counts the elements of the query: each accumulator starts at 0, adds one
// per element, and the partial counts are summed.
// Throws ArgumentNullException when source is null.
public static int Count<TSource> (this ParallelQuery<TSource> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	Func<int> zero = () => 0;
	Func<int, TSource, int> increment = (total, element) => total + 1;
	Func<int, int, int> merge = (left, right) => left + right;
	Func<int, int> identity = (total) => total;

	return source.Aggregate<TSource, int, int> (zero, increment, merge, identity);
}

// Counts the elements that satisfy predicate.
// Throws ArgumentNullException when source or predicate is null.
public static int Count<TSource> (this ParallelQuery<TSource> source, Func<TSource, bool> predicate)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (predicate == null)
		throw new ArgumentNullException ("predicate");

	ParallelQuery<TSource> matches = source.Where (predicate);

	return matches.Count ();
}
// 64-bit variant of Count: each accumulator starts at 0, adds one per
// element, and the partial counts are summed.
// Throws ArgumentNullException when source is null.
public static long LongCount<TSource> (this ParallelQuery<TSource> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	Func<long> zero = () => 0;
	Func<long, TSource, long> increment = (total, element) => total + 1;
	Func<long, long, long> merge = (left, right) => left + right;
	Func<long, long> identity = (total) => total;

	return source.Aggregate<TSource, long, long> (zero, increment, merge, identity);
}

// Counts (as a long) the elements that satisfy predicate.
// Throws ArgumentNullException when source or predicate is null.
public static long LongCount<TSource> (this ParallelQuery<TSource> source, Func<TSource, bool> predicate)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (predicate == null)
		throw new ArgumentNullException ("predicate");

	ParallelQuery<TSource> matches = source.Where (predicate);

	return matches.LongCount ();
}
// Arithmetic mean of a query of Int32 values.
// The accumulator is a two-slot array: [0] = running sum, [1] = element count.
// The sum is kept in a long so that adding many large Int32 values does not
// silently wrap around (the previous int[] accumulator did).
// An empty query yields 0 / 0.0, i.e. Double.NaN.
// Throws ArgumentNullException when source is null.
public static double Average (this ParallelQuery<int> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	return source.Aggregate (() => new long[2],
	                         (acc, e) => { acc[0] += e; acc[1]++; return acc; },
	                         (acc1, acc2) => { acc1[0] += acc2[0]; acc1[1] += acc2[1]; return acc1; },
	                         (acc) => acc[0] / ((double)acc[1]));
}
// Arithmetic mean of a query of Int64 values.
// totals[0] is the running sum, totals[1] the number of elements.
// Throws ArgumentNullException when source is null.
public static double Average (this ParallelQuery<long> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	Func<long[]> newTotals = () => new long[2];
	Func<long[], long, long[]> accumulate = (totals, value) => {
		totals[0] += value;
		totals[1]++;
		return totals;
	};
	Func<long[], long[], long[]> merge = (left, right) => {
		left[0] += right[0];
		left[1] += right[1];
		return left;
	};
	Func<long[], double> divide = (totals) => totals[0] / ((double)totals[1]);

	return source.Aggregate (newTotals, accumulate, merge, divide);
}

// Arithmetic mean of a query of Decimal values.
// totals[0] is the running sum, totals[1] the number of elements.
// Throws ArgumentNullException when source is null.
public static decimal Average (this ParallelQuery<decimal> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	Func<decimal[]> newTotals = () => new decimal[2];
	Func<decimal[], decimal, decimal[]> accumulate = (totals, value) => {
		totals[0] += value;
		totals[1]++;
		return totals;
	};
	Func<decimal[], decimal[], decimal[]> merge = (left, right) => {
		left[0] += right[0];
		left[1] += right[1];
		return left;
	};
	Func<decimal[], decimal> divide = (totals) => totals[0] / totals[1];

	return source.Aggregate (newTotals, accumulate, merge, divide);
}

// Arithmetic mean of a query of Double values.
// totals[0] is the running sum, totals[1] the number of elements.
// Throws ArgumentNullException when source is null.
public static double Average (this ParallelQuery<double> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	Func<double[]> newTotals = () => new double[2];
	Func<double[], double, double[]> accumulate = (totals, value) => {
		totals[0] += value;
		totals[1]++;
		return totals;
	};
	Func<double[], double[], double[]> merge = (left, right) => {
		left[0] += right[0];
		left[1] += right[1];
		return left;
	};
	Func<double[], double> divide = (totals) => totals[0] / ((double)totals[1]);

	return source.Aggregate (newTotals, accumulate, merge, divide);
}

// Arithmetic mean of a query of Single values.
// totals[0] is the running sum, totals[1] the number of elements.
// Throws ArgumentNullException when source is null.
public static float Average (this ParallelQuery<float> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	Func<float[]> newTotals = () => new float[2];
	Func<float[], float, float[]> accumulate = (totals, value) => {
		totals[0] += value;
		totals[1]++;
		return totals;
	};
	Func<float[], float[], float[]> merge = (left, right) => {
		left[0] += right[0];
		left[1] += right[1];
		return left;
	};
	Func<float[], float> divide = (totals) => totals[0] / totals[1];

	return source.Aggregate (newTotals, accumulate, merge, divide);
}
1135 #region More Average
// Nullable Average overloads.
// Null elements are ignored: they contribute neither to the sum nor to the
// count (previously they were turned into 0 and counted, which skewed the
// mean). When the query holds no non-null value the result is null.
// In every overload acc[0] is the running sum and acc[1] the number of
// non-null values seen. Each overload throws ArgumentNullException when
// source is null.

// Mean of the non-null Int32 values; the sum is kept in a long.
public static double? Average (this ParallelQuery<int?> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	return source.Aggregate (() => new long[2],
	                         (acc, e) => { if (e.HasValue) { acc[0] += e.Value; acc[1]++; } return acc; },
	                         (acc1, acc2) => { acc1[0] += acc2[0]; acc1[1] += acc2[1]; return acc1; },
	                         (acc) => acc[1] == 0 ? (double?) null : acc[0] / ((double)acc[1]));
}

// Mean of the non-null Int64 values.
public static double? Average (this ParallelQuery<long?> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	return source.Aggregate (() => new long[2],
	                         (acc, e) => { if (e.HasValue) { acc[0] += e.Value; acc[1]++; } return acc; },
	                         (acc1, acc2) => { acc1[0] += acc2[0]; acc1[1] += acc2[1]; return acc1; },
	                         (acc) => acc[1] == 0 ? (double?) null : acc[0] / ((double)acc[1]));
}

// Mean of the non-null Decimal values.
public static decimal? Average (this ParallelQuery<decimal?> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	return source.Aggregate (() => new decimal[2],
	                         (acc, e) => { if (e.HasValue) { acc[0] += e.Value; acc[1]++; } return acc; },
	                         (acc1, acc2) => { acc1[0] += acc2[0]; acc1[1] += acc2[1]; return acc1; },
	                         (acc) => acc[1] == 0 ? (decimal?) null : acc[0] / acc[1]);
}

// Mean of the non-null Double values.
public static double? Average (this ParallelQuery<double?> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	return source.Aggregate (() => new double[2],
	                         (acc, e) => { if (e.HasValue) { acc[0] += e.Value; acc[1]++; } return acc; },
	                         (acc1, acc2) => { acc1[0] += acc2[0]; acc1[1] += acc2[1]; return acc1; },
	                         (acc) => acc[1] == 0 ? (double?) null : acc[0] / acc[1]);
}

// Mean of the non-null Single values.
public static float? Average (this ParallelQuery<float?> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	return source.Aggregate (() => new float[2],
	                         (acc, e) => { if (e.HasValue) { acc[0] += e.Value; acc[1]++; } return acc; },
	                         (acc1, acc2) => { acc1[0] += acc2[0]; acc1[1] += acc2[1]; return acc1; },
	                         (acc) => acc[1] == 0 ? (float?) null : acc[0] / acc[1]);
}
// Selector-based Average overloads (non-nullable results): project every
// element through func, then average the projected query.
// Each overload throws ArgumentNullException when source or func is null.

public static double Average<TSource> (this ParallelQuery<TSource> source, Func<TSource, int> func)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (func == null)
		throw new ArgumentNullException ("func");

	ParallelQuery<int> projected = source.Select (func);

	return projected.Average ();
}

public static double Average<TSource> (this ParallelQuery<TSource> source, Func<TSource, long> func)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (func == null)
		throw new ArgumentNullException ("func");

	ParallelQuery<long> projected = source.Select (func);

	return projected.Average ();
}

public static float Average<TSource> (this ParallelQuery<TSource> source, Func<TSource, float> func)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (func == null)
		throw new ArgumentNullException ("func");

	ParallelQuery<float> projected = source.Select (func);

	return projected.Average ();
}

public static double Average<TSource> (this ParallelQuery<TSource> source, Func<TSource, double> func)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (func == null)
		throw new ArgumentNullException ("func");

	ParallelQuery<double> projected = source.Select (func);

	return projected.Average ();
}

public static decimal Average<TSource> (this ParallelQuery<TSource> source, Func<TSource, decimal> func)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (func == null)
		throw new ArgumentNullException ("func");

	ParallelQuery<decimal> projected = source.Select (func);

	return projected.Average ();
}
// Selector-based Average overloads (nullable results): project every
// element through func, then hand the projected query to the matching
// nullable Average overload.
// Each overload throws ArgumentNullException when source or func is null.

public static double? Average<TSource> (this ParallelQuery<TSource> source, Func<TSource, int?> func)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (func == null)
		throw new ArgumentNullException ("func");

	ParallelQuery<int?> projected = source.Select (func);

	return projected.Average ();
}

public static double? Average<TSource> (this ParallelQuery<TSource> source, Func<TSource, long?> func)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (func == null)
		throw new ArgumentNullException ("func");

	ParallelQuery<long?> projected = source.Select (func);

	return projected.Average ();
}

public static float? Average<TSource> (this ParallelQuery<TSource> source, Func<TSource, float?> func)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (func == null)
		throw new ArgumentNullException ("func");

	ParallelQuery<float?> projected = source.Select (func);

	return projected.Average ();
}

public static double? Average<TSource> (this ParallelQuery<TSource> source, Func<TSource, double?> func)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (func == null)
		throw new ArgumentNullException ("func");

	ParallelQuery<double?> projected = source.Select (func);

	return projected.Average ();
}

public static decimal? Average<TSource> (this ParallelQuery<TSource> source, Func<TSource, decimal?> func)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (func == null)
		throw new ArgumentNullException ("func");

	ParallelQuery<decimal?> projected = source.Select (func);

	return projected.Average ();
}
// Sum overloads for the primitive numeric types. Every overload folds the
// query starting from zero; the same addition is used both to fold an
// element into a partial sum and to merge two partial sums.
// Each overload throws ArgumentNullException when source is null.

public static int Sum (this ParallelQuery<int> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	Func<int, int, int> add = (left, right) => left + right;
	Func<int, int> identity = (total) => total;

	return source.Aggregate (0, add, add, identity);
}

public static long Sum (this ParallelQuery<long> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	Func<long, long, long> add = (left, right) => left + right;
	Func<long, long> identity = (total) => total;

	return source.Aggregate (0L, add, add, identity);
}

public static float Sum (this ParallelQuery<float> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	Func<float, float, float> add = (left, right) => left + right;
	Func<float, float> identity = (total) => total;

	return source.Aggregate (0.0f, add, add, identity);
}

public static double Sum (this ParallelQuery<double> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	Func<double, double, double> add = (left, right) => left + right;
	Func<double, double> identity = (total) => total;

	return source.Aggregate (0.0, add, add, identity);
}

public static decimal Sum (this ParallelQuery<decimal> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	Func<decimal, decimal, decimal> add = (left, right) => left + right;
	Func<decimal, decimal> identity = (total) => total;

	return source.Aggregate (0m, add, add, identity);
}
// Sum of a query of nullable Int32 values; null elements count as 0.
// Throws ArgumentNullException when source is null (this guard was missing
// here although every sibling Sum overload has it).
public static int? Sum (this ParallelQuery<int?> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	return source.Select ((e) => e.HasValue ? e.Value : 0).Sum ();
}
// Sum overloads for nullable Int64 / Single / Double / Decimal queries.
// Null elements are replaced by zero before the non-nullable Sum runs.
// Each overload throws ArgumentNullException when source is null.

public static long? Sum (this ParallelQuery<long?> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	ParallelQuery<long> values = source.Select ((item) => item.GetValueOrDefault ());

	return values.Sum ();
}

public static float? Sum (this ParallelQuery<float?> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	ParallelQuery<float> values = source.Select ((item) => item.GetValueOrDefault ());

	return values.Sum ();
}

public static double? Sum (this ParallelQuery<double?> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	ParallelQuery<double> values = source.Select ((item) => item.GetValueOrDefault ());

	return values.Sum ();
}

public static decimal? Sum (this ParallelQuery<decimal?> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	ParallelQuery<decimal> values = source.Select ((item) => item.GetValueOrDefault ());

	return values.Sum ();
}
// Selector-based Sum overloads (non-nullable results): project every element
// through func, then sum the projected query.
// Each overload throws ArgumentNullException when source or func is null.

public static int Sum<TSource> (this ParallelQuery<TSource> source, Func<TSource, int> func)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (func == null)
		throw new ArgumentNullException ("func");

	ParallelQuery<int> projected = source.Select (func);

	return projected.Sum ();
}

public static long Sum<TSource> (this ParallelQuery<TSource> source, Func<TSource, long> func)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (func == null)
		throw new ArgumentNullException ("func");

	ParallelQuery<long> projected = source.Select (func);

	return projected.Sum ();
}

public static decimal Sum<TSource> (this ParallelQuery<TSource> source, Func<TSource, decimal> func)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (func == null)
		throw new ArgumentNullException ("func");

	ParallelQuery<decimal> projected = source.Select (func);

	return projected.Sum ();
}

public static float Sum<TSource> (this ParallelQuery<TSource> source, Func<TSource, float> func)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (func == null)
		throw new ArgumentNullException ("func");

	ParallelQuery<float> projected = source.Select (func);

	return projected.Sum ();
}

public static double Sum<TSource> (this ParallelQuery<TSource> source, Func<TSource, double> func)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (func == null)
		throw new ArgumentNullException ("func");

	ParallelQuery<double> projected = source.Select (func);

	return projected.Sum ();
}
// Selector-based Sum overloads (nullable results): project every element
// through func, then hand the projected query to the matching nullable Sum.
// Each overload throws ArgumentNullException when source or func is null.

public static int? Sum<TSource> (this ParallelQuery<TSource> source, Func<TSource, int?> func)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (func == null)
		throw new ArgumentNullException ("func");

	ParallelQuery<int?> projected = source.Select (func);

	return projected.Sum ();
}

public static long? Sum<TSource> (this ParallelQuery<TSource> source, Func<TSource, long?> func)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (func == null)
		throw new ArgumentNullException ("func");

	ParallelQuery<long?> projected = source.Select (func);

	return projected.Sum ();
}

public static decimal? Sum<TSource> (this ParallelQuery<TSource> source, Func<TSource, decimal?> func)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (func == null)
		throw new ArgumentNullException ("func");

	ParallelQuery<decimal?> projected = source.Select (func);

	return projected.Sum ();
}

public static float? Sum<TSource> (this ParallelQuery<TSource> source, Func<TSource, float?> func)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (func == null)
		throw new ArgumentNullException ("func");

	ParallelQuery<float?> projected = source.Select (func);

	return projected.Sum ();
}

public static double? Sum<TSource> (this ParallelQuery<TSource> source, Func<TSource, double?> func)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (func == null)
		throw new ArgumentNullException ("func");

	ParallelQuery<double?> projected = source.Select (func);

	return projected.Sum ();
}
// Shared helper behind the Min / Max overloads.
// Aggregates source starting every accumulator from seed; bestSelector (a, b)
// answers "should a be kept over b" and is used both to fold an element into
// an accumulator and to merge two accumulators.
// Because seed itself takes part in the comparisons, callers must pass a
// value that can never beat a real element (e.g. int.MaxValue for Min).
// Throws ArgumentNullException when source is null.
// NOTE(review): the declaration of best, the last Aggregate argument and the
// return statement are not visible here.
1457 static T BestOrder<T> (ParallelQuery<T> source, Func<T, T, bool> bestSelector, T seed)
1460 throw new ArgumentNullException ("source");
1464 best = source.Aggregate (() => seed,
1465 (first, second) => (bestSelector(first, second)) ? first : second,
1466 (first, second) => (bestSelector(first, second)) ? first : second,
// Min overloads for the primitive numeric types. Each one delegates to
// BestOrder with a "strictly smaller" test and the type's MaxValue as seed;
// the null check on source is performed by BestOrder.

public static int Min (this ParallelQuery<int> source)
{
	Func<int, int, bool> isSmaller = (candidate, other) => candidate < other;

	return BestOrder (source, isSmaller, int.MaxValue);
}

public static long Min (this ParallelQuery<long> source)
{
	Func<long, long, bool> isSmaller = (candidate, other) => candidate < other;

	return BestOrder (source, isSmaller, long.MaxValue);
}

public static float Min (this ParallelQuery<float> source)
{
	Func<float, float, bool> isSmaller = (candidate, other) => candidate < other;

	return BestOrder (source, isSmaller, float.MaxValue);
}

public static double Min (this ParallelQuery<double> source)
{
	Func<double, double, bool> isSmaller = (candidate, other) => candidate < other;

	return BestOrder (source, isSmaller, double.MaxValue);
}

public static decimal Min (this ParallelQuery<decimal> source)
{
	Func<decimal, decimal, bool> isSmaller = (candidate, other) => candidate < other;

	return BestOrder (source, isSmaller, decimal.MaxValue);
}
// Smallest element of the query according to Comparer<TSource>.Default.
// Null elements are skipped. An empty query (or one holding only nulls)
// yields default (TSource), as before.
// The accumulator carries a "has value" flag in Key so that the placeholder
// default (TSource) is never compared with real data; previously the
// placeholder took part in the comparison, which made null win for every
// reference type and let default beat the data for value types.
// Throws ArgumentNullException when source is null.
public static TSource Min<TSource> (this ParallelQuery<TSource> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	IComparer<TSource> comparer = Comparer<TSource>.Default;

	// Merges two accumulators; an accumulator without a value never wins
	Func<KeyValuePair<bool, TSource>, KeyValuePair<bool, TSource>, KeyValuePair<bool, TSource>> merge = (left, right) => {
		if (!right.Key)
			return left;
		if (!left.Key)
			return right;

		return comparer.Compare (left.Value, right.Value) < 0 ? left : right;
	};

	return source.Aggregate (() => new KeyValuePair<bool, TSource> (false, default (TSource)),
	                         (best, e) => e == null ? best : merge (best, new KeyValuePair<bool, TSource> (true, e)),
	                         merge,
	                         (best) => best.Value);
}
// Projects every element through func and returns the smallest projected
// value, as decided by the generic Min overload.
// Throws ArgumentNullException when source or func is null.
public static TResult Min<TSource, TResult> (this ParallelQuery<TSource> source, Func<TSource, TResult> func)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (func == null)
		throw new ArgumentNullException ("func");

	ParallelQuery<TResult> projected = source.Select (func);

	return projected.Min ();
}
// Nullable Min overloads.
// Null elements are ignored; when the query holds no non-null value the
// result is null (previously nulls were mapped to MaxValue, so an all-null
// or empty query returned that sentinel instead of null).
// In each overload the same "keep the smaller, null never wins" function is
// used to fold elements and to merge partial results.
// Each overload throws ArgumentNullException when source is null.

public static int? Min (this ParallelQuery<int?> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	Func<int?, int?, int?> smaller = (best, e) =>
		(e.HasValue && (!best.HasValue || e.Value < best.Value)) ? e : best;

	return source.Aggregate (() => (int?) null, smaller, smaller, (best) => best);
}

public static long? Min (this ParallelQuery<long?> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	Func<long?, long?, long?> smaller = (best, e) =>
		(e.HasValue && (!best.HasValue || e.Value < best.Value)) ? e : best;

	return source.Aggregate (() => (long?) null, smaller, smaller, (best) => best);
}

public static float? Min (this ParallelQuery<float?> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	Func<float?, float?, float?> smaller = (best, e) =>
		(e.HasValue && (!best.HasValue || e.Value < best.Value)) ? e : best;

	return source.Aggregate (() => (float?) null, smaller, smaller, (best) => best);
}

public static double? Min (this ParallelQuery<double?> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	Func<double?, double?, double?> smaller = (best, e) =>
		(e.HasValue && (!best.HasValue || e.Value < best.Value)) ? e : best;

	return source.Aggregate (() => (double?) null, smaller, smaller, (best) => best);
}

public static decimal? Min (this ParallelQuery<decimal?> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	Func<decimal?, decimal?, decimal?> smaller = (best, e) =>
		(e.HasValue && (!best.HasValue || e.Value < best.Value)) ? e : best;

	return source.Aggregate (() => (decimal?) null, smaller, smaller, (best) => best);
}
// Selector-based Min overloads (non-nullable results): project every element
// through func, then take the minimum of the projected query.
// Each overload throws ArgumentNullException when source or func is null.

public static int Min<TSource> (this ParallelQuery<TSource> source, Func<TSource, int> func)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (func == null)
		throw new ArgumentNullException ("func");

	ParallelQuery<int> projected = source.Select (func);

	return projected.Min ();
}

public static long Min<TSource> (this ParallelQuery<TSource> source, Func<TSource, long> func)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (func == null)
		throw new ArgumentNullException ("func");

	ParallelQuery<long> projected = source.Select (func);

	return projected.Min ();
}

public static float Min<TSource> (this ParallelQuery<TSource> source, Func<TSource, float> func)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (func == null)
		throw new ArgumentNullException ("func");

	ParallelQuery<float> projected = source.Select (func);

	return projected.Min ();
}

public static double Min<TSource> (this ParallelQuery<TSource> source, Func<TSource, double> func)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (func == null)
		throw new ArgumentNullException ("func");

	ParallelQuery<double> projected = source.Select (func);

	return projected.Min ();
}

public static decimal Min<TSource> (this ParallelQuery<TSource> source, Func<TSource, decimal> func)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (func == null)
		throw new ArgumentNullException ("func");

	ParallelQuery<decimal> projected = source.Select (func);

	return projected.Min ();
}
// Selector-based Min overloads (nullable results): project every element
// through func, then hand the projected query to the matching nullable Min.
// Each overload throws ArgumentNullException when source or func is null.

public static int? Min<TSource> (this ParallelQuery<TSource> source, Func<TSource, int?> func)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (func == null)
		throw new ArgumentNullException ("func");

	ParallelQuery<int?> projected = source.Select (func);

	return projected.Min ();
}

public static long? Min<TSource> (this ParallelQuery<TSource> source, Func<TSource, long?> func)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (func == null)
		throw new ArgumentNullException ("func");

	ParallelQuery<long?> projected = source.Select (func);

	return projected.Min ();
}

public static float? Min<TSource> (this ParallelQuery<TSource> source, Func<TSource, float?> func)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (func == null)
		throw new ArgumentNullException ("func");

	ParallelQuery<float?> projected = source.Select (func);

	return projected.Min ();
}

public static double? Min<TSource> (this ParallelQuery<TSource> source, Func<TSource, double?> func)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (func == null)
		throw new ArgumentNullException ("func");

	ParallelQuery<double?> projected = source.Select (func);

	return projected.Min ();
}

public static decimal? Min<TSource> (this ParallelQuery<TSource> source, Func<TSource, decimal?> func)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (func == null)
		throw new ArgumentNullException ("func");

	ParallelQuery<decimal?> projected = source.Select (func);

	return projected.Min ();
}
// Max overloads for the primitive numeric types. Each one delegates to
// BestOrder with a "strictly greater" test and the type's MinValue as seed;
// the null check on source is performed by BestOrder.

public static int Max (this ParallelQuery<int> source)
{
	Func<int, int, bool> isGreater = (candidate, other) => candidate > other;

	return BestOrder (source, isGreater, int.MinValue);
}

public static long Max (this ParallelQuery<long> source)
{
	Func<long, long, bool> isGreater = (candidate, other) => candidate > other;

	return BestOrder (source, isGreater, long.MinValue);
}

public static float Max (this ParallelQuery<float> source)
{
	Func<float, float, bool> isGreater = (candidate, other) => candidate > other;

	return BestOrder (source, isGreater, float.MinValue);
}

public static double Max (this ParallelQuery<double> source)
{
	Func<double, double, bool> isGreater = (candidate, other) => candidate > other;

	return BestOrder (source, isGreater, double.MinValue);
}

public static decimal Max (this ParallelQuery<decimal> source)
{
	Func<decimal, decimal, bool> isGreater = (candidate, other) => candidate > other;

	return BestOrder (source, isGreater, decimal.MinValue);
}
// Largest element of the query according to Comparer<TSource>.Default.
// Null elements are skipped. An empty query (or one holding only nulls)
// yields default (TSource), as before.
// The accumulator carries a "has value" flag in Key so that the placeholder
// default (TSource) is never compared with real data; previously the
// placeholder took part in the comparison, so for value types default
// (e.g. 0) was returned whenever every element was smaller than it.
// Throws ArgumentNullException when source is null.
public static TSource Max<TSource> (this ParallelQuery<TSource> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	IComparer<TSource> comparer = Comparer<TSource>.Default;

	// Merges two accumulators; an accumulator without a value never wins
	Func<KeyValuePair<bool, TSource>, KeyValuePair<bool, TSource>, KeyValuePair<bool, TSource>> merge = (left, right) => {
		if (!right.Key)
			return left;
		if (!left.Key)
			return right;

		return comparer.Compare (left.Value, right.Value) > 0 ? left : right;
	};

	return source.Aggregate (() => new KeyValuePair<bool, TSource> (false, default (TSource)),
	                         (best, e) => e == null ? best : merge (best, new KeyValuePair<bool, TSource> (true, e)),
	                         merge,
	                         (best) => best.Value);
}
// Projects every element through func and returns the largest projected
// value, as decided by the generic Max overload.
// Throws ArgumentNullException when source or func is null.
public static TResult Max<TSource, TResult> (this ParallelQuery<TSource> source, Func<TSource, TResult> func)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (func == null)
		throw new ArgumentNullException ("func");

	ParallelQuery<TResult> projected = source.Select (func);

	return projected.Max ();
}
// Nullable Max overloads.
// Null elements are ignored; when the query holds no non-null value the
// result is null (previously nulls were mapped to MinValue, so an all-null
// or empty query returned that sentinel instead of null).
// In each overload the same "keep the larger, null never wins" function is
// used to fold elements and to merge partial results.
// Each overload throws ArgumentNullException when source is null.

public static int? Max (this ParallelQuery<int?> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	Func<int?, int?, int?> larger = (best, e) =>
		(e.HasValue && (!best.HasValue || e.Value > best.Value)) ? e : best;

	return source.Aggregate (() => (int?) null, larger, larger, (best) => best);
}

public static long? Max (this ParallelQuery<long?> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	Func<long?, long?, long?> larger = (best, e) =>
		(e.HasValue && (!best.HasValue || e.Value > best.Value)) ? e : best;

	return source.Aggregate (() => (long?) null, larger, larger, (best) => best);
}

public static float? Max (this ParallelQuery<float?> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	Func<float?, float?, float?> larger = (best, e) =>
		(e.HasValue && (!best.HasValue || e.Value > best.Value)) ? e : best;

	return source.Aggregate (() => (float?) null, larger, larger, (best) => best);
}

public static double? Max (this ParallelQuery<double?> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	Func<double?, double?, double?> larger = (best, e) =>
		(e.HasValue && (!best.HasValue || e.Value > best.Value)) ? e : best;

	return source.Aggregate (() => (double?) null, larger, larger, (best) => best);
}

public static decimal? Max (this ParallelQuery<decimal?> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	Func<decimal?, decimal?, decimal?> larger = (best, e) =>
		(e.HasValue && (!best.HasValue || e.Value > best.Value)) ? e : best;

	return source.Aggregate (() => (decimal?) null, larger, larger, (best) => best);
}
// Selector-based Max overloads (non-nullable results): project every element
// through func, then take the maximum of the projected query.
// Each overload throws ArgumentNullException when source or func is null.

public static int Max<TSource> (this ParallelQuery<TSource> source, Func<TSource, int> func)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (func == null)
		throw new ArgumentNullException ("func");

	ParallelQuery<int> projected = source.Select (func);

	return projected.Max ();
}

public static long Max<TSource> (this ParallelQuery<TSource> source, Func<TSource, long> func)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (func == null)
		throw new ArgumentNullException ("func");

	ParallelQuery<long> projected = source.Select (func);

	return projected.Max ();
}

public static float Max<TSource> (this ParallelQuery<TSource> source, Func<TSource, float> func)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (func == null)
		throw new ArgumentNullException ("func");

	ParallelQuery<float> projected = source.Select (func);

	return projected.Max ();
}

public static double Max<TSource> (this ParallelQuery<TSource> source, Func<TSource, double> func)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (func == null)
		throw new ArgumentNullException ("func");

	ParallelQuery<double> projected = source.Select (func);

	return projected.Max ();
}

public static decimal Max<TSource> (this ParallelQuery<TSource> source, Func<TSource, decimal> func)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (func == null)
		throw new ArgumentNullException ("func");

	ParallelQuery<decimal> projected = source.Select (func);

	return projected.Max ();
}
// Selector-based Max overloads (nullable results): project every element
// through func, then hand the projected query to the matching nullable Max.
// Each overload throws ArgumentNullException when source or func is null.

public static int? Max<TSource> (this ParallelQuery<TSource> source, Func<TSource, int?> func)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (func == null)
		throw new ArgumentNullException ("func");

	ParallelQuery<int?> projected = source.Select (func);

	return projected.Max ();
}

public static long? Max<TSource> (this ParallelQuery<TSource> source, Func<TSource, long?> func)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (func == null)
		throw new ArgumentNullException ("func");

	ParallelQuery<long?> projected = source.Select (func);

	return projected.Max ();
}

public static float? Max<TSource> (this ParallelQuery<TSource> source, Func<TSource, float?> func)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (func == null)
		throw new ArgumentNullException ("func");

	ParallelQuery<float?> projected = source.Select (func);

	return projected.Max ();
}

public static double? Max<TSource> (this ParallelQuery<TSource> source, Func<TSource, double?> func)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (func == null)
		throw new ArgumentNullException ("func");

	ParallelQuery<double?> projected = source.Select (func);

	return projected.Max ();
}

public static decimal? Max<TSource> (this ParallelQuery<TSource> source, Func<TSource, decimal?> func)
{
	if (source == null)
		throw new ArgumentNullException ("source");
	if (func == null)
		throw new ArgumentNullException ("func");

	ParallelQuery<decimal?> projected = source.Select (func);

	return projected.Max ();
}
1836 #region Cast / OfType
// Converts every element of an untyped parallel query to TResult with an
// explicit cast applied through Select over source.TypedQuery.
// Throws ArgumentNullException when source is null.
public static ParallelQuery<TResult> Cast<TResult> (this ParallelQuery source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	var typed = source.TypedQuery;

	return typed.Select ((item) => (TResult)item);
}
// Keeps only the elements that are instances of TResult, then casts the
// survivors to TResult.
// Throws ArgumentNullException when source is null.
public static ParallelQuery<TResult> OfType<TResult> (this ParallelQuery source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	var compatible = source.TypedQuery.Where ((item) => item is TResult);

	return compatible.Cast<TResult> ();
}
// Wraps source in a QueryReverseNode and returns a query over that node.
// Throws ArgumentNullException when source is null.
public static ParallelQuery<TSource> Reverse<TSource> (this ParallelQuery<TSource> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	var reversed = new QueryReverseNode<TSource> (source);

	return new ParallelQuery<TSource> (reversed);
}
1864 #region ToArray - ToList - ToDictionary - ToLookup
// Materializes the query into a List<TSource>.
// An OrderedParallelQuery is handed to ToListOrdered. Any other query is
// aggregated: every accumulator is a list created with capacity 50, elements
// are appended to it, and partial lists are merged with AddRange.
// Throws ArgumentNullException when source is null.
// NOTE(review): the final Aggregate argument and the return statement are
// not visible here.
1865 public static List<TSource> ToList<TSource> (this ParallelQuery<TSource> source)
1868 throw new ArgumentNullException ("source");
1870 OrderedParallelQuery<TSource> ordered = null;
1871 if ((ordered = source as OrderedParallelQuery<TSource>) != null)
1872 return ToListOrdered (ordered);
1874 List<TSource> temp = source.Aggregate (() => new List<TSource>(50),
1875 (list, e) => { list.Add (e); return list; },
1876 (list, list2) => { list.AddRange (list2); return list; },
// Helper used by ToList for ordered queries: enumerates source with foreach
// and appends each element to a fresh list, in enumeration order.
// NOTE(review): the return statement is not visible here; presumably the
// method returns result - confirm.
1881 static List<TSource> ToListOrdered<TSource> (this OrderedParallelQuery<TSource> source)
1883 List<TSource> result = new List<TSource> ();
1885 foreach (TSource element in source)
1886 result.Add (element);
// Materializes the query into an array through
// ParallelExecuter.ProcessAndAggregate, with one List<TSource> accumulator
// created per seed call.
// Throws ArgumentNullException when source is null.
// NOTE(review): the declaration of count, the closing of both lambdas, the
// remaining ProcessAndAggregate arguments and the return statement are not
// visible here.
1891 public static TSource[] ToArray<TSource> (this ParallelQuery<TSource> source)
1894 throw new ArgumentNullException ("source");
1896 TSource[] result = null;
// Per-element step: append the element to the accumulator list
1898 Func<List<TSource>, TSource, List<TSource>> intermediate = (list, e) => {
1899 list.Add (e); return list;
// Final step: size the array from the partial lists, then copy them in order
1902 Action<IList<List<TSource>>> final = (list) => {
1905 for (int i = 0; i < list.Count; i++)
1906 count += list[i].Count;
1908 result = new TSource[count];
// Starts at -1 because the index is pre-incremented before each store
1909 int insertIndex = -1;
1911 for (int i = 0; i < list.Count; i++)
1912 for (int j = 0; j < list[i].Count; j++)
1913 result [++insertIndex] = list[i][j];
1916 ParallelExecuter.ProcessAndAggregate<TSource, List<TSource>> (source.Node,
1917 () => new List<TSource> (),
// Convenience ToDictionary overloads. Each one fills in the missing pieces
// (identity element selector and/or default key comparer) and forwards to
// the four-argument overload, which performs the argument validation.

// Elements are stored as-is; keys are compared with comparer.
public static Dictionary<TKey, TSource> ToDictionary<TSource, TKey> (this ParallelQuery<TSource> source,
                                                                     Func<TSource, TKey> keySelector,
                                                                     IEqualityComparer<TKey> comparer)
{
	Func<TSource, TSource> identity = (element) => element;

	return ToDictionary<TSource, TKey, TSource> (source, keySelector, identity, comparer);
}

// Elements are stored as-is; keys are compared with the default comparer.
public static Dictionary<TKey, TSource> ToDictionary<TSource, TKey> (this ParallelQuery<TSource> source,
                                                                     Func<TSource, TKey> keySelector)
{
	Func<TSource, TSource> identity = (element) => element;
	IEqualityComparer<TKey> defaultComparer = EqualityComparer<TKey>.Default;

	return ToDictionary<TSource, TKey, TSource> (source, keySelector, identity, defaultComparer);
}

// Values come from elementSelector; keys are compared with the default comparer.
public static Dictionary<TKey, TElement> ToDictionary<TSource, TKey, TElement> (this ParallelQuery<TSource> source,
                                                                                Func<TSource, TKey> keySelector,
                                                                                Func<TSource, TElement> elementSelector)
{
	IEqualityComparer<TKey> defaultComparer = EqualityComparer<TKey>.Default;

	return ToDictionary<TSource, TKey, TElement> (source, keySelector, elementSelector, defaultComparer);
}
// Core ToDictionary implementation used by the shorter overloads.
// Every accumulator is a Dictionary built with comparer; each element is
// inserted under keySelector (e) with value elementSelector (e), and partial
// dictionaries are merged by adding every pair of the second into the first.
// Dictionary.Add is used for both steps, so a duplicate key raises
// ArgumentException.
// Throws ArgumentNullException when source, keySelector, comparer or
// elementSelector is null.
// NOTE(review): the final Aggregate argument is not visible here.
1944 public static Dictionary<TKey, TElement> ToDictionary<TSource, TKey, TElement> (this ParallelQuery<TSource> source,
1945 Func<TSource, TKey> keySelector,
1946 Func<TSource, TElement> elementSelector,
1947 IEqualityComparer<TKey> comparer)
1950 throw new ArgumentNullException ("source");
1951 if (keySelector == null)
1952 throw new ArgumentNullException ("keySelector");
1953 if (comparer == null)
1954 throw new ArgumentNullException ("comparer");
1955 if (elementSelector == null)
1956 throw new ArgumentNullException ("elementSelector");
1958 return source.Aggregate (() => new Dictionary<TKey, TElement> (comparer),
1959 (d, e) => { d.Add (keySelector (e), elementSelector (e)); return d; },
1960 (d1, d2) => { foreach (var couple in d2) d1.Add (couple.Key, couple.Value); return d1; },
// Convenience ToLookup overloads. Each one fills in the missing pieces
// (identity element selector and/or default key comparer) and forwards to
// the four-argument overload, which performs the argument validation.

// Elements are stored as-is; keys are compared with the default comparer.
public static ILookup<TKey, TSource> ToLookup<TSource, TKey> (this ParallelQuery<TSource> source,
                                                              Func<TSource, TKey> keySelector)
{
	Func<TSource, TSource> identity = (element) => element;
	IEqualityComparer<TKey> defaultComparer = EqualityComparer<TKey>.Default;

	return ToLookup<TSource, TKey, TSource> (source, keySelector, identity, defaultComparer);
}

// Elements are stored as-is; keys are compared with comparer.
public static ILookup<TKey, TSource> ToLookup<TSource, TKey> (this ParallelQuery<TSource> source,
                                                              Func<TSource, TKey> keySelector,
                                                              IEqualityComparer<TKey> comparer)
{
	Func<TSource, TSource> identity = (element) => element;

	return ToLookup<TSource, TKey, TSource> (source, keySelector, identity, comparer);
}

// Values come from elementSelector; keys are compared with the default comparer.
public static ILookup<TKey, TElement> ToLookup<TSource, TKey, TElement> (this ParallelQuery<TSource> source,
                                                                         Func<TSource, TKey> keySelector,
                                                                         Func<TSource, TElement> elementSelector)
{
	IEqualityComparer<TKey> defaultComparer = EqualityComparer<TKey>.Default;

	return ToLookup<TSource, TKey, TElement> (source, keySelector, elementSelector, defaultComparer);
}
// Core ToLookup implementation used by the shorter overloads.
// Creates one ConcurrentLookup built with comparer and fills it through
// ForAll, adding elementSelector (e) under keySelector (e) for every element.
// Throws ArgumentNullException when source, keySelector, comparer or
// elementSelector is null.
// NOTE(review): the return statement is not visible here; presumably the
// method returns lookup - confirm.
1984 public static ILookup<TKey, TElement> ToLookup<TSource, TKey, TElement> (this ParallelQuery<TSource> source,
1985 Func<TSource, TKey> keySelector,
1986 Func<TSource, TElement> elementSelector,
1987 IEqualityComparer<TKey> comparer)
1990 throw new ArgumentNullException ("source");
1991 if (keySelector == null)
1992 throw new ArgumentNullException ("keySelector");
1993 if (comparer == null)
1994 throw new ArgumentNullException ("comparer");
1995 if (elementSelector == null)
1996 throw new ArgumentNullException ("elementSelector");
1998 ConcurrentLookup<TKey, TElement> lookup = new ConcurrentLookup<TKey, TElement> (comparer);
1999 source.ForAll ((e) => lookup.Add (keySelector (e), elementSelector (e)));
// Placeholder overload for an IEnumerable<T> right-hand operand. It is
// flagged obsolete so that the compiler steers callers to AsParallel (),
// and it always throws NotSupportedException when invoked.
[Obsolete ("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather than "
           + "System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() extension method "
           + "to convert the right data source to System.Linq.ParallelQuery<T>.")]
public static ParallelQuery<TSource> Concat<TSource> (this ParallelQuery<TSource> first,
                                                      IEnumerable<TSource> second)
{
	throw new NotSupportedException ();
}
// Concatenates two parallel queries by wrapping both operand nodes in a
// QueryConcatNode.
// Throws ArgumentNullException when first or second is null (previously a
// null argument surfaced as a NullReferenceException while reading .Node).
public static ParallelQuery<TSource> Concat<TSource> (this ParallelQuery<TSource> first, ParallelQuery<TSource> second)
{
	if (first == null)
		throw new ArgumentNullException ("first");
	if (second == null)
		throw new ArgumentNullException ("second");

	return new ParallelQuery<TSource> (new QueryConcatNode<TSource> (first.Node, second.Node));
}
2021 #region DefaultIfEmpty
// Wraps source in a QueryDefaultEmptyNode carrying default (TSource) as the
// replacement value.
// Throws ArgumentNullException when source is null.
public static ParallelQuery<TSource> DefaultIfEmpty<TSource> (this ParallelQuery<TSource> source)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	return source.DefaultIfEmpty (default (TSource));
}

// Wraps source in a QueryDefaultEmptyNode carrying defaultValue as the
// replacement value.
// Throws ArgumentNullException when source is null (previously a null source
// surfaced as a NullReferenceException while reading source.Node).
public static ParallelQuery<TSource> DefaultIfEmpty<TSource> (this ParallelQuery<TSource> source, TSource defaultValue)
{
	if (source == null)
		throw new ArgumentNullException ("source");

	return new ParallelQuery<TSource> (new QueryDefaultEmptyNode<TSource> (source.Node, defaultValue));
}
// Returns the first element produced by the query's enumerator.
// A CancellationTokenSource is attached to the query through
// WithImplementerToken before the enumerator is obtained.
// Throws InvalidOperationException when the enumerator is null or yields no
// element.
// NOTE(review): the statements after reading Current are not visible here;
// presumably src is cancelled to stop the remaining work - confirm.
2034 public static TSource First<TSource> ( this ParallelQuery<TSource> source)
2036 CancellationTokenSource src = new CancellationTokenSource ();
2037 IEnumerator<TSource> enumerator = source.WithImplementerToken (src).GetEnumerator ();
2039 if (enumerator == null || !enumerator.MoveNext ())
2040 throw new InvalidOperationException ("source contains no element");
2042 TSource result = enumerator.Current;
// First element that satisfies predicate: filter with Where, then take the
// first element of the filtered query. No argument validation is done here.
public static TSource First<TSource> (this ParallelQuery<TSource> source, Func<TSource, bool> predicate)
{
	ParallelQuery<TSource> matches = source.Where (predicate);

	return matches.First ();
}
// First element of the query after it has been passed through
// DefaultIfEmpty (). No argument validation is done here.
public static TSource FirstOrDefault<TSource> (this ParallelQuery<TSource> source)
{
	ParallelQuery<TSource> withFallback = source.DefaultIfEmpty ();

	return withFallback.First ();
}

// Same as above, applied to the elements that satisfy predicate.
public static TSource FirstOrDefault<TSource> (this ParallelQuery<TSource> source, Func<TSource, bool> predicate)
{
	ParallelQuery<TSource> matches = source.Where (predicate);

	return matches.FirstOrDefault ();
}
// Last element of the query, obtained as the first element of the reversed
// query.
public static TSource Last<TSource> (this ParallelQuery<TSource> source)
{
	ParallelQuery<TSource> backwards = source.Reverse ();

	return backwards.First ();
}

// Last element that satisfies predicate, obtained as the first matching
// element of the reversed query.
public static TSource Last<TSource> (this ParallelQuery<TSource> source, Func<TSource, bool> predicate)
{
	ParallelQuery<TSource> backwards = source.Reverse ();

	return backwards.First (predicate);
}
// FirstOrDefault applied to the reversed query.
public static TSource LastOrDefault<TSource> (this ParallelQuery<TSource> source)
{
	ParallelQuery<TSource> backwards = source.Reverse ();

	return backwards.FirstOrDefault ();
}

// FirstOrDefault with predicate applied to the reversed query.
public static TSource LastOrDefault<TSource> (this ParallelQuery<TSource> source, Func<TSource, bool> predicate)
{
	ParallelQuery<TSource> backwards = source.Reverse ();

	return backwards.FirstOrDefault (predicate);
}
// Pairs the two queries through a QueryZipNode that applies resultSelector
// to the paired elements.
// Throws ArgumentNullException when first, second or resultSelector is null.
public static ParallelQuery<TResult> Zip<TFirst, TSecond, TResult> (this ParallelQuery<TFirst> first,
                                                                    ParallelQuery<TSecond> second,
                                                                    Func<TFirst, TSecond, TResult> resultSelector)
{
	if (first == null)
		throw new ArgumentNullException ("first");
	if (second == null)
		throw new ArgumentNullException ("second");
	if (resultSelector == null)
		throw new ArgumentNullException ("resultSelector");

	var zipNode = new QueryZipNode<TFirst, TSecond, TResult> (resultSelector,
	                                                          first.Node,
	                                                          second.Node);

	return new ParallelQuery<TResult> (zipNode);
}
// Placeholder overload for an IEnumerable<T> right-hand operand. It is
// flagged obsolete so that the compiler steers callers to AsParallel (),
// and it always throws NotSupportedException when invoked.
[Obsolete ("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
           + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
           + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
public static ParallelQuery<TResult> Zip<TFirst, TSecond, TResult> (this ParallelQuery<TFirst> first,
                                                                    IEnumerable<TSecond> second,
                                                                    Func<TFirst, TSecond, TResult> resultSelector)
{
	throw new NotSupportedException ();
}