Merge pull request #496 from nicolas-raoul/unit-test-for-issue2907
[mono.git] / mcs / class / System.Core / System.Linq.Parallel / OrderingEnumerator.cs
1 //
2 // OrderingEnumerator.cs
3 //
4 // Author:
5 //       Jérémie "Garuma" Laval <jeremie.laval@gmail.com>
6 //
7 // Copyright (c) 2010 Jérémie "Garuma" Laval
8 //
9 // Permission is hereby granted, free of charge, to any person obtaining a copy
10 // of this software and associated documentation files (the "Software"), to deal
11 // in the Software without restriction, including without limitation the rights
12 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13 // copies of the Software, and to permit persons to whom the Software is
14 // furnished to do so, subject to the following conditions:
15 //
16 // The above copyright notice and this permission notice shall be included in
17 // all copies or substantial portions of the Software.
18 //
19 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
25 // THE SOFTWARE.
26
27 #if NET_4_0 || MOBILE
28 using System;
29 using System.Threading;
30 using System.Collections;
31 using System.Collections.Generic;
32 using System.Collections.Concurrent;
33
34 namespace System.Linq.Parallel
35 {
36         internal class OrderingEnumerator<T> : IEnumerator<T>
37         {
38                 internal class SlotBucket
39                 {
40                         readonly TemporaryArea<long, T> temporaryArea = new TemporaryArea<long, T> ();
41                         readonly KeyValuePair<long, T>[] stagingArea;
42                         
43                         long currentIndex;
44                         readonly int count;
45
46                         CountdownEvent stagingCount;
47                         CountdownEvent participantCount;
48
49                         CancellationTokenSource src = new CancellationTokenSource ();
50                         CancellationToken mergedToken;
51
52                         public SlotBucket (int count, CancellationToken token)
53                         {
54                                 this.count = count;
55                                 stagingCount = new CountdownEvent (count);
56                                 participantCount = new CountdownEvent (count);
57                                 stagingArea = new KeyValuePair<long, T>[count];
58                                 currentIndex = -count;
59                                 mergedToken = CancellationTokenSource.CreateLinkedTokenSource (src.Token, token).Token;
60                         }
61
62                         public void Add (KeyValuePair<long, T> value)
63                         {
64                                 long index = value.Key;
65                                 
66                                 if (index >= currentIndex && index < currentIndex + count) {
67                                         stagingArea[index % count] = value;
68                                         stagingCount.Signal ();
69                                 } else {
70                                         temporaryArea.TryAdd (index, value.Value);
71                                         if (index >= currentIndex && index < currentIndex + count) {
72                                                 T dummy;
73                                                 if (temporaryArea.TryRemove (index, out dummy)) {
74                                                         stagingArea[index % count] = value;
75                                                         stagingCount.Signal ();
76                                                 }
77                                         }
78                                 }
79                         }
80                         
81                         // Called by each worker's endAction
82                         public void EndParticipation ()
83                         {
84                                 if (participantCount.Signal ())
85                                         src.Cancel ();
86                         }
87
88                         // Called at the end with ContinueAll
89                         public void Stop ()
90                         {
91                                 src.Cancel ();
92                         }
93
94                         bool Skim ()
95                         {
96                                 bool result = false;
97
98                                 for (int i = 0; i < count; i++) {
99                                         T temp;
100                                         int index = i + (int)currentIndex;
101                                         
102                                         if (stagingArea[i].Key != -1)
103                                                 continue;
104
105                                         if (!temporaryArea.TryRemove (index, out temp))
106                                                 continue;
107
108                                         result = true;
109                                         stagingArea [i] = new KeyValuePair<long, T> (index, temp);
110                                         if (stagingCount.Signal ())
111                                                 break;
112                                 }
113
114                                 return result;
115                         }
116                         
117                         void Clean ()
118                         {
119                                 for (int i = 0; i < stagingArea.Length; i++)
120                                         stagingArea[i] = new KeyValuePair<long, T> (-1, default (T));
121                         }
122
123                         public KeyValuePair<long, T>[] Wait ()
124                         {
125                                 Clean ();
126                                 stagingCount.Reset ();
127                                 
128                                 Interlocked.Add (ref currentIndex, count);
129
130                                 Skim ();
131
132                                 while (!stagingCount.IsSet) {
133                                         if (!participantCount.IsSet) {
134                                                 try {
135                                                         stagingCount.Wait (mergedToken);
136                                                 } catch {
137                                                         Skim ();
138                                                 }
139                                         }
140
141                                         if (participantCount.IsSet) {
142                                                 if (Skim ())
143                                                         continue;
144                                                 // Totally finished
145                                                 if (stagingArea[0].Key != -1)
146                                                         break;
147                                                 else
148                                                         return null;
149                                         }
150                                 }
151
152                                 return stagingArea;
153                         }
154                 }
155
156                 SlotBucket slotBucket;
157                 
158                 KeyValuePair<long, T>[] slot;
159                 int current;
160
161                 internal OrderingEnumerator (int num, CancellationToken token)
162                 {
163                         slotBucket = new SlotBucket (num, token);
164                 }
165
166                 public void Dispose ()
167                 {
168                         slotBucket.Stop ();
169                 }
170
171                 public void Reset ()
172                 {
173
174                 }
175
176                 public bool MoveNext ()
177                 {
178                         do {
179                                 if (slot == null || ++current >= slot.Length) {
180                                         if ((slot = slotBucket.Wait ()) == null)
181                                                 return false;
182                                         current = 0;
183                                 }
184                         } while (slot[current].Key == -1);
185
186                         return true;
187                 }
188
189                 public T Current {
190                         get {
191                                 return slot[current].Value;
192                         }
193                 }
194
195                 object IEnumerator.Current {
196                         get {
197                                 return slot[current].Value;
198                         }
199                 }
200                 
201                 public void Add (KeyValuePair<long, T> value, CancellationToken token)
202                 {
203                         slotBucket.Add (value);
204                 }
205                         
206                 // Called by each worker's endAction
207                 public void EndParticipation ()
208                 {
209                         slotBucket.EndParticipation ();
210                 }
211                 
212                 // Called at the end with ContinueAll
213                 public void Stop ()
214                 {
215                         slotBucket.Stop ();
216                 }
217         }
218 }
219 #endif