Merge pull request #408 from strawd/master
[mono.git] / mcs / class / System / System.Collections.Concurrent / BlockingCollection.cs
index 8c098c5d4faf5df7d58f7b6acfdf891da0fd6f1c..41a241b10e20dc12a96e75c6c3c8faf43ebb44e3 100644 (file)
@@ -23,7 +23,7 @@
 //
 //
 
-#if NET_4_0 || BOOTSTRAP_NET_4_0
+#if NET_4_0
 
 using System;
 using System.Threading;
@@ -39,7 +39,6 @@ namespace System.Collections.Concurrent
        [DebuggerTypeProxy (typeof (CollectionDebuggerView<>))]
        public class BlockingCollection<T> : IEnumerable<T>, ICollection, IEnumerable, IDisposable
        {
-               const int sleepTime = 50;
                const int spinCount = 5;
 
                readonly IProducerConsumerCollection<T> underlyingColl;
@@ -48,32 +47,46 @@ namespace System.Collections.Concurrent
                AtomicBoolean isComplete;
                long completeId;
 
+               /* The whole idea of the collection is to use these two long values in a transactional
+                * way to track and manage the actual data inside the underlying lock-free collection
+                * instead of directly working with it or using external locking.
+                *
+                * They are manipulated with CAS and are guaranteed to increase over time and use
+                * of the instance thus preventing ABA problems.
+                */
                long addId = long.MinValue;
                long removeId = long.MinValue;
 
+               /* These events are used solely for the purpose of having an optimized sleep cycle when
+                * the BlockingCollection have to wait on an external event (Add or Remove for instance)
+                */
                ManualResetEventSlim mreAdd = new ManualResetEventSlim (true);
                ManualResetEventSlim mreRemove = new ManualResetEventSlim (true);
 
+               /* For time based operations, we share this instance of Stopwatch and base calculation
+                  on a time offset at each of these method call */
+               static Stopwatch watch = new Stopwatch ();
+
                #region ctors
                public BlockingCollection ()
                        : this (new ConcurrentQueue<T> (), -1)
                {
                }
 
-               public BlockingCollection (int upperBound)
-                       : this (new ConcurrentQueue<T> (), upperBound)
+               public BlockingCollection (int boundedCapacity)
+                       : this (new ConcurrentQueue<T> (), boundedCapacity)
                {
                }
 
-               public BlockingCollection (IProducerConsumerCollection<T> underlyingColl)
-                       : this (underlyingColl, -1)
+               public BlockingCollection (IProducerConsumerCollection<T> collection)
+                       : this (collection, -1)
                {
                }
 
-               public BlockingCollection (IProducerConsumerCollection<T> underlyingColl, int upperBound)
+               public BlockingCollection (IProducerConsumerCollection<T> collection, int boundedCapacity)
                {
-                       this.underlyingColl = underlyingColl;
-                       this.upperBound     = upperBound;
+                       this.underlyingColl = collection;
+                       this.upperBound     = boundedCapacity;
                        this.isComplete     = new AtomicBoolean ();
                }
                #endregion
@@ -84,192 +97,175 @@ namespace System.Collections.Concurrent
                        Add (item, CancellationToken.None);
                }
 
-               public void Add (T item, CancellationToken token)
+               public void Add (T item, CancellationToken cancellationToken)
                {
-                       SpinWait sw = new SpinWait ();
-
-                       while (true) {
-                               token.ThrowIfCancellationRequested ();
-
-                               long cachedAddId = addId;
-                               long cachedRemoveId = removeId;
-
-                               if (upperBound != -1) {
-                                       if (cachedAddId - cachedRemoveId > upperBound) {
-                                               if (sw.Count <= spinCount)
-                                                       sw.SpinOnce ();
-                                               else if (mreRemove.Wait (sleepTime))
-                                                       mreRemove.Reset ();
-
-                                               continue;
-                                       }
-                               }
-
-                               // Check our transaction id against completed stored one
-                               if (isComplete.Value && cachedAddId >= completeId)
-                                       throw new InvalidOperationException ("The BlockingCollection<T> has"
-                                                                            + " been marked as complete with regards to additions.");
-
-                               if (Interlocked.CompareExchange (ref addId, cachedAddId + 1, cachedAddId) == cachedAddId)
-                                       break;
-                       }
-
-
-                       if (!underlyingColl.TryAdd (item))
-                               throw new InvalidOperationException ("The underlying collection didn't accept the item.");
-
-                       if (!mreAdd.IsSet)
-                               mreAdd.Set ();
+                       TryAdd (item, -1, cancellationToken);
                }
 
