2009-08-19 Jérémie Laval <jeremie.laval@gmail.com>
authorJérémie Laval <jeremie.laval@gmail.com>
Wed, 19 Aug 2009 10:51:42 +0000 (10:51 -0000)
committerJérémie Laval <jeremie.laval@gmail.com>
Wed, 19 Aug 2009 10:51:42 +0000 (10:51 -0000)
In class/System/System.Collections.Concurrent/:
   * BlockingCollection.cs: Rewrite to use a transaction id
   based approach. Ported to 4.0 API. Fix GetConsumingEnumerable.

In class/System/Test/System.Collections.Concurrent/:
   * BlockingCollectionTests.cs: Track API changes.

In class/System/System.Threading/:
   * Barrier.cs: Fix Barrier to be really thread-safe.
   Remove deadlocking.

In class/corlib/System.Collections.Concurrent/:
   * Partitioner.cs: Fix infinite recursion when calling Create
   with a IList.
   * Partitionners/ListPartitioner.cs: Fix bad splitting for the
   last partition.

In class/corlib/System.Threading.Tasks/:
   * Task.cs: Refactor Wait methods.

In class/corlib/System.Threading/:
   * AtomicBoolean.cs: Turn it into a class
   * CountdownEvent.cs: Work on cached variable. Make sure
   count doesn't go under 0.
   * ParallelLoopState.cs: Initialize correctly AtomicBoolean.

svn path=/trunk/mcs/; revision=140229

15 files changed:
mcs/class/System/System.Collections.Concurrent/BlockingCollection.cs
mcs/class/System/System.Collections.Concurrent/ChangeLog
mcs/class/System/System.Threading/Barrier.cs
mcs/class/System/System.Threading/ChangeLog
mcs/class/System/Test/System.Collections.Concurrent/BlockingCollectionTests.cs
mcs/class/System/Test/System.Collections.Concurrent/ChangeLog [new file with mode: 0644]
mcs/class/corlib/System.Collections.Concurrent/ChangeLog
mcs/class/corlib/System.Collections.Concurrent/Partitioner.cs
mcs/class/corlib/System.Collections.Concurrent/Partitionners/ListPartitioner.cs
mcs/class/corlib/System.Threading.Tasks/ChangeLog
mcs/class/corlib/System.Threading.Tasks/Task.cs
mcs/class/corlib/System.Threading/AtomicBoolean.cs
mcs/class/corlib/System.Threading/ChangeLog
mcs/class/corlib/System.Threading/CountdownEvent.cs
mcs/class/corlib/System.Threading/ParallelLoopState.cs

index 8cfdf4568c70ae9d9b8dbaf02cb223f860cf4350..f3d1b4e724e64cfeaa86293e76babc4d1c9fa7fa 100644 (file)
@@ -35,11 +35,14 @@ namespace System.Collections.Concurrent
        {
                readonly IProducerConsumerCollection<T> underlyingColl;
                readonly int upperBound;
-               readonly Func<bool> isFull;
                
                readonly SpinWait sw = new SpinWait ();
                
                AtomicBoolean isComplete;
+               long completeId;
+
+               long addId = long.MinValue;
+               long removeId = long.MinValue;
                
                #region ctors
                public BlockingCollection ()
@@ -62,67 +65,124 @@ namespace System.Collections.Concurrent
                        this.underlyingColl = underlyingColl;
                        this.upperBound     = upperBound;
                        this.isComplete     = new AtomicBoolean ();
-                       
-                       if (upperBound == -1)
-                               isFull = FalseIsFull;
-                       else
-                               isFull = CountBasedIsFull;
                }
+               #endregion
                
-               static bool FalseIsFull ()
+               #region Add & Remove (+ Try)
+               public void Add (T item)
                {
-                       return false;
+                       Add (item, null);
                }
                
-               bool CountBasedIsFull ()
+               public void Add (T item, CancellationToken token)
                {
-                       return underlyingColl.Count >= upperBound;      
+                       Add (item, () => token.IsCancellationRequested);
                }
-               #endregion
                
