Merge pull request #495 from nicolas-raoul/fix-for-issue2907-with-no-formatting-changes
[mono.git] / mcs / class / System / System.Collections.Concurrent / ConcurrentBag.cs
index a95f58dce4f5e4f4a17e98b0b06f2fb4553b68cc..233662a8d7701ce90e49416083f9add271a4fe05 100644 (file)
@@ -24,7 +24,7 @@
 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 // THE SOFTWARE.
 
-#if NET_4_0
+#if NET_4_0 || MOBILE
 using System;
 using System.Collections;
 using System.Collections.Generic;
@@ -41,86 +41,134 @@ namespace System.Collections.Concurrent
        [DebuggerTypeProxy (typeof (CollectionDebuggerView<>))]
        public class ConcurrentBag<T> : IProducerConsumerCollection<T>, IEnumerable<T>, IEnumerable
        {
-               const int multiplier = 2;
-               const int hintThreshold = 20;
-               
-               int size = Environment.ProcessorCount + 1;
+               // We store hints in a long
+               long hints;
+
                int count;
-               
-               // We only use the add hints when number of slot is above hintThreshold
-               // so to not waste memory space and the CAS overhead
-               ConcurrentQueue<int> addHints = new ConcurrentQueue<int> ();
-               
-               CyclicDeque<T>[] container;
-               
-               object syncLock = new object ();
+               // The container area is where bag are added foreach thread
+               ConcurrentDictionary<int, CyclicDeque<T>> container = new ConcurrentDictionary<int, CyclicDeque<T>> ();
+               // The staging area is where non-empty bag are located for fast iteration
+               ConcurrentDictionary<int, CyclicDeque<T>> staging = new ConcurrentDictionary<int, CyclicDeque<T>> ();
                
                public ConcurrentBag ()
                {
-                       container = new CyclicDeque<T>[size];
-                       for (int i = 0; i < container.Length; i++)
-                               container[i] = new CyclicDeque<T> ();
                }
                
-               public ConcurrentBag (IEnumerable<T> enumerable) : this ()
+               public ConcurrentBag (IEnumerable<T> collection) : this ()
                {
-                       foreach (T item in enumerable)
+                       foreach (T item in collection)
                                Add (item);
                }
                
-               public bool TryAdd (T item)
-               {
-                       Add (item);
-                       
-                       return true;
-               }
-               
                public void Add (T item)
                {
-                       Interlocked.Increment (ref count);
-                       GrowIfNecessary ();
-                       
                        int index;
                        CyclicDeque<T> bag = GetBag (out index);
                        bag.PushBottom (item);
-                       
-                       // Cache operation ?
-                       if (size > hintThreshold)
-                               addHints.Enqueue (index);
+                       AddHint (index);
+                       Interlocked.Increment (ref count);
+               }
+
+               bool IProducerConsumerCollection<T>.TryAdd (T element)
+               {
+                       Add (element);
+                       return true;
                }
                
-               public bool TryTake (out T item)
+               public bool TryTake (out T result)
                {
-                       item = default (T);
+                       result = default (T);
 
                        if (count == 0)
                                return false;
 
                        int hintIndex;
-                       CyclicDeque<T> bag = GetBag (out hintIndex);
-                       bool hintEnabled = size > hintThreshold;
+                       CyclicDeque<T> bag = GetBag (out hintIndex, false);
+                       bool ret = true;
                        
-                       if (bag == null || bag.PopBottom (out item) != PopResult.Succeed) {
-                               for (int i = 0; i < container.Length; i++) {
+                       if (bag == null || bag.PopBottom (out result) != PopResult.Succeed) {
+                               var self = bag;
+                               foreach (var other in staging) {
                                        // Try to retrieve something based on a hint
-                                       bool result = hintEnabled && addHints.TryDequeue (out hintIndex) && container[hintIndex].PopTop (out item) == PopResult.Succeed;
+                                       ret = TryGetHint (out hintIndex) && (bag = container[hintIndex]).PopTop (out result) == PopResult.Succeed;
 
                                        // We fall back to testing our slot
-                                       if (!result && container[i] != null)
-                                               result = container[i].PopTop (out item) == PopResult.Succeed;
+                                       if (!ret && other.Value != self) {
+                                               var status = other.Value.PopTop (out result);
+                                               while (status == PopResult.Abort)
+                                                       status = other.Value.PopTop (out result);
+                                               ret = status == PopResult.Succeed;
+                                               hintIndex = other.Key;
+                                               bag = other.Value;
+                                       }
                                        
                                        // If we found something, stop
-                                       if (result) {
-                                               Interlocked.Decrement (ref count);
-                                               return true;
-                                       }
+                                       if (ret)
+                                               break;
                                }
-                       } else {
+                       }
+
+                       if (ret) {
+                               TidyBag (hintIndex, bag);
                                Interlocked.Decrement (ref count);
-                               return true;
                        }
-                       
-                       return false;
+
+                       return ret;
+               }
+
+               public bool TryPeek (out T result)
+               {
+                       result = default (T);
+
+                       if (count == 0)
+                               return false;
+
+                       int hintIndex;
+                       CyclicDeque<T> bag = GetBag (out hintIndex, false);
+                       bool ret = true;
+
+                       if (bag == null || !bag.PeekBottom (out result)) {
+                               var self = bag;
+                               foreach (var other in staging) {
+                                       // Try to retrieve something based on a hint
+                                       ret = TryGetHint (out hintIndex) && container[hintIndex].PeekTop (out result);
+
+                                       // We fall back to testing our slot
+                                       if (!ret && other.Value != self)
+                                               ret = other.Value.PeekTop (out result);
+
+                                       // If we found something, stop
+                                       if (ret)
+                                               break;
+                               }
+                       }
+
+                       return ret;
+               }
+
+               void AddHint (int index)
+               {
+                       // We only take thread index that can be stored in 5 bits (i.e. thread ids 1-15)
+                       if (index > 0xF)
+                               return;
+                       var hs = hints;
+                       // If cas failed then we don't retry
+                       Interlocked.CompareExchange (ref hints, (long)(((ulong)hs) << 4 | (uint)index), (long)hs);
+               }
+
+               bool TryGetHint (out int index)
+               {
+                       /* Funny little thing to know, since hints is a long (because CAS has no ulong overload),
+                        * a shift-right operation is an arithmetic shift which might set high-order right bits
+                        * to 1 instead of 0 if the number turns negative.
+                        */
+                       var hs = hints;
+                       index = 0;
+
+                       if (Interlocked.CompareExchange (ref hints, (long)(((ulong)hs) >> 4), hs) == hs)
+                               index = (int)(hs & 0xF);
+
+                       return index > 0;
                }
                
                public int Count {
@@ -152,19 +200,16 @@ namespace System.Collections.Concurrent
                        return GetEnumeratorInternal ();
                }
                
-               IEnumerator<T> IEnumerable<T>.GetEnumerator ()
+               public IEnumerator<T> GetEnumerator ()
                {
                        return GetEnumeratorInternal ();
                }
                
                IEnumerator<T> GetEnumeratorInternal ()
                {
-                       for (int i = 0; i < size; i++) {
-                               CyclicDeque<T> bag = container[i];
-                               foreach (T item in bag.GetEnumerable ()) {
+                       foreach (var bag in container)
+                               foreach (T item in bag.Value.GetEnumerable ())
                                        yield return item;
-                               }
-                       }
                }
                
                void System.Collections.ICollection.CopyTo (Array array, int index)
@@ -206,42 +251,30 @@ namespace System.Collections.Concurrent
                        
                        return temp;
                }
-                       
+
                int GetIndex ()
                {
-                       return Thread.CurrentThread.ManagedThreadId - 1;
+                       return Thread.CurrentThread.ManagedThreadId;
                }
-               
-               void GrowIfNecessary ()
+                               
+               CyclicDeque<T> GetBag (out int index, bool createBag = true)
                {
-                       int index = GetIndex ();
-                       int currentSize = size;
-                       
-                       while (index > currentSize - 1) {
-                               currentSize = size;
-                               Grow (currentSize);
-                       }
-               }
-               
-               CyclicDeque<T> GetBag (out int i)
-               {                       
-                       i = GetIndex ();
-                       
-                       return i < container.Length ? (container[i] == null) ? (container[i] = new CyclicDeque<T> ()) : container[i] : null;
+                       index = GetIndex ();
+                       CyclicDeque<T> value;
+                       if (container.TryGetValue (index, out value))
+                               return value;
+
+                       var bag = createBag ? container.GetOrAdd (index, new CyclicDeque<T> ()) : null;
+                       if (bag != null)
+                               staging.TryAdd (index, bag);
+                       return bag;
                }
-               
-               void Grow (int referenceSize)
+
+               void TidyBag (int index, CyclicDeque<T> bag)
                {
-                       lock (syncLock) {
-                               if (referenceSize != size)
-                                       return;
-                               
-                               CyclicDeque<T>[] slice = new CyclicDeque<T>[size * multiplier];
-                               int i = 0;
-                               Array.Copy (container, slice, container.Length);
-                               
-                               container = slice;
-                               size = slice.Length;
+                       if (bag != null && bag.IsEmpty) {
+                               if (staging.TryRemove (index, out bag) && !bag.IsEmpty)
+                                       staging.TryAdd (index, bag);
                        }
                }
        }