Merge pull request #495 from nicolas-raoul/fix-for-issue2907-with-no-formatting-changes
[mono.git] / mcs / class / System.Core / System.Linq.Parallel / ParallelQueryEnumerator.cs
1 //
2 // ParallelQueryEnumerator.cs
3 //
4 // Author:
5 //       Jérémie "Garuma" Laval <jeremie.laval@gmail.com>
6 //
7 // Copyright (c) 2010 Jérémie "Garuma" Laval
8 //
9 // Permission is hereby granted, free of charge, to any person obtaining a copy
10 // of this software and associated documentation files (the "Software"), to deal
11 // in the Software without restriction, including without limitation the rights
12 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13 // copies of the Software, and to permit persons to whom the Software is
14 // furnished to do so, subject to the following conditions:
15 //
16 // The above copyright notice and this permission notice shall be included in
17 // all copies or substantial portions of the Software.
18 //
19 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
25 // THE SOFTWARE.
26
27 #if NET_4_0 || MOBILE
28 using System;
29 using System.Threading;
30 using System.Collections;
31 using System.Collections.Generic;
32 using System.Collections.Concurrent;
33 using System.Linq.Parallel.QueryNodes;
34
namespace System.Linq.Parallel
{
	// Enumerator over the results of a parallel query described by a QueryBaseNode<T>.
	//
	// Depending on the options returned by ParallelExecuter.CheckQuery the results
	// are pulled from one of three sources, all exposed through `loader`:
	//   - the node's sequential enumerable (query should run sequentially and
	//     parallelism is not forced),
	//   - the consuming enumerable of a BlockingCollection<T> that the callbacks
	//     handed to ParallelExecuter.ProcessAndCallback add to,
	//   - an OrderingEnumerator<T> when options.BehindOrderGuard is set.
	internal class ParallelQueryEnumerator<T> : IEnumerator<T>
	{
		// Capacity used for the bounded buffer when the merge options do not ask
		// for an unbounded one (see Setup).
		readonly int DefaultBufferSize = ParallelExecuter.GetBestWorkerNumber () * 50;

		// Only assigned on the parallel, non order-guarded path.
		BlockingCollection<T> buffer;
		// The enumerator every MoveNext call is forwarded to; always assigned by the constructor.
		IEnumerator<T> loader;
		QueryOptions options;
		// Only assigned on the parallel, order-guarded path (it is then also `loader`).
		OrderingEnumerator<T> ordEnumerator;

		T current;

		// Delegate returned by ParallelExecuter.ProcessAndCallback.
		// NOTE(review): presumably blocks until the query processing is complete — confirm.
		Action waitAction;

		// Guards against disposing the underlying enumerator more than once.
		bool disposed;

		// Builds the enumerator for `node`: either wraps the sequential version of
		// the query or sets up the result buffer and launches the processing.
		internal ParallelQueryEnumerator (QueryBaseNode<T> node)
		{
			this.options = ParallelExecuter.CheckQuery (node);

			if (options.ShouldBeSequential && options.Mode != ParallelExecutionMode.ForceParallelism) {
				// Sequential path: enumerate the node's sequential form directly.
				IEnumerable<T> sequential = node.GetSequential ();
				loader = sequential.GetEnumerator ();
				return;
			}

			Setup ();

			// Launch adding to the buffer asynchronously via Tasks
			if (options.BehindOrderGuard.Value) {
				waitAction = ParallelExecuter.ProcessAndCallback (node,
				                                                  ordEnumerator.Add,
				                                                  ordEnumerator.EndParticipation,
				                                                  ordEnumerator.Stop,
				                                                  options);
			} else {
				waitAction = ParallelExecuter.ProcessAndCallback (node,
				                                                  buffer.Add,
				                                                  buffer.CompleteAdding,
				                                                  options);
			}

			// With FullyBuffered merging the wait delegate is invoked right away,
			// before the enumerator is handed back to the caller.
			if (options.Options.HasValue && options.Options.Value == ParallelMergeOptions.FullyBuffered)
				waitAction ();
		}

		// Creates the structure the results are read from and points `loader` at it.
		void Setup ()
		{
			if (options.BehindOrderGuard.Value) {
				loader = ordEnumerator = new OrderingEnumerator<T> (options.PartitionCount, options.MergedToken);
				return;
			}

			// NotBuffered and FullyBuffered use an unbounded collection, every
			// other case (including no explicit merge option) a bounded one.
			bool unbounded = options.Options.HasValue
				&& (options.Options.Value == ParallelMergeOptions.NotBuffered
				    || options.Options.Value == ParallelMergeOptions.FullyBuffered);

			buffer = unbounded
				? new BlockingCollection<T> ()
				: new BlockingCollection<T> (DefaultBufferSize);

			IEnumerable<T> source = buffer.GetConsumingEnumerable (options.MergedToken);
			loader = source.GetEnumerator ();
		}

		// Disposes the underlying enumerator. Safe to call more than once.
		public void Dispose ()
		{
			if (disposed)
				return;
			disposed = true;

			// This class obtained `loader` itself, so it is responsible for
			// releasing it.
			if (loader != null)
				loader.Dispose ();

			// `buffer` is intentionally left alone: the Add/CompleteAdding
			// callbacks given to ProcessAndCallback may still be invoked after
			// the consumer stopped enumerating, and would fail on a disposed
			// collection.
		}

		// Not supported: the query results cannot be replayed.
		public void Reset ()
		{
			throw new NotSupportedException ();
		}

		// Advances to the next result; returns false once the source is exhausted.
		public bool MoveNext ()
		{
			// If there is nothing in the buffer
			// but CompleteAdding hasn't been called,
			// MoveNext blocks until further results are produced
			if (!loader.MoveNext ())
				return false;

			current = loader.Current;
			return true;
		}

		// Last element fetched by a successful MoveNext (default (T) before that).
		public T Current {
			get {
				return current;
			}
		}

		object IEnumerator.Current {
			get {
				return current;
			}
		}
	}
}
131 #endif