-               #region Add & Remove (+ Try)
-               public void Add (T item)
+               void Add (T item, Func<bool> cancellationFunc)
                {
                        while (true) {
-                               while (isFull ()) {
-                                       if (isComplete.Value)
-                                               throw new InvalidOperationException ("The BlockingCollection<T>"
-                                                                                    + " has been marked as complete with regards to additions.");
-                                       Block ();
+                               long cachedAddId = addId;
+                               long cachedRemoveId = removeId;
+                               
+                               if (upperBound != -1) {
+                                       if (cachedAddId - cachedRemoveId > upperBound) {
+                                               Block ();
+                                               continue;
+                                       }
                                }
-                               // Extra check. The status might have changed after Block() or if isFull() is always false
-                               if (isComplete.Value)
+                               
+                               // Check our transaction id against completed stored one
+                               if (isComplete.Value && cachedAddId >= completeId)
                                        throw new InvalidOperationException ("The BlockingCollection<T> has"
                                                                             + " been marked as complete with regards to additions.");
-                               // Go back in main waiting loop
-                               if (isFull ())
-                                       continue;
                                
-                               if (underlyingColl.TryAdd (item))
+                               if (Interlocked.CompareExchange (ref addId, cachedAddId + 1, cachedAddId) == cachedAddId)
                                        break;
+                               
+                               if (cancellationFunc != null && cancellationFunc ())
+                                       throw new OperationCanceledException ("CancellationToken triggered");
                        }
+                       
+                       
+                       if (!underlyingColl.TryAdd (item))
+                               throw new InvalidOperationException ("The underlying collection didn't accept the item.");
                }
                
-               public T Remove ()
+               public T Take ()
                {
-                       T item;
-                       
-                       while (underlyingColl.Count == 0 || !underlyingColl.TryTake (out item)) {
-                               if (isComplete.Value)
-                                       throw new OperationCanceledException ("The BlockingCollection<T> is empty and has been marked as complete with regards to additions.");
-                               Block ();
+                       return Take (null);
+               }
+               
+               public T Take (CancellationToken token)
+               {
+                       return Take (() => token.IsCancellationRequested);
+               }
+               
+               T Take (Func<bool> cancellationFunc)
+               {
+                       while (true) {
+                               long cachedRemoveId = removeId;
+                               long cachedAddId = addId;
+                               
+                               // Empty case
+                               if (cachedRemoveId == cachedAddId) {
+                                       if (isComplete.Value && cachedRemoveId >= completeId)
+                                               throw new OperationCanceledException ("The BlockingCollection<T> has"
+                                                                                     + " been marked as complete with regards to additions.");
+                                       
+                                       Block ();
+                                       continue;
+                               }
+                               
+                               if (Interlocked.CompareExchange (ref removeId, cachedRemoveId + 1, cachedRemoveId) == cachedRemoveId)
+                                       break;
+                               
+                               if (cancellationFunc != null && cancellationFunc ())
+                                       throw new OperationCanceledException ("The CancellationToken has had cancellation requested.");
                        }
                        
+                       T item;
+                       while (!underlyingColl.TryTake (out item));
+                       
                        return item;
                }
                
                public bool TryAdd (T item)
                {
-                       if (isComplete.Value || isFull ()) {
-                                       return false;
-                       }
+                       return TryAdd (item, null, null);
+               }
+               
+               bool TryAdd (T item, Func<bool> contFunc, CancellationToken? token)
+               {
+                       do {
+                               if (token.HasValue && token.Value.IsCancellationRequested)
+                                       throw new OperationCanceledException ("The CancellationToken has had cancellation requested.");
+                               
+                               long cachedAddId = addId;
+                               long cachedRemoveId = removeId;
+                               
+                               if (upperBound != -1) {
+                                       if (cachedAddId - cachedRemoveId > upperBound) {
+                                               continue;
+                                       }
+                               }
+                               
+                               // Check our transaction id against completed stored one
+                               if (isComplete.Value && cachedAddId >= completeId)
+                                       throw new InvalidOperationException ("The BlockingCollection<T> has"
+                                                                            + " been marked as complete with regards to additions.");
+                               
+                               if (Interlocked.CompareExchange (ref addId, cachedAddId + 1, cachedAddId) != cachedAddId)
+                                       continue;
+                       
+                               if (!underlyingColl.TryAdd (item))
+                                       throw new InvalidOperationException ("The underlying collection didn't accept the item.");
+                               
+                               return true;
+                       } while (contFunc != null && contFunc ());
                        
-                       return underlyingColl.TryAdd (item);
+                       return false;
                }
                
                public bool TryAdd (T item, TimeSpan ts)
@@ -133,38 +193,67 @@ namespace System.Collections.Concurrent
                public bool TryAdd (T item, int millisecondsTimeout)
                {
                        Stopwatch sw = Stopwatch.StartNew ();
-                       while (isFull ()) {
-                               if (isComplete.Value || sw.ElapsedMilliseconds > millisecondsTimeout) {
-                                       sw.Stop ();
-                                       return false;
+                       return TryAdd (item, () => sw.ElapsedMilliseconds < millisecondsTimeout, null);
+               }
+               
+               public bool TryAdd (T item, int millisecondsTimeout, CancellationToken token)
+               {
+                       Stopwatch sw = Stopwatch.StartNew ();
+                       return TryAdd (item, () => sw.ElapsedMilliseconds < millisecondsTimeout, token);
+               }
+               
+               public bool TryTake (out T item)
+               {
+                       return TryTake (out item, null, null);
+               }
+               
+               bool TryTake (out T item, Func<bool> contFunc, CancellationToken? token)
+               {
+                       item = default (T);
+                       
+                       do {
+                               if (token.HasValue && token.Value.IsCancellationRequested)
+                                       throw new OperationCanceledException ("The CancellationToken has had cancellation requested.");
+                               
+                               long cachedRemoveId = removeId;
+                               long cachedAddId = addId;
+                               
+                               // Empty case
+                               if (cachedRemoveId == cachedAddId) {
+                                       if (isComplete.Value && cachedRemoveId >= completeId)
+                                               continue;
+                                       
+                                       continue;
                                }
-                               Block ();
-                       }
-                       return TryAdd (item);
+                               
+                               if (Interlocked.CompareExchange (ref removeId, cachedRemoveId + 1, cachedRemoveId) != cachedRemoveId)
+                                       continue;
+                               
+                               return underlyingColl.TryTake (out item);
+                       } while (contFunc != null && contFunc ());
+                       
+                       return false;
                }
                
-               public bool TryRemove (out T item)
+               public bool TryTake (out T item, TimeSpan ts)
                {
-                       return underlyingColl.TryTake (out item);
+                       return TryTake (out item, (int)ts.TotalMilliseconds);
                }
                
-               public bool TryRemove (out T item, TimeSpan ts)
+               public bool TryTake (out T item, int millisecondsTimeout)
                {
-                       return TryRemove (out item, (int)ts.TotalMilliseconds);
+                       item = default (T);
+                       Stopwatch sw = Stopwatch.StartNew ();
+                       
+                       return TryTake (out item, () => sw.ElapsedMilliseconds < millisecondsTimeout, null);
                }
                
-               public bool TryRemove (out T item, int millisecondsTimeout)
+               public bool TryTake (out T item, int millisecondsTimeout, CancellationToken token)
                {
+                       item = default (T);
                        Stopwatch sw = Stopwatch.StartNew ();
-                       while (underlyingColl.Count == 0) {
-                               if (isComplete.Value || sw.ElapsedMilliseconds > millisecondsTimeout) {
-                                       item = default (T);
-                                       return false;
-                               }
-                                       
-                               Block ();
-                       }
-                       return TryRemove (out item);
+                       
+                       return TryTake (out item, () => sw.ElapsedMilliseconds < millisecondsTimeout, token);
                }
                #endregion
                
@@ -185,7 +274,7 @@ namespace System.Collections.Concurrent
                        return false;
                }
                
-               public static int AddAny (BlockingCollection<T>[] collections, T item)
+               public static int AddToAny (BlockingCollection<T>[] collections, T item)
                {
                        CheckArray (collections);
                        int index = 0;
@@ -199,7 +288,21 @@ namespace System.Collections.Concurrent
                        return -1;
                }
                
-               public static int TryAddAny (BlockingCollection<T>[] collections, T item)
+               public static int AddToAny (BlockingCollection<T>[] collections, T item, CancellationToken token)
+               {
+                       CheckArray (collections);
+                       int index = 0;
+                       foreach (var coll in collections) {
+                               try {
+                                       coll.Add (item, token);
+                                       return index;
+                               } catch {}
+                               index++;
+                       }
+                       return -1;
+               }
+               
+               public static int TryAddToAny (BlockingCollection<T>[] collections, T item)
                {
                        CheckArray (collections);
                        int index = 0;
@@ -211,7 +314,7 @@ namespace System.Collections.Concurrent
                        return -1;
                }
                
-               public static int TryAddAny (BlockingCollection<T>[] collections, T item, TimeSpan ts)
+               public static int TryAddToAny (BlockingCollection<T>[] collections, T item, TimeSpan ts)
                {
                        CheckArray (collections);
                        int index = 0;
@@ -223,7 +326,7 @@ namespace System.Collections.Concurrent
                        return -1;
                }
                
-               public static int TryAddAny (BlockingCollection<T>[] collections, T item, int millisecondsTimeout)
+               public static int TryAddToAny (BlockingCollection<T>[] collections, T item, int millisecondsTimeout)
                {
                        CheckArray (collections);
                        int index = 0;
@@ -235,14 +338,42 @@ namespace System.Collections.Concurrent
                        return -1;
                }
                
-               public static int RemoveAny (BlockingCollection<T>[] collections, out T item)
+               public static int TryAddToAny (BlockingCollection<T>[] collections, T item, int millisecondsTimeout,
+                                              CancellationToken token)
+               {
+                       CheckArray (collections);
+                       int index = 0;
+                       foreach (var coll in collections) {
+                               if (coll.TryAdd (item, millisecondsTimeout, token))
+                                       return index;
+                               index++;
+                       }
+                       return -1;
+               }
+               
+               public static int TakeFromAny (BlockingCollection<T>[] collections, out T item)
+               {
+                       item = default (T);
+                       CheckArray (collections);
+                       int index = 0;
+                       foreach (var coll in collections) {
+                               try {
+                                       item = coll.Take ();
+                                       return index;
+                               } catch {}
+                               index++;
+                       }
+                       return -1;
+               }
+               
+               public static int TakeFromAny (BlockingCollection<T>[] collections, out T item, CancellationToken token)
                {
                        item = default (T);
                        CheckArray (collections);
                        int index = 0;
                        foreach (var coll in collections) {
                                try {
-                                       item = coll.Remove ();
+                                       item = coll.Take (token);
                                        return index;
                                } catch {}
                                index++;
@@ -250,42 +381,57 @@ namespace System.Collections.Concurrent
                        return -1;
                }
                
-               public static int TryRemoveAny (BlockingCollection<T>[] collections, out T item)
+               public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item)
+               {
+                       item = default (T);
+                       
+                       CheckArray (collections);
+                       int index = 0;
+                       foreach (var coll in collections) {
+                               if (coll.TryTake (out item))
+                                       return index;
+                               index++;
+                       }
+                       return -1;
+               }
+               
+               public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, TimeSpan ts)
                {
                        item = default (T);
                        
                        CheckArray (collections);
                        int index = 0;
                        foreach (var coll in collections) {
-                               if (coll.TryRemove (out item))
+                               if (coll.TryTake (out item, ts))
                                        return index;
                                index++;
                        }
                        return -1;
                }
                
-               public static int TryRemoveAny (BlockingCollection<T>[] collections, out T item, TimeSpan ts)
+               public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, int millisecondsTimeout)
                {
                        item = default (T);
                        
                        CheckArray (collections);
                        int index = 0;
                        foreach (var coll in collections) {
-                               if (coll.TryRemove (out item, ts))
+                               if (coll.TryTake (out item, millisecondsTimeout))
                                        return index;
                                index++;
                        }
                        return -1;
                }
                
