Merge branch 'master' of github.com:mono/mono
[mono.git] / mcs / class / System.Core / System.Linq.Parallel / OrderingEnumerator.cs
1 //
2 // OrderingEnumerator.cs
3 //
4 // Author:
5 //       Jérémie "Garuma" Laval <jeremie.laval@gmail.com>
6 //
7 // Copyright (c) 2010 Jérémie "Garuma" Laval
8 //
9 // Permission is hereby granted, free of charge, to any person obtaining a copy
10 // of this software and associated documentation files (the "Software"), to deal
11 // in the Software without restriction, including without limitation the rights
12 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13 // copies of the Software, and to permit persons to whom the Software is
14 // furnished to do so, subject to the following conditions:
15 //
16 // The above copyright notice and this permission notice shall be included in
17 // all copies or substantial portions of the Software.
18 //
19 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
25 // THE SOFTWARE.
26
27 #if NET_4_0
28 using System;
29 using System.Threading;
30 using System.Collections;
31 using System.Collections.Generic;
32 using System.Collections.Concurrent;
33
34 namespace System.Linq.Parallel
35 {
36         internal class OrderingEnumerator<T> : IEnumerator<T>
37         {
38                 internal class SlotBucket
39                 {
40                         ConcurrentDictionary<long, T> temporaryArea = new ConcurrentDictionary<long, T> ();
41                         KeyValuePair<long, T>?[] stagingArea;
42                         
43                         long currentIndex;
44                         readonly int count;
45                         CountdownEvent stagingCount;
46                         CountdownEvent participantCount;
47                         CancellationTokenSource src = new CancellationTokenSource ();
48                         CancellationToken mergedToken;
49
50                         public SlotBucket (int count, CancellationToken token)
51                         {
52                                 this.count = count;
53                                 stagingCount = new CountdownEvent (count);
54                                 participantCount = new CountdownEvent (count);
55                                 stagingArea = new KeyValuePair<long, T>?[count];
56                                 currentIndex = -count;
57                                 mergedToken = CancellationTokenSource.CreateLinkedTokenSource (src.Token, token).Token;
58                         }
59
60                         public void Add (KeyValuePair<long, T> value)
61                         {
62                                 long index = value.Key;
63                                 
64                                 if (index >= currentIndex && index < currentIndex + count) {
65                                         stagingArea[index % count] = value;
66                                         stagingCount.Signal ();
67                                 } else {
68                                         temporaryArea.TryAdd (index, value.Value);
69                                         if (index >= currentIndex && index < currentIndex + count) {
70                                                 T dummy;
71                                                 if (temporaryArea.TryRemove (index, out dummy)) {
72                                                         stagingArea[index % count] = value;
73                                                         stagingCount.Signal ();
74                                                 }
75                                         }
76                                 }
77                         }
78                         
79                         // Called by each worker's endAction
80                         public void EndParticipation ()
81                         {
82                                 if (participantCount.Signal ())
83                                         src.Cancel ();
84                         }
85
86                         // Called at the end with ContinueAll
87                         public void Stop ()
88                         {
89                                 src.Cancel ();
90                         }
91
92                         void Skim ()
93                         {
94                                 for (int i = 0; i < count; i++) {
95                                         T temp;
96                                         int index = i + (int)currentIndex;
97                                         
98                                         if (stagingArea[index % count].HasValue)
99                                                 continue;
100
101                                         if (!temporaryArea.TryRemove (index, out temp))
102                                                 continue;
103                                         
104                                         stagingArea [index % count] = new KeyValuePair<long, T> (index, temp);
105                                         if (stagingCount.Signal ())
106                                                 break;
107                                 }
108                         }
109                         
110                         void Clean ()
111                         {
112                                 for (int i = 0; i < stagingArea.Length; i++)
113                                         stagingArea[i] = new Nullable<KeyValuePair<long, T>> ();
114                         }
115
116                         public IEnumerator<KeyValuePair<long, T>?> Wait ()
117                         {
118                                 Clean ();
119                                 stagingCount.Reset ();
120                                 
121                                 Interlocked.Add (ref currentIndex, count);
122
123                                 Skim ();
124
125                                 while (!stagingCount.IsSet) {
126                                         if (!participantCount.IsSet)
127                                                 try {
128                                                         stagingCount.Wait (mergedToken);
129                                                 } catch {
130                                                         Skim ();
131                                                 }
132
133                                         if (participantCount.IsSet) {
134                                                 // Totally finished
135                                                 if (stagingArea[0].HasValue)
136                                                         break;
137                                                 else
138                                                         return null;
139                                         }
140                                 }
141                                 
142                                 return ((IEnumerable<KeyValuePair<long, T>?>)stagingArea).GetEnumerator ();
143                         }
144                 }
145
146                 readonly int num;
147                 SlotBucket slotBucket;
148                 
149                 IEnumerator<KeyValuePair<long, T>?> currEnum;
150                 KeyValuePair<long, T> curr;
151
152                 internal OrderingEnumerator (int num, CancellationToken token)
153                 {
154                         this.num = num;
155                         slotBucket = new SlotBucket (num, token);
156                 }
157
158                 public void Dispose ()
159                 {
160                         slotBucket.Stop ();
161                 }
162
163                 public void Reset ()
164                 {
165
166                 }
167
168                 public bool MoveNext ()
169                 {
170                         do {
171                                 while (currEnum == null || !currEnum.MoveNext ()) {
172                                         if (currEnum != null)
173                                                 currEnum.Dispose ();
174                                         if ((currEnum = slotBucket.Wait ()) == null)
175                                                 return false;
176                                 }
177                         } while (!currEnum.Current.HasValue);
178
179                         curr = currEnum.Current.Value;
180
181                         return true;
182                 }
183
184                 public T Current {
185                         get {
186                                 return curr.Value;
187                         }
188                 }
189
190                 object IEnumerator.Current {
191                         get {
192                                 return curr.Value;
193                         }
194                 }
195                 
196                 public void Add (KeyValuePair<long, T> value, CancellationToken token)
197                 {
198                         slotBucket.Add (value);
199                 }
200                         
201                 // Called by each worker's endAction
202                 public void EndParticipation ()
203                 {
204                         slotBucket.EndParticipation ();
205                 }
206                 
207                 // Called at the end with ContinueAll
208                 public void Stop ()
209                 {
210                         slotBucket.Stop ();
211                 }
212         }
213 }
214 #endif