-               public T Take ()
+               public bool TryAdd (T item)
                {
-                       return Take (CancellationToken.None);
+                       return TryAdd (item, 0, CancellationToken.None);
                }
 
-               public T Take (CancellationToken token)
+               public bool TryAdd (T item, int millisecondsTimeout, CancellationToken cancellationToken)
                {
-                       SpinWait sw = new SpinWait ();
-
-                       while (true) {
-                               token.ThrowIfCancellationRequested ();
-
-                               long cachedRemoveId = removeId;
-                               long cachedAddId = addId;
-
-                               // Empty case
-                               if (cachedRemoveId == cachedAddId) {
-                                       if (IsCompleted)
-                                               throw new InvalidOperationException ("The BlockingCollection<T> has"
-                                                                                     + " been marked as complete with regards to additions.");
-                                       if (sw.Count <= spinCount)
-                                               sw.SpinOnce ();
-                                       else if (mreAdd.Wait (sleepTime))
-                                               mreAdd.Reset ();
-
-                                       continue;
-                               }
-
-                               if (Interlocked.CompareExchange (ref removeId, cachedRemoveId + 1, cachedRemoveId) == cachedRemoveId)
-                                       break;
-                       }
-
-                       T item;
-                       while (!underlyingColl.TryTake (out item));
-
-                       if (!mreRemove.IsSet)
-                               mreRemove.Set ();
-
-                       return item;
-               }
+                       if (millisecondsTimeout < -1)
+                               throw new ArgumentOutOfRangeException ("millisecondsTimeout");
 
-               public bool TryAdd (T item)
-               {
-                       return TryAdd (item, null, CancellationToken.None);
-               }
+                       long start = millisecondsTimeout == -1 ? 0 : watch.ElapsedMilliseconds;
+                       SpinWait sw = new SpinWait ();
 
-               bool TryAdd (T item, Func<bool> contFunc, CancellationToken token)
-               {
                        do {
-                               token.ThrowIfCancellationRequested ();
+                               cancellationToken.ThrowIfCancellationRequested ();
 
                                long cachedAddId = addId;
                                long cachedRemoveId = removeId;
 
-                               if (upperBound != -1) {
-                                       if (cachedAddId - cachedRemoveId > upperBound) {
-                                               continue;
+                               // If needed, we check and wait that the collection isn't full
+                               if (upperBound != -1 && cachedAddId - cachedRemoveId > upperBound) {
+                                       if (millisecondsTimeout == 0)
+                                               return false;
+
+                                       if (sw.Count <= spinCount) {
+                                               sw.SpinOnce ();
+                                       } else {
+                                               mreRemove.Reset ();
+                                               if (cachedRemoveId != removeId || cachedAddId != addId) {
+                                                       mreRemove.Set ();
+                                                       continue;
+                                               }
+
+                                               mreRemove.Wait (ComputeTimeout (millisecondsTimeout, start), cancellationToken);
                                        }
+
+                                       continue;
                                }
 
                                // Check our transaction id against completed stored one
                                if (isComplete.Value && cachedAddId >= completeId)
-                                       throw new InvalidOperationException ("The BlockingCollection<T> has"
-                                                                            + " been marked as complete with regards to additions.");
+                                       ThrowCompleteException ();
 
+                               // Validate the steps we have been doing until now
                                if (Interlocked.CompareExchange (ref addId, cachedAddId + 1, cachedAddId) != cachedAddId)
                                        continue;
 
+                               // We have a slot reserved in the underlying collection, try to take it
                                if (!underlyingColl.TryAdd (item))
-                                       continue;
+                                       throw new InvalidOperationException ("The underlying collection didn't accept the item.");
 
-                               if (!mreAdd.IsSet)
-                                       mreAdd.Set ();
+                               // Wake up process that may have been sleeping
+                               mreAdd.Set ();
 
                                return true;
-                       } while (contFunc != null && contFunc ());
+                       } while (millisecondsTimeout == -1 || (watch.ElapsedMilliseconds - start) < millisecondsTimeout);
 
                        return false;
                }
 
-               public bool TryAdd (T item, TimeSpan ts)
+               public bool TryAdd (T item, TimeSpan timeout)
                {
-                       return TryAdd (item, (int)ts.TotalMilliseconds);
+                       return TryAdd (item, (int)timeout.TotalMilliseconds);
                }
 
                public bool TryAdd (T item, int millisecondsTimeout)
                {
-                       Stopwatch sw = Stopwatch.StartNew ();
-                       return TryAdd (item, () => sw.ElapsedMilliseconds < millisecondsTimeout, CancellationToken.None);
+                       return TryAdd (item, millisecondsTimeout, CancellationToken.None);
                }
 
-               public bool TryAdd (T item, int millisecondsTimeout, CancellationToken token)
+               public T Take ()
+               {
+                       return Take (CancellationToken.None);
+               }
+
+               public T Take (CancellationToken cancellationToken)
                {
-                       Stopwatch sw = Stopwatch.StartNew ();
-                       return TryAdd (item, () => sw.ElapsedMilliseconds < millisecondsTimeout, token);
+                       T item;
+                       TryTake (out item, -1, cancellationToken, true);
+
+                       return item;
                }
 
                public bool TryTake (out T item)
                {
-                       return TryTake (out item, null, CancellationToken.None);
+                       return TryTake (out item, 0, CancellationToken.None);
+               }
+
+               public bool TryTake (out T item, int millisecondsTimeout, CancellationToken cancellationToken)
+               {
+                       return TryTake (out item, millisecondsTimeout, cancellationToken, false);
                }
 
-               bool TryTake (out T item, Func<bool> contFunc, CancellationToken token)
+               bool TryTake (out T item, int milliseconds, CancellationToken cancellationToken, bool throwComplete)
                {
+                       if (milliseconds < -1)
+                               throw new ArgumentOutOfRangeException ("milliseconds");
+
                        item = default (T);
+                       SpinWait sw = new SpinWait ();
+                       long start = milliseconds == -1 ? 0 : watch.ElapsedMilliseconds;
 
                        do {
-                               token.ThrowIfCancellationRequested ();
+                               cancellationToken.ThrowIfCancellationRequested ();
 
                                long cachedRemoveId = removeId;
                                long cachedAddId = addId;
 
                                // Empty case
                                if (cachedRemoveId == cachedAddId) {
-                                       if (IsCompleted)
+                                       if (milliseconds == 0)
                                                return false;
 
+                                       if (IsCompleted) {
+                                               if (throwComplete)
+                                                       ThrowCompleteException ();
+                                               else
+                                                       return false;
+                                       }
+
+                                       if (sw.Count <= spinCount) {
+                                               sw.SpinOnce ();
+                                       } else {
+                                               mreAdd.Reset ();
+                                               if (cachedRemoveId != removeId || cachedAddId != addId) {
+                                                       mreAdd.Set ();
+                                                       continue;
+                                               }
+
+                                               mreAdd.Wait (ComputeTimeout (milliseconds, start), cancellationToken);
+                                       }
+
                                        continue;
                                }
 
                                if (Interlocked.CompareExchange (ref removeId, cachedRemoveId + 1, cachedRemoveId) != cachedRemoveId)
                                        continue;
 
-                               return underlyingColl.TryTake (out item);
-                       } while (contFunc != null && contFunc ());
+                               while (!underlyingColl.TryTake (out item));
+
+                               mreRemove.Set ();
+
+                               return true;
+
+                       } while (milliseconds == -1 || (watch.ElapsedMilliseconds - start) < milliseconds);
 
                        return false;
                }
 