-               public static int TryRemoveAny (BlockingCollection<T>[] collections, out T item, int millisecondsTimeout)
+               public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, int millisecondsTimeout,
+                                                 CancellationToken token)
                {
                        item = default (T);
                        
                        CheckArray (collections);
                        int index = 0;
                        foreach (var coll in collections) {
-                               if (coll.TryRemove (out item, millisecondsTimeout))
+                               if (coll.TryTake (out item, millisecondsTimeout, token))
                                        return index;
                                index++;
                        }
@@ -295,7 +441,9 @@ namespace System.Collections.Concurrent
                
                public void CompleteAdding ()
                {
-                       isComplete.Value = true;
+                 // No further add beside that point
+                 completeId = addId;
+                 isComplete.Value = true;
                }
                
                void ICollection.CopyTo (Array array, int index)
@@ -310,8 +458,25 @@ namespace System.Collections.Concurrent
                
                public IEnumerable<T> GetConsumingEnumerable ()
                {
-                       T item;
-                       while (underlyingColl.TryTake (out item)) {
+                       return GetConsumingEnumerable (Take);
+               }
+               
+               public IEnumerable<T> GetConsumingEnumerable (CancellationToken token)
+               {
+                       return GetConsumingEnumerable (() => Take (token));
+               }
+               
+               IEnumerable<T> GetConsumingEnumerable (Func<T> getFunc)
+               {
+                       while (true) {
+                               T item = default (T);
+                               
+                               try {
+                                       item = getFunc ();
+                               } catch {
+                                       break;
+                               }
+                               
                                yield return item;
                        }
                }
@@ -326,13 +491,14 @@ namespace System.Collections.Concurrent
                        return ((IEnumerable<T>)underlyingColl).GetEnumerator ();
                }
                
-               public IEnumerator<T> GetEnumerator ()
+               public void Dispose ()
                {
-                       return ((IEnumerable<T>)underlyingColl).GetEnumerator ();
+                       
                }
                
-               public void Dispose ()
+               protected virtual void Dispose (bool managedRes)
                {
+                       
                }
                
                public T[] ToArray ()
@@ -366,7 +532,7 @@ namespace System.Collections.Concurrent
                
                public bool IsCompleted {
                        get {
-                               return isComplete.Value && underlyingColl.Count == 0;
+                               return isComplete.Value && addId == removeId;
                        }
                }
                
index 3416a07521a2651b007bd6b7ce76bce1cb5f6c98..2de71e895620eaf37f36aab6d75dd4c2c0aedb12 100644 (file)
@@ -1,3 +1,8 @@
+2009-08-19  Jérémie Laval  <jeremie.laval@gmail.com>
+
+       * BlockingCollection.cs: Rewrite to use a transaction id
+       based approach. Ported to 4.0 API. Fix GetConsumingEnumerable.
+
 2009-08-11  Jérémie Laval  <jeremie.laval@gmail.com>
 
        * BlockingCollection.cs: New addition.
index 6642f0c35ea0a54deee3191ff649094201c7bbcb..105783e8f16daadad408baffb8cc4e2c74855449 100644 (file)
@@ -35,8 +35,9 @@ namespace System.Threading
                
                int participants;
                CountdownEvent cntd;
+               AtomicBoolean cleaned = new AtomicBoolean ();
                int phase;
-
+               
                public Barrier (int participants) : this (participants, null)
                {
                }
@@ -51,6 +52,7 @@ namespace System.Threading
                
                void InitCountdownEvent ()
                {
+                       cleaned = new AtomicBoolean ();
                        cntd = new CountdownEvent (participants);
                }
                
@@ -85,44 +87,69 @@ namespace System.Threading
                
                public void SignalAndWait ()
                {
-                       SignalAndWait (() => { cntd.Wait (); return true; });
+                       SignalAndWait ((c) => { c.Wait (); return true; });
                }
                
                public bool SignalAndWait (int millisecondTimeout)
                {
-                       return SignalAndWait (() => cntd.Wait (millisecondTimeout));
+                       return SignalAndWait ((c) => c.Wait (millisecondTimeout));
                }
                
                public bool SignalAndWait (TimeSpan ts)
                {
-                       return SignalAndWait (() => cntd.Wait (ts));
+                       return SignalAndWait ((c) => c.Wait (ts));
                }
                
                public bool SignalAndWait (int millisecondTimeout, CancellationToken token)
                {
-                       return SignalAndWait (() => cntd.Wait (millisecondTimeout, token));
+                       return SignalAndWait ((c) => c.Wait (millisecondTimeout, token));
                }
                
                public bool SignalAndWait (TimeSpan ts, CancellationToken token)
                {
-                       return SignalAndWait (() => cntd.Wait (ts, token));
+                       return SignalAndWait ((c) => c.Wait (ts, token));
                }
                
-               bool SignalAndWait (Func<bool> associate)
+               bool SignalAndWait (Func<CountdownEvent, bool> associate)
                {
                        bool result;
+                       AtomicBoolean cl = cleaned;
+                       CountdownEvent temp = cntd;
                        
-                       if (!cntd.Signal ()) {
-                               result = associate ();
+                       if (!temp.Signal ()) {
+                               result = Wait (associate, temp, cl);
                        } else {
                                result = true;
-                               postPhaseAction (this);
+                               PostPhaseAction (cl);
                                phase++;
                        }
                        
                        return result;
                }
                
+               bool Wait (Func<CountdownEvent, bool> associate, CountdownEvent temp, AtomicBoolean cl)
+               {
+                       if (!associate (temp))
+                               return false;
+                       
+                       SpinWait sw = new SpinWait ();
+                       while (!cl.Value) {
+                               //Console.WriteLine (cleaned);
+                               sw.SpinOnce ();
+                       }
+                       
+                       return true;
+               }
+               
+               void PostPhaseAction (AtomicBoolean cl)
+               {
+                       postPhaseAction (this);
+                       
+                       InitCountdownEvent ();
+                       
+                       cl.Value = true;
+               }
+               
                public int CurrentPhaseNumber {
                        get {
                                return phase;
index 381c8a5f137b17c417ef180e791c301231bf2b3e..8e816821de198fcaa422ea409c2cd85d98575009 100644 (file)
@@ -1,3 +1,8 @@
+2009-08-19  Jérémie Laval  <jeremie.laval@gmail.com>
+
+       * Barrier.cs: Fix Barrier to be really thread-safe.
+       Remove deadlocking.
+
 2009-08-11  Jérémie Laval  <jeremie.laval@gmail.com>
 
        * Barrier.cs: added.
index 068ec167fd0be161927defbf2ca263e05d2a18de..cfc942588e1f4489f02db6f80f8ce85361386a41 100644 (file)
@@ -79,16 +79,16 @@ namespace ParallelFxTests
                }
                
                [TestAttribute]
-               public void RemoveTestCase()
+               public void TakeTestCase()
                {
                        defaultCollection.Add(1);
                        defaultCollection.Add(2);
                        boundedCollection.Add(1);
                        boundedCollection.Add(2);
                        
-                       int value = defaultCollection.Remove();
+                       int value = defaultCollection.Take();
                        Assert.AreEqual(1, value, "#1");
-                       value = boundedCollection.Remove();
+                       value = boundedCollection.Take();
                        Assert.AreEqual(1, value, "#2");
                }
                
@@ -131,8 +131,8 @@ namespace ParallelFxTests
                        defaultCollection.CompleteAdding();
                        Assert.IsFalse(defaultCollection.IsCompleted, "#3");
                        
-                       defaultCollection.Remove();
-                       defaultCollection.Remove();
+                       defaultCollection.Take();
+                       defaultCollection.Take();
                        
                        Assert.IsTrue(defaultCollection.IsAddingCompleted, "#1");
                        Assert.AreEqual(0, defaultCollection.Count, "#2");
@@ -148,6 +148,7 @@ namespace ParallelFxTests
                        defaultCollection.Add(4);
                        defaultCollection.Add(5);
                        defaultCollection.Add(6);
+                       defaultCollection.CompleteAdding ();
                        
                        IEnumerable<int> enumerable = defaultCollection.GetConsumingEnumerable();
                        Assert.IsNotNull(enumerable, "#1");
diff --git a/mcs/class/System/Test/System.Collections.Concurrent/ChangeLog b/mcs/class/System/Test/System.Collections.Concurrent/ChangeLog
new file mode 100644 (file)
index 0000000..8138790
--- /dev/null
@@ -0,0 +1,4 @@
+2009-08-19 Jérémie Laval  <jeremie.laval@gmail.com>
+
+       * BlockingCollectionTests.cs: track API changes
+
index 83bf3172fef528362363c4addc08e6eae253141b..a1f5d2223c124de87cb6548861e7fe97f23c379f 100644 (file)
@@ -1,3 +1,10 @@
+2009-08-19  Jérémie Laval  <jeremie.laval@gmail.com>
+
+       * Partitioner.cs: Fix infinite recursion when calling Create
+       with a IList.
+       * Partitionners/ListPartitioner.cs: Fix bad splitting for the
+       last partition.
+
 2009-08-11  Jérémie Laval  <jeremie.laval@gmail.com>
 
        * ConcurrentQueue.cs:
index c82398ac0799e30d6b2bbcb53838f1263dbee95e..b6655578d2d1fc865f8edcecafb200951a1b5121 100644 (file)
@@ -36,14 +36,14 @@ namespace System.Collections.Concurrent
                {
                        IList<TSource> tempIList = source as IList<TSource>;
                        if (tempIList != null)
-                               return Create (tempIList);
+                               return Create (tempIList, true);
                        
                        return new EnumerablePartitioner<TSource> (source);
                }
                
          public static OrderablePartitioner<TSource> Create<TSource> (TSource[] source, bool loadBalance)
                {
-                       return Create ((IList<TSource>)source);
+                       return Create ((IList<TSource>)source, loadBalance);
                }
                
          public static OrderablePartitioner<TSource> Create<TSource> (IList<TSource> source, bool loadBalance)
index 527df2d23d9c176c7dcff479d419f39eaacf61bf..37b0fdddfc2ae98e2ce96f4cbee572974b2b1a9d 100644 (file)
@@ -53,18 +53,18 @@ namespace System.Collections.Concurrent
                                if (i != enumerators.Length - 1)
                                        enumerators[i] = GetEnumeratorForRange (i * count, i * count + count);
                                else
-                                       enumerators[i] = GetEnumeratorForRange (i * count, source.Count - i * count);
+                                       enumerators[i] = GetEnumeratorForRange (i * count, source.Count);
                        }
                        
                        return enumerators;
                }
                
-               IEnumerator<KeyValuePair<long, T>> GetEnumeratorForRange (int startIndex, int count)
+               IEnumerator<KeyValuePair<long, T>> GetEnumeratorForRange (int startIndex, int lastIndex)
                {
                        if (startIndex >= source.Count)
                          return GetEmpty ();
                        
-                       return GetEnumeratorForRangeInternal (startIndex, count);
+                       return GetEnumeratorForRangeInternal (startIndex, lastIndex);
                }
 
                IEnumerator<KeyValuePair<long, T>> GetEmpty ()
@@ -72,9 +72,9 @@ namespace System.Collections.Concurrent
                        yield break;
                  }
                
