[bcl] Remove NET_4_0 defines from class libs.
[mono.git] / mcs / class / System.Core / System.Linq.Parallel / OrderingEnumerator.cs
1 //
2 // OrderingEnumerator.cs
3 //
4 // Author:
5 //       Jérémie "Garuma" Laval <jeremie.laval@gmail.com>
6 //
7 // Copyright (c) 2010 Jérémie "Garuma" Laval
8 //
9 // Permission is hereby granted, free of charge, to any person obtaining a copy
10 // of this software and associated documentation files (the "Software"), to deal
11 // in the Software without restriction, including without limitation the rights
12 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13 // copies of the Software, and to permit persons to whom the Software is
14 // furnished to do so, subject to the following conditions:
15 //
16 // The above copyright notice and this permission notice shall be included in
17 // all copies or substantial portions of the Software.
18 //
19 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
25 // THE SOFTWARE.
26
27 using System;
28 using System.Threading;
29 using System.Collections;
30 using System.Collections.Generic;
31 using System.Collections.Concurrent;
32
33 namespace System.Linq.Parallel
34 {
35         internal class OrderingEnumerator<T> : IEnumerator<T>
36         {
37                 internal class SlotBucket
38                 {
39                         readonly TemporaryArea<long, T> temporaryArea = new TemporaryArea<long, T> ();
40                         readonly KeyValuePair<long, T>[] stagingArea;
41                         
42                         long currentIndex;
43                         readonly int count;
44
45                         CountdownEvent stagingCount;
46                         CountdownEvent participantCount;
47
48                         CancellationTokenSource src = new CancellationTokenSource ();
49                         CancellationToken mergedToken;
50
51                         public SlotBucket (int count, CancellationToken token)
52                         {
53                                 this.count = count;
54                                 stagingCount = new CountdownEvent (count);
55                                 participantCount = new CountdownEvent (count);
56                                 stagingArea = new KeyValuePair<long, T>[count];
57                                 currentIndex = -count;
58                                 mergedToken = CancellationTokenSource.CreateLinkedTokenSource (src.Token, token).Token;
59                         }
60
61                         public void Add (KeyValuePair<long, T> value)
62                         {
63                                 long index = value.Key;
64                                 
65                                 if (index >= currentIndex && index < currentIndex + count) {
66                                         stagingArea[index % count] = value;
67                                         stagingCount.Signal ();
68                                 } else {
69                                         temporaryArea.TryAdd (index, value.Value);
70                                         if (index >= currentIndex && index < currentIndex + count) {
71                                                 T dummy;
72                                                 if (temporaryArea.TryRemove (index, out dummy)) {
73                                                         stagingArea[index % count] = value;
74                                                         stagingCount.Signal ();
75                                                 }
76                                         }
77                                 }
78                         }
79                         
80                         // Called by each worker's endAction
81                         public void EndParticipation ()
82                         {
83                                 if (participantCount.Signal ())
84                                         src.Cancel ();
85                         }
86
87                         // Called at the end with ContinueAll
88                         public void Stop ()
89                         {
90                                 src.Cancel ();
91                         }
92
93                         bool Skim ()
94                         {
95                                 bool result = false;
96
97                                 for (int i = 0; i < count; i++) {
98                                         T temp;
99                                         int index = i + (int)currentIndex;
100                                         
101                                         if (stagingArea[i].Key != -1)
102                                                 continue;
103
104                                         if (!temporaryArea.TryRemove (index, out temp))
105                                                 continue;
106
107                                         result = true;
108                                         stagingArea [i] = new KeyValuePair<long, T> (index, temp);
109                                         if (stagingCount.Signal ())
110                                                 break;
111                                 }
112
113                                 return result;
114                         }
115                         
116                         void Clean ()
117                         {
118                                 for (int i = 0; i < stagingArea.Length; i++)
119                                         stagingArea[i] = new KeyValuePair<long, T> (-1, default (T));
120                         }
121
122                         public KeyValuePair<long, T>[] Wait ()
123                         {
124                                 Clean ();
125                                 stagingCount.Reset ();
126                                 
127                                 Interlocked.Add (ref currentIndex, count);
128
129                                 Skim ();
130
131                                 while (!stagingCount.IsSet) {
132                                         if (!participantCount.IsSet) {
133                                                 try {
134                                                         stagingCount.Wait (mergedToken);
135                                                 } catch {
136                                                         Skim ();
137                                                 }
138                                         }
139
140                                         if (participantCount.IsSet) {
141                                                 if (Skim ())
142                                                         continue;
143                                                 // Totally finished
144                                                 if (stagingArea[0].Key != -1)
145                                                         break;
146                                                 else
147                                                         return null;
148                                         }
149                                 }
150
151                                 return stagingArea;
152                         }
153                 }
154
155                 SlotBucket slotBucket;
156                 
157                 KeyValuePair<long, T>[] slot;
158                 int current;
159
160                 internal OrderingEnumerator (int num, CancellationToken token)
161                 {
162                         slotBucket = new SlotBucket (num, token);
163                 }
164
165                 public void Dispose ()
166                 {
167                         slotBucket.Stop ();
168                 }
169
170                 public void Reset ()
171                 {
172
173                 }
174
175                 public bool MoveNext ()
176                 {
177                         do {
178                                 if (slot == null || ++current >= slot.Length) {
179                                         if ((slot = slotBucket.Wait ()) == null)
180                                                 return false;
181                                         current = 0;
182                                 }
183                         } while (slot[current].Key == -1);
184
185                         return true;
186                 }
187
188                 public T Current {
189                         get {
190                                 return slot[current].Value;
191                         }
192                 }
193
194                 object IEnumerator.Current {
195                         get {
196                                 return slot[current].Value;
197                         }
198                 }
199                 
200                 public void Add (KeyValuePair<long, T> value, CancellationToken token)
201                 {
202                         slotBucket.Add (value);
203                 }
204                         
205                 // Called by each worker's endAction
206                 public void EndParticipation ()
207                 {
208                         slotBucket.EndParticipation ();
209                 }
210                 
211                 // Called at the end with ContinueAll
212                 public void Stop ()
213                 {
214                         slotBucket.Stop ();
215                 }
216         }
217 }