2 // OrderingEnumerator.cs
5 // Jérémie "Garuma" Laval <jeremie.laval@gmail.com>
7 // Copyright (c) 2010 Jérémie "Garuma" Laval
9 // Permission is hereby granted, free of charge, to any person obtaining a copy
10 // of this software and associated documentation files (the "Software"), to deal
11 // in the Software without restriction, including without limitation the rights
12 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13 // copies of the Software, and to permit persons to whom the Software is
14 // furnished to do so, subject to the following conditions:
16 // The above copyright notice and this permission notice shall be included in
17 // all copies or substantial portions of the Software.
19 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
29 using System.Threading;
30 using System.Collections;
31 using System.Collections.Generic;
32 using System.Collections.Concurrent;
34 namespace System.Linq.Parallel
36 internal class OrderingEnumerator<T> : IEnumerator<T>
// Nested helper that receives (index, value) pairs from producers and exposes
// them through a staging array that is handed out by Wait ().
// NOTE(review): this listing has lost lines (braces, the declarations of
// `currentIndex` and `count`, several branch bodies and some method headers),
// so the comments below describe only the statements that are still visible.
38 internal class SlotBucket
// Side store keyed by index: Add parks values here with TryAdd, and values are
// later moved into stagingArea after a successful TryRemove.
40 ConcurrentDictionary<long, T> temporaryArea = new ConcurrentDictionary<long, T> ();
// Current window of results; a pair with key k is stored at slot k % count.
// A slot without a value means nothing has been staged there yet.
41 KeyValuePair<long, T>?[] stagingArea;
// Created with an initial count of `count`; signalled once each time a staging
// slot is filled, reset and waited upon in Wait ().
45 CountdownEvent stagingCount;
// Created with an initial count of `count`; signalled once per
// EndParticipation () call. Wait () polls its IsSet flag.
46 CountdownEvent participantCount;
// Internal cancellation source; its token is linked with the caller's token
// to form mergedToken.
47 CancellationTokenSource src = new CancellationTokenSource ();
// Token passed to stagingCount.Wait () so that a blocked consumer can be woken.
48 CancellationToken mergedToken;
// Builds a bucket with `count` staging slots.
// `token` is the external cancellation token that gets linked with `src`.
50 public SlotBucket (int count, CancellationToken token)
53 stagingCount = new CountdownEvent (count);
54 participantCount = new CountdownEvent (count);
55 stagingArea = new KeyValuePair<long, T>?[count];
// Starts one window before zero: Wait () adds `count` before using the window,
// so a first call moves currentIndex to 0.
56 currentIndex = -count;
// NOTE(review): only the Token of the linked source is kept, so the linked
// CancellationTokenSource itself cannot be disposed later - confirm this is intended.
57 mergedToken = CancellationTokenSource.CreateLinkedTokenSource (src.Token, token).Token;
// Publishes one produced pair. The pair's key is used as its position.
60 public void Add (KeyValuePair<long, T> value)
62 long index = value.Key;
// Key inside the window [currentIndex, currentIndex + count): write it
// straight into its staging slot and count it as staged.
64 if (index >= currentIndex && index < currentIndex + count) {
65 stagingArea[index % count] = value;
66 stagingCount.Signal ();
// NOTE(review): the line introducing this branch is not visible; presumably
// this is the `else` path for keys outside the window. The value is parked in
// temporaryArea and the window test is then repeated - presumably because
// currentIndex may have been advanced by Wait () in the meantime (confirm).
68 temporaryArea.TryAdd (index, value.Value);
69 if (index >= currentIndex && index < currentIndex + count) {
// Only the caller whose TryRemove succeeds stages the value and signals.
71 if (temporaryArea.TryRemove (index, out dummy)) {
72 stagingArea[index % count] = value;
73 stagingCount.Signal ();
79 // Called by each worker's endAction
// Signals participantCount once. CountdownEvent.Signal () returns true when
// the count reaches zero.
// NOTE(review): the statement guarded by the `if` below is not visible here.
80 public void EndParticipation ()
82 if (participantCount.Signal ())
86 // Called at the end with ContinueAll
// NOTE(review): the method header(s) for the two loops below are not visible
// in this listing; they may belong to different methods than the one the
// comment above introduces.
// First loop: walks the `count` indices of the current window.
94 for (int i = 0; i < count; i++) {
// NOTE(review): currentIndex is narrowed to int here while keys are long
// (see Add) - confirm that indices cannot exceed the int range.
96 int index = i + (int)currentIndex;
// Tests whether the slot is already filled, then whether temporaryArea holds
// this index. NOTE(review): the statements these two `if`s guard are not
// visible; presumably both cases skip to the next index.
98 if (stagingArea[index % count].HasValue)
101 if (!temporaryArea.TryRemove (index, out temp))
// Moves the parked value into its staging slot and counts it as staged.
// NOTE(review): what happens when Signal () reports the count reached zero
// is not visible here.
104 stagingArea [index % count] = new KeyValuePair<long, T> (index, temp);
105 if (stagingCount.Signal ())
// Second loop: resets every staging slot to an empty (no-value) nullable.
112 for (int i = 0; i < stagingArea.Length; i++)
113 stagingArea[i] = new Nullable<KeyValuePair<long, T>> ();
// Moves to the next window and returns an enumerator over the staging array.
// MoveNext () in the enclosing class checks the result against null, so a
// null return is presumably possible on a path that is not visible here.
116 public IEnumerator<KeyValuePair<long, T>?> Wait ()
// Restores stagingCount to its initial count for the new window.
119 stagingCount.Reset ();
// Advances the window start by `count` atomically.
121 Interlocked.Add (ref currentIndex, count);
// Loops until every slot of the window has been signalled as staged.
125 while (!stagingCount.IsSet) {
// While producers are still participating, block until the window is full.
// Wait (CancellationToken) throws OperationCanceledException if mergedToken
// is cancelled; NOTE(review): any handling around this call is not visible.
126 if (!participantCount.IsSet)
128 stagingCount.Wait (mergedToken);
// Once all participants have signalled, the first slot is inspected.
// NOTE(review): the outcomes of this test are not visible in this listing.
133 if (participantCount.IsSet) {
135 if (stagingArea[0].HasValue)
// The array is cast to IEnumerable<...> to obtain the generic enumerator.
142 return ((IEnumerable<KeyValuePair<long, T>?>)stagingArea).GetEnumerator ();
// Bucket that producers feed through Add () and that MoveNext () drains.
147 SlotBucket slotBucket;
// Enumerator over the batch most recently returned by slotBucket.Wait ();
// null until MoveNext () has obtained one.
149 IEnumerator<KeyValuePair<long, T>?> currEnum;
// Last pair read from currEnum by MoveNext ().
150 KeyValuePair<long, T> curr;
// Creates the enumerator. `num` is forwarded to SlotBucket as its slot count
// and `token` as its external cancellation token.
// NOTE(review): other constructor statements, if any, are not visible in this listing.
152 internal OrderingEnumerator (int num, CancellationToken token)
155 slotBucket = new SlotBucket (num, token);
// Dispose member required by IEnumerator<T> (via IDisposable).
// NOTE(review): the body is not visible in this listing.
158 public void Dispose ()
// Advances to the next staged value and caches it in `curr`.
// NOTE(review): the opening of the outer loop and the return statements are
// not visible in this listing; the comments cover the visible statements only.
168 public bool MoveNext ()
// Runs while there is no batch enumerator yet or the current one is exhausted.
171 while (currEnum == null || !currEnum.MoveNext ()) {
// NOTE(review): the statement guarded by this check is not visible; presumably
// the exhausted enumerator is released here.
172 if (currEnum != null)
// Fetches the next batch from the bucket. NOTE(review): the action taken when
// Wait () returns null is not visible; presumably enumeration ends.
174 if ((currEnum = slotBucket.Wait ()) == null)
// Closes an outer loop that repeats while the current slot holds no value,
// i.e. empty slots are skipped.
177 } while (!currEnum.Current.HasValue);
179 curr = currEnum.Current.Value;
// Explicit implementation of the non-generic IEnumerator.Current property.
// NOTE(review): the accessor body is not visible in this listing.
190 object IEnumerator.Current {
// Hands one (index, value) pair to the underlying SlotBucket.
// NOTE(review): `token` is not used by the visible statement - confirm whether
// it is meant to be ignored.
196 public void Add (KeyValuePair<long, T> value, CancellationToken token)
198 slotBucket.Add (value);
201 // Called by each worker's endAction
// Forwards to SlotBucket.EndParticipation (), which signals the bucket's
// participant countdown once.
202 public void EndParticipation ()
204 slotBucket.EndParticipation ();
207 // Called at the end with ContinueAll