Merge branch 'master' of github.com:mono/mono
[mono.git] / mcs / class / System.Core / System.Linq.Parallel / OrderingEnumerator.cs
1 //
2 // OrderingEnumerator.cs
3 //
4 // Author:
5 //       Jérémie "Garuma" Laval <jeremie.laval@gmail.com>
6 //
7 // Copyright (c) 2010 Jérémie "Garuma" Laval
8 //
9 // Permission is hereby granted, free of charge, to any person obtaining a copy
10 // of this software and associated documentation files (the "Software"), to deal
11 // in the Software without restriction, including without limitation the rights
12 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13 // copies of the Software, and to permit persons to whom the Software is
14 // furnished to do so, subject to the following conditions:
15 //
16 // The above copyright notice and this permission notice shall be included in
17 // all copies or substantial portions of the Software.
18 //
19 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
25 // THE SOFTWARE.
26
27 #if NET_4_0
28 using System;
29 using System.Threading;
30 using System.Collections;
31 using System.Collections.Generic;
32 using System.Collections.Concurrent;
33
34 namespace System.Linq.Parallel
35 {
36         internal class OrderingEnumerator<T> : IEnumerator<T>
37         {
38                 internal class SlotBucket
39                 {
40                         readonly ConcurrentDictionary<long, T> temporaryArea = new ConcurrentDictionary<long, T> ();
41                         readonly KeyValuePair<long, T>[] stagingArea;
42                         
43                         long currentIndex;
44                         readonly int count;
45
46                         CountdownEvent stagingCount;
47                         CountdownEvent participantCount;
48
49                         CancellationTokenSource src = new CancellationTokenSource ();
50                         CancellationToken mergedToken;
51
52                         public SlotBucket (int count, CancellationToken token)
53                         {
54                                 this.count = count;
55                                 stagingCount = new CountdownEvent (count);
56                                 participantCount = new CountdownEvent (count);
57                                 stagingArea = new KeyValuePair<long, T>[count];
58                                 currentIndex = -count;
59                                 mergedToken = CancellationTokenSource.CreateLinkedTokenSource (src.Token, token).Token;
60                         }
61
62                         public void Add (KeyValuePair<long, T> value)
63                         {
64                                 long index = value.Key;
65                                 
66                                 if (index >= currentIndex && index < currentIndex + count) {
67                                         stagingArea[index % count] = value;
68                                         stagingCount.Signal ();
69                                 } else {
70                                         temporaryArea.TryAdd (index, value.Value);
71                                         if (index >= currentIndex && index < currentIndex + count) {
72                                                 T dummy;
73                                                 if (temporaryArea.TryRemove (index, out dummy)) {
74                                                         stagingArea[index % count] = value;
75                                                         stagingCount.Signal ();
76                                                 }
77                                         }
78                                 }
79                         }
80                         
81                         // Called by each worker's endAction
82                         public void EndParticipation ()
83                         {
84                                 if (participantCount.Signal ())
85                                         src.Cancel ();
86                         }
87
88                         // Called at the end with ContinueAll
89                         public void Stop ()
90                         {
91                                 src.Cancel ();
92                         }
93
94                         bool Skim ()
95                         {
96                                 bool result = false;
97
98                                 for (int i = 0; i < count; i++) {
99                                         T temp;
100                                         int index = i + (int)currentIndex;
101                                         
102                                         if (stagingArea[i].Key != -1)
103                                                 continue;
104
105                                         if (!temporaryArea.TryRemove (index, out temp))
106                                                 continue;
107
108                                         result = true;
109                                         stagingArea [i] = new KeyValuePair<long, T> (index, temp);
110                                         if (stagingCount.Signal ())
111                                                 break;
112                                 }
113
114                                 return result;
115                         }
116                         
117                         void Clean ()
118                         {
119                                 for (int i = 0; i < stagingArea.Length; i++)
120                                         stagingArea[i] = new KeyValuePair<long, T> (-1, default (T));
121                         }
122
123                         public KeyValuePair<long, T>[] Wait ()
124                         {
125                                 Clean ();
126                                 stagingCount.Reset ();
127                                 
128                                 Interlocked.Add (ref currentIndex, count);
129
130                                 Skim ();
131
132                                 while (!stagingCount.IsSet) {
133                                         if (!participantCount.IsSet)
134                                                 try {
135                                                         stagingCount.Wait (mergedToken);
136                                                 } catch {
137                                                         Skim ();
138                                                 }
139
140
141                                         // Ok so basically we hit the case where we return null but there is actually
142                                         // every remaining element inside temporaryArea. Thing is that index are basically getting messed up (probably)
143                                         // So Skim doesn't see them and inaccurately say there is nothing remaining.
144                                         // To prove this, adding a if (temporaryArea.IsEmpty) before returning null result in infinite loop
145                                         // plus if it was a problem with something here it would show elsewhere so the problem definitely comes from
146                                         // SelectMany.
147                                         if (participantCount.IsSet) {
148                                                 if (Skim ())
149                                                         continue;
150                                                 // Totally finished
151                                                 if (stagingArea.Any (s => s.Key != -1))
152                                                         break;
153                                                 else
154                                                         return null;
155                                         }
156                                 }
157                                 
158                                 return stagingArea;
159                         }
160                 }
161
162                 SlotBucket slotBucket;
163                 
164                 KeyValuePair<long, T>[] slot;
165                 int current;
166
167                 internal OrderingEnumerator (int num, CancellationToken token)
168                 {
169                         slotBucket = new SlotBucket (num, token);
170                 }
171
172                 public void Dispose ()
173                 {
174                         slotBucket.Stop ();
175                 }
176
177                 public void Reset ()
178                 {
179
180                 }
181
182                 public bool MoveNext ()
183                 {
184                         do {
185                                 if (slot == null || ++current >= slot.Length) {
186                                         if ((slot = slotBucket.Wait ()) == null)
187                                                 return false;
188                                         current = 0;
189                                 }
190                         } while (slot[current].Key == -1);
191
192                         return true;
193                 }
194
195                 public T Current {
196                         get {
197                                 return slot[current].Value;
198                         }
199                 }
200
201                 object IEnumerator.Current {
202                         get {
203                                 return slot[current].Value;
204                         }
205                 }
206                 
207                 public void Add (KeyValuePair<long, T> value, CancellationToken token)
208                 {
209                         slotBucket.Add (value);
210                 }
211                         
212                 // Called by each worker's endAction
213                 public void EndParticipation ()
214                 {
215                         slotBucket.EndParticipation ();
216                 }
217                 
218                 // Called at the end with ContinueAll
219                 public void Stop ()
220                 {
221                         slotBucket.Stop ();
222                 }
223         }
224 }
225 #endif