-               public bool TryTake (out T item, TimeSpan ts)
+               public bool TryTake (out T item, TimeSpan timeout)
                {
-                       return TryTake (out item, (int)ts.TotalMilliseconds);
+                       return TryTake (out item, (int)timeout.TotalMilliseconds);
                }
 
                public bool TryTake (out T item, int millisecondsTimeout)
                {
                        item = default (T);
-                       Stopwatch sw = Stopwatch.StartNew ();
 
-                       return TryTake (out item, () => sw.ElapsedMilliseconds < millisecondsTimeout, CancellationToken.None);
+                       return TryTake (out item, millisecondsTimeout, CancellationToken.None, false);
                }
 
-               public bool TryTake (out T item, int millisecondsTimeout, CancellationToken token)
+               static int ComputeTimeout (int millisecondsTimeout, long start)
                {
-                       item = default (T);
-                       Stopwatch sw = Stopwatch.StartNew ();
-
-                       return TryTake (out item, () => sw.ElapsedMilliseconds < millisecondsTimeout, token);
+                       return millisecondsTimeout == -1 ? 500 : (int)Math.Max (watch.ElapsedMilliseconds - start - millisecondsTimeout, 1);
                }
                #endregion
 
@@ -304,13 +300,13 @@ namespace System.Collections.Concurrent
                        return -1;
                }
 
-               public static int AddToAny (BlockingCollection<T>[] collections, T item, CancellationToken token)
+               public static int AddToAny (BlockingCollection<T>[] collections, T item, CancellationToken cancellationToken)
                {
                        CheckArray (collections);
                        int index = 0;
                        foreach (var coll in collections) {
                                try {
-                                       coll.Add (item, token);
+                                       coll.Add (item, cancellationToken);
                                        return index;
                                } catch {}
                                index++;
@@ -330,12 +326,12 @@ namespace System.Collections.Concurrent
                        return -1;
                }
 
-               public static int TryAddToAny (BlockingCollection<T>[] collections, T item, TimeSpan ts)
+               public static int TryAddToAny (BlockingCollection<T>[] collections, T item, TimeSpan timeout)
                {
                        CheckArray (collections);
                        int index = 0;
                        foreach (var coll in collections) {
-                               if (coll.TryAdd (item, ts))
+                               if (coll.TryAdd (item, timeout))
                                        return index;
                                index++;
                        }
@@ -355,12 +351,12 @@ namespace System.Collections.Concurrent
                }
 
                public static int TryAddToAny (BlockingCollection<T>[] collections, T item, int millisecondsTimeout,
-                                              CancellationToken token)
+                                              CancellationToken cancellationToken)
                {
                        CheckArray (collections);
                        int index = 0;
                        foreach (var coll in collections) {
-                               if (coll.TryAdd (item, millisecondsTimeout, token))
+                               if (coll.TryAdd (item, millisecondsTimeout, cancellationToken))
                                        return index;
                                index++;
                        }
@@ -382,14 +378,14 @@ namespace System.Collections.Concurrent
                        return -1;
                }
 
-               public static int TakeFromAny (BlockingCollection<T>[] collections, out T item, CancellationToken token)
+               public static int TakeFromAny (BlockingCollection<T>[] collections, out T item, CancellationToken cancellationToken)
                {
                        item = default (T);
                        CheckArray (collections);
                        int index = 0;
                        foreach (var coll in collections) {
                                try {
-                                       item = coll.Take (token);
+                                       item = coll.Take (cancellationToken);
                                        return index;
                                } catch {}
                                index++;
@@ -411,14 +407,14 @@ namespace System.Collections.Concurrent
                        return -1;
                }
 
-               public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, TimeSpan ts)
+               public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, TimeSpan timeout)
                {
                        item = default (T);
 
                        CheckArray (collections);
                        int index = 0;
                        foreach (var coll in collections) {
-                               if (coll.TryTake (out item, ts))
+                               if (coll.TryTake (out item, timeout))
                                        return index;
                                index++;
                        }
@@ -440,14 +436,14 @@ namespace System.Collections.Concurrent
                }
 
                public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, int millisecondsTimeout,
-                                                 CancellationToken token)
+                                                 CancellationToken cancellationToken)
                {
                        item = default (T);
 
                        CheckArray (collections);
                        int index = 0;
                        foreach (var coll in collections) {
-                               if (coll.TryTake (out item, millisecondsTimeout, token))
+                               if (coll.TryTake (out item, millisecondsTimeout, cancellationToken))
                                        return index;
                                index++;
                        }
@@ -457,9 +453,18 @@ namespace System.Collections.Concurrent
 
                public void CompleteAdding ()
                {
-                 // No further add beside that point
-                 completeId = addId;
-                 isComplete.Value = true;
+                       // No further add beside that point
+                       completeId = addId;
+                       isComplete.Value = true;
+                       // Wakeup some operation in case this has an impact
+                       mreAdd.Set ();
+                       mreRemove.Set ();
+               }
+
+               void ThrowCompleteException ()
+               {
+                       throw new InvalidOperationException ("The BlockingCollection<T> has"
+                                                            + " been marked as complete with regards to additions.");
                }
 
                void ICollection.CopyTo (Array array, int index)
@@ -474,23 +479,22 @@ namespace System.Collections.Concurrent
 
                public IEnumerable<T> GetConsumingEnumerable ()
                {
-                       return GetConsumingEnumerable (Take);
+                       return GetConsumingEnumerable (CancellationToken.None);
                }
 
-               public IEnumerable<T> GetConsumingEnumerable (CancellationToken token)
-               {
-                       return GetConsumingEnumerable (() => Take (token));
-               }
-
-               IEnumerable<T> GetConsumingEnumerable (Func<T> getFunc)
+               public IEnumerable<T> GetConsumingEnumerable (CancellationToken cancellationToken)
                {
                        while (true) {
                                T item = default (T);
 
                                try {
-                                       item = getFunc ();
+                                       item = Take (cancellationToken);
                                } catch {
-                                       break;
+                                       // Then the exception is perfectly normal
+                                       if (IsCompleted)
+                                               break;
+                                       // otherwise rethrow
+                                       throw;
                                }
 
                                yield return item;
@@ -512,7 +516,7 @@ namespace System.Collections.Concurrent
 
                }
 
-               protected virtual void Dispose (bool managedRes)
+               protected virtual void Dispose (bool disposing)
                {
 
                }