-               IEnumerator<KeyValuePair<long, T>> GetEnumeratorForRangeInternal (int startIndex, int count)
+               IEnumerator<KeyValuePair<long, T>> GetEnumeratorForRangeInternal (int startIndex, int lastIndex)
                {       
-                       for (int i = startIndex; i < count; i++) {
+                       for (int i = startIndex; i < lastIndex; i++) {
                                yield return new KeyValuePair<long, T> (i, source[i]);
                        }
                }
index 0fc4ae7a11ce8bf940a5e400fee5eb29a568a46b..b3783dfea02a80643b25d194080a1fc39eb37ebc 100644 (file)
@@ -1,3 +1,7 @@
+2009-08-19  Jérémie Laval  <jeremie.laval@gmail.com>
+
+       * Task.cs: Refactor Wait methods.
+
 2009-08-11  Jérémie Laval  <jeremie.laval@gmail.com>
 
        * Future.cs: Add static to Factory property
index 750a22bef0822e55030b1c783aaa2acd7f45bd40..cf71851f3e9bab0a53d27a3bbb3dbe358fbd014b 100644 (file)
@@ -201,7 +201,7 @@ namespace System.Threading.Tasks
                }
                
                internal void ContinueWithCore (Task continuation, TaskContinuationOptions kind,
-                                                         TaskScheduler scheduler, Func<bool> predicate)
+                                               TaskScheduler scheduler, Func<bool> predicate)
                {
                        // Already set the scheduler so that user can call Wait and that sort of stuff
                        continuation.taskScheduler = scheduler;
@@ -455,22 +455,10 @@ namespace System.Threading.Tasks
                        if (exception != null && !(exception is TaskCanceledException))
                                throw exception;
                }
-               
-               [MonoTODO ("Refactor")]
+
                public void Wait (CancellationToken token)
                {
-                       if (scheduler == null)
-                               throw new InvalidOperationException ("The Task hasn't been Started and thus can't be waited on");
-                       
-                       Watch sw = Watch.StartNew ();
-                       scheduler.ParticipateUntil (this, delegate { 
-                               return token.IsCancellationRequested;
-                       });
-                       sw.Stop ();
-                       
-                       if (exception != null && !(exception is TaskCanceledException))
-                               throw exception;
-                       
+                       Wait (null, token);
                }
                
                public bool Wait (TimeSpan ts)
@@ -480,33 +468,29 @@ namespace System.Threading.Tasks
                
                public bool Wait (int millisecondsTimeout)
                {
-                       if (scheduler == null)
-                               throw new InvalidOperationException ("The Task hasn't been Started and thus can't be waited on");
-                       
                        Watch sw = Watch.StartNew ();
-                       bool result = scheduler.ParticipateUntil (this, delegate { 
-                               return sw.ElapsedMilliseconds >= millisecondsTimeout;
-                       });
-                       sw.Stop ();
-                       
-                       if (exception != null && !(exception is TaskCanceledException))
-                               throw exception;
-                       
-                       return !result;
+                       return Wait (() => sw.ElapsedMilliseconds >= millisecondsTimeout, null);
                }
                
-               [MonoTODO ("Refactor")]
                public bool Wait (int millisecondsTimeout, CancellationToken token)
+               {
+                       Watch sw = Watch.StartNew ();
+                       return Wait (() => sw.ElapsedMilliseconds >= millisecondsTimeout, token);
+               }
+
+               bool Wait (Func<bool> stopFunc, CancellationToken? token)
                {
                        if (scheduler == null)
                                throw new InvalidOperationException ("The Task hasn't been Started and thus can't be waited on");
                        
-                       Watch sw = Watch.StartNew ();
                        bool result = scheduler.ParticipateUntil (this, delegate { 
-                               return sw.ElapsedMilliseconds >= millisecondsTimeout || token.IsCancellationRequested;
+                               if (token.HasValue && token.Value.IsCancellationRequested)
+                                       throw new OperationCanceledException ("The CancellationToken has had cancellation requested.");
+                               
+                               
+                               return (stopFunc != null) ? stopFunc () : false;
                        });
-                       sw.Stop ();
-                       
+
                        if (exception != null && !(exception is TaskCanceledException))
                                throw exception;
                        
@@ -575,6 +559,11 @@ namespace System.Threading.Tasks
                }
                
                public static int WaitAny (params Task[] tasks)
+               {
+                       return WaitAny (tasks, null, null);
+               }
+               
+               static int WaitAny (Task[] tasks, Func<bool> stopFunc, CancellationToken? token)
                {
                        if (tasks == null)
                                throw new ArgumentNullException ("tasks");
@@ -602,6 +591,12 @@ namespace System.Threading.Tasks
                        
                        // All tasks are supposed to use the same TaskManager
                        tasks[0].scheduler.ParticipateUntil (delegate {
+                               if (stopFunc != null && stopFunc ())
+                                       return true;
+                               
+                               if (token.HasValue && token.Value.IsCancellationRequested)
+                                       throw new OperationCanceledException ("The CancellationToken has had cancellation requested.");
+                               
                                return numFinished >= 1;
                        });
                        
@@ -617,107 +612,29 @@ namespace System.Threading.Tasks
                {
                        if (millisecondsTimeout < -1)
                                throw new ArgumentOutOfRangeException ("millisecondsTimeout");
-                       if (tasks == null)
-                               throw new ArgumentNullException ("tasks");
                        
                        if (millisecondsTimeout == -1)
                                return WaitAny (tasks);
                        
-                       int numFinished = 0;
-                       int indexFirstFinished = -1;
-                       
-                       foreach (Task t in tasks) {
-                               t.completed += delegate (object sender, EventArgs e) { 
-                                       int result = Interlocked.Increment (ref numFinished);
-                                       if (result == 1) {
-                                               Task target = (Task)sender;
-                                               indexFirstFinished = Array.FindIndex (tasks, (elem) => elem == target);
-                                       }
-                               };      
-                       }
-                       
                        Watch sw = Watch.StartNew ();
-                       tasks[0].scheduler.ParticipateUntil (delegate {
-                               if (sw.ElapsedMilliseconds > millisecondsTimeout)
-                                       return true;
-                               return numFinished >= 1;
-                       });
-                       sw.Stop ();
-                       
-                       return indexFirstFinished;
+                       return WaitAny (tasks, () => sw.ElapsedMilliseconds > millisecondsTimeout, null);
                }
-               
-               [MonoTODO ("Refactor")]
+
                public static int WaitAny (Task[] tasks, int millisecondsTimeout, CancellationToken token)
-               {
+               {                       
                        if (millisecondsTimeout < -1)
                                throw new ArgumentOutOfRangeException ("millisecondsTimeout");
-                       if (tasks == null)
-                               throw new ArgumentNullException ("tasks");
                        
                        if (millisecondsTimeout == -1)
                                return WaitAny (tasks);
                        
-                       int numFinished = 0;
-                       int indexFirstFinished = -1;
-                       
-                       foreach (Task t in tasks) {
-                               t.completed += delegate (object sender, EventArgs e) { 
-                                       int result = Interlocked.Increment (ref numFinished);
-                                       if (result == 1) {
-                                               Task target = (Task)sender;
-                                               indexFirstFinished = Array.FindIndex (tasks, (elem) => elem == target);
-                                       }
-                               };      
-                       }
-                       
                        Watch sw = Watch.StartNew ();
-                       tasks[0].scheduler.ParticipateUntil (delegate {
-                               if (sw.ElapsedMilliseconds > millisecondsTimeout || token.IsCancellationRequested)
-                                       return true;
-                               return numFinished >= 1;
-                       });
-                       sw.Stop ();
-                       
-                       return indexFirstFinished;
+                       return WaitAny (tasks, () => sw.ElapsedMilliseconds > millisecondsTimeout, token);
                }
                
-               [MonoTODO ("Refactor")]
                public static int WaitAny (Task[] tasks, CancellationToken token)
-               {
-                       if (tasks == null)
-                               throw new ArgumentNullException ("tasks");
-                       if (tasks.Length == 0)
-                               throw new ArgumentException ("tasks is empty", "tasks");
-                       
-                       int numFinished = 0;
-                       int indexFirstFinished = -1;
-                       int index = 0;
-                       
-                       foreach (Task t in tasks) {
-                               if (t.IsCompleted) {
-                                       return index;
-                               }
-                               t.completed += delegate (object sender, EventArgs e) {
-                                       int result = Interlocked.Increment (ref numFinished);
-                                       // Check if we are the first to have finished
-                                       if (result == 1) {
-                                               Task target = (Task)sender;
-                                               indexFirstFinished = Array.FindIndex (tasks, (elem) => elem == target);
-                                       }
-                               };      
-                               index++;
-                       }
-                       
-                       // All tasks are supposed to use the same TaskManager
-                       tasks[0].scheduler.ParticipateUntil (delegate {
-                               if (token.IsCancellationRequested)
-                                       return true;
-                               
-                               return numFinished >= 1;
-                       });
-                       
-                       return indexFirstFinished;
+               {                       
+                       return WaitAny (tasks, null, token);
                }
                #endregion
                
index 4195fca3a55fef46fd03e49d7ce74903b006c1c7..c91bde8ec16f5650bac2bc459ddbc16ff5cfc9f7 100644 (file)
@@ -27,7 +27,7 @@ using System;
 
 namespace System.Threading
 {
-       internal struct AtomicBoolean
+       internal class AtomicBoolean
        {
                int flag;
                const int UnSet = 0;
@@ -49,6 +49,11 @@ namespace System.Threading
                        return temp;
                }
                
+               public bool TrySet ()
+               {
+                       return !Exchange (true);
+               }
+               
                public bool Exchange (bool newVal)
                {
                        int newTemp = newVal ? Set : UnSet;
index 38ed16bd4d31ab7128c31434f971f0b047a9ca8a..c4277fee3e3333b58f125facb42c462d9ee9225a 100644 (file)
@@ -1,3 +1,14 @@
+2009-08-19  Jérémie Laval  <jeremie.laval@gmail.com>
+
+       * ParallelLoopState.cs: Take in account that
+       AtomicBoolean is a class.
+
+2009-08-19  Jérémie Laval  <jeremie.laval@gmail.com>
+
+       * AtomicBoolean.cs: Turn it into a class
+       * CountdownEvent.cs: Work on cached variable. Make sure
+       count doesn't go under 0.
+
 2009-08-11  Jérémie Laval  <jeremie.laval@gmail.com>
 
        * Watch.cs:
index e61d096aacb41131d57a0ca9e5498b7889a1f3de..0a53f30e5244c7124dcae24597c67345aba019a3 100644 (file)
@@ -47,7 +47,7 @@ namespace System.Threading
                
                public bool Signal (int num)
                {
-                       if (num < 0)
+                       if (num <= 0)
                                throw new ArgumentOutOfRangeException ("num");
                        
                        Action<int> check = delegate (int value) {
@@ -59,7 +59,7 @@ namespace System.Threading
                        if (!ApplyOperation (-num, check, out newValue))
                                throw new InvalidOperationException ("The event is already set");
                        
-                       if (newValue <= 0) {
+                       if (newValue == 0) {
                                evt.Set ();
                                return true;
                        }
@@ -91,9 +91,7 @@ namespace System.Threading
                        if (num < 0)
                                throw new ArgumentOutOfRangeException ("num");
                        
-                       Action<int> check = delegate (int value) {      };
-                       
-                       return ApplyOperation (num, check);
+                       return ApplyOperation (num, null);
                }
                
                bool ApplyOperation (int num, Action<int> doCheck)
@@ -108,14 +106,14 @@ namespace System.Threading
                        newValue = 0;
                        
                        do {
-                               if (IsSet)
+                               oldCount = count;
+                               if (oldCount == 0)
                                        return false;
                                
-                               oldCount = count;
                                newValue = oldCount + num;
                                
-                               doCheck (newValue);
-                               
+                               if (doCheck != null)
+                                       doCheck (newValue);
                        } while (Interlocked.CompareExchange (ref count, newValue, oldCount) != oldCount);
                        
                        return true;
@@ -214,7 +212,7 @@ namespace System.Threading
                        
                public bool IsSet {
                        get {
-                               return count <= 0;
+                               return count == 0;
                        }
                }
                
index 8231aaa9fb93c8ba850919149189895c90556ad3..da75feade4a20de2911ba2d4edb109213b2af0d4 100644 (file)
@@ -32,8 +32,8 @@ namespace System.Threading
        {
                internal class ExternalInfos
                {
-                       public AtomicBoolean IsStopped;
-                       public AtomicBoolean IsBroken;
+                       public AtomicBoolean IsStopped = new AtomicBoolean ();
+                       public AtomicBoolean IsBroken = new AtomicBoolean ();
                        public volatile bool IsExceptional;
                        public long? LowestBreakIteration;
                }