// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
-#if NET_4_0
+#if NET_4_0 || MOBILE
using System;
using System.Collections;
using System.Collections.Generic;
[DebuggerTypeProxy (typeof (CollectionDebuggerView<>))]
public class ConcurrentBag<T> : IProducerConsumerCollection<T>, IEnumerable<T>, IEnumerable
{
- const int multiplier = 2;
- const int hintThreshold = 20;
-
- int size = Environment.ProcessorCount + 1;
+ // We store hints in a long
+ long hints;
+
int count;
-
- // We only use the add hints when number of slot is above hintThreshold
- // so to not waste memory space and the CAS overhead
- ConcurrentQueue<int> addHints = new ConcurrentQueue<int> ();
-
- CyclicDeque<T>[] container;
-
- object syncLock = new object ();
+ // The container area is where bag are added foreach thread
+ ConcurrentDictionary<int, CyclicDeque<T>> container = new ConcurrentDictionary<int, CyclicDeque<T>> ();
+ // The staging area is where non-empty bag are located for fast iteration
+ ConcurrentDictionary<int, CyclicDeque<T>> staging = new ConcurrentDictionary<int, CyclicDeque<T>> ();
public ConcurrentBag ()
{
- container = new CyclicDeque<T>[size];
- for (int i = 0; i < container.Length; i++)
- container[i] = new CyclicDeque<T> ();
}
- public ConcurrentBag (IEnumerable<T> enumerable) : this ()
+ public ConcurrentBag (IEnumerable<T> collection) : this ()
{
- foreach (T item in enumerable)
+ foreach (T item in collection)
Add (item);
}
- public bool TryAdd (T item)
- {
- Add (item);
-
- return true;
- }
-
public void Add (T item)
{
- Interlocked.Increment (ref count);
- GrowIfNecessary ();
-
int index;
CyclicDeque<T> bag = GetBag (out index);
bag.PushBottom (item);
-
- // Cache operation ?
- if (size > hintThreshold)
- addHints.Enqueue (index);
+ AddHint (index);
+ Interlocked.Increment (ref count);
+ }
+
+ bool IProducerConsumerCollection<T>.TryAdd (T element)
+ {
+ Add (element);
+ return true;
}
- public bool TryTake (out T item)
+ public bool TryTake (out T result)
{
- item = default (T);
+ result = default (T);
if (count == 0)
return false;
int hintIndex;
- CyclicDeque<T> bag = GetBag (out hintIndex);
- bool hintEnabled = size > hintThreshold;
+ CyclicDeque<T> bag = GetBag (out hintIndex, false);
+ bool ret = true;
- if (bag == null || bag.PopBottom (out item) != PopResult.Succeed) {
- for (int i = 0; i < container.Length; i++) {
+ if (bag == null || bag.PopBottom (out result) != PopResult.Succeed) {
+ var self = bag;
+ foreach (var other in staging) {
// Try to retrieve something based on a hint
- bool result = hintEnabled && addHints.TryDequeue (out hintIndex) && container[hintIndex].PopTop (out item) == PopResult.Succeed;
+ ret = TryGetHint (out hintIndex) && (bag = container[hintIndex]).PopTop (out result) == PopResult.Succeed;
// We fall back to testing our slot
- if (!result && container[i] != null)
- result = container[i].PopTop (out item) == PopResult.Succeed;
+ if (!ret && other.Value != self) {
+ var status = other.Value.PopTop (out result);
+ while (status == PopResult.Abort)
+ status = other.Value.PopTop (out result);
+ ret = status == PopResult.Succeed;
+ hintIndex = other.Key;
+ bag = other.Value;
+ }
// If we found something, stop
- if (result) {
- Interlocked.Decrement (ref count);
- return true;
- }
+ if (ret)
+ break;
}
- } else {
+ }
+
+ if (ret) {
+ TidyBag (hintIndex, bag);
Interlocked.Decrement (ref count);
- return true;
}
-
- return false;
+
+ return ret;
+ }
+
+ public bool TryPeek (out T result)
+ {
+ result = default (T);
+
+ if (count == 0)
+ return false;
+
+ int hintIndex;
+ CyclicDeque<T> bag = GetBag (out hintIndex, false);
+ bool ret = true;
+
+ if (bag == null || !bag.PeekBottom (out result)) {
+ var self = bag;
+ foreach (var other in staging) {
+ // Try to retrieve something based on a hint
+ ret = TryGetHint (out hintIndex) && container[hintIndex].PeekTop (out result);
+
+ // We fall back to testing our slot
+ if (!ret && other.Value != self)
+ ret = other.Value.PeekTop (out result);
+
+ // If we found something, stop
+ if (ret)
+ break;
+ }
+ }
+
+ return ret;
+ }
+
+ void AddHint (int index)
+ {
+ // We only take thread index that can be stored in 5 bits (i.e. thread ids 1-15)
+ if (index > 0xF)
+ return;
+ var hs = hints;
+ // If cas failed then we don't retry
+ Interlocked.CompareExchange (ref hints, (long)(((ulong)hs) << 4 | (uint)index), (long)hs);
+ }
+
+ bool TryGetHint (out int index)
+ {
+ /* Funny little thing to know, since hints is a long (because CAS has no ulong overload),
+ * a shift-right operation is an arithmetic shift which might set high-order right bits
+ * to 1 instead of 0 if the number turns negative.
+ */
+ var hs = hints;
+ index = 0;
+
+ if (Interlocked.CompareExchange (ref hints, (long)(((ulong)hs) >> 4), hs) == hs)
+ index = (int)(hs & 0xF);
+
+ return index > 0;
}
public int Count {
return GetEnumeratorInternal ();
}
- IEnumerator<T> IEnumerable<T>.GetEnumerator ()
+ public IEnumerator<T> GetEnumerator ()
{
return GetEnumeratorInternal ();
}
IEnumerator<T> GetEnumeratorInternal ()
{
- for (int i = 0; i < size; i++) {
- CyclicDeque<T> bag = container[i];
- foreach (T item in bag.GetEnumerable ()) {
+ foreach (var bag in container)
+ foreach (T item in bag.Value.GetEnumerable ())
yield return item;
- }
- }
}
void System.Collections.ICollection.CopyTo (Array array, int index)
return temp;
}
-
+
int GetIndex ()
{
- return Thread.CurrentThread.ManagedThreadId - 1;
+ return Thread.CurrentThread.ManagedThreadId;
}
-
- void GrowIfNecessary ()
+
+ CyclicDeque<T> GetBag (out int index, bool createBag = true)
{
- int index = GetIndex ();
- int currentSize = size;
-
- while (index > currentSize - 1) {
- currentSize = size;
- Grow (currentSize);
- }
- }
-
- CyclicDeque<T> GetBag (out int i)
- {
- i = GetIndex ();
-
- return i < container.Length ? (container[i] == null) ? (container[i] = new CyclicDeque<T> ()) : container[i] : null;
+ index = GetIndex ();
+ CyclicDeque<T> value;
+ if (container.TryGetValue (index, out value))
+ return value;
+
+ var bag = createBag ? container.GetOrAdd (index, new CyclicDeque<T> ()) : null;
+ if (bag != null)
+ staging.TryAdd (index, bag);
+ return bag;
}
-
- void Grow (int referenceSize)
+
+ void TidyBag (int index, CyclicDeque<T> bag)
{
- lock (syncLock) {
- if (referenceSize != size)
- return;
-
- CyclicDeque<T>[] slice = new CyclicDeque<T>[size * multiplier];
- int i = 0;
- Array.Copy (container, slice, container.Length);
-
- container = slice;
- size = slice.Length;
+ if (bag != null && bag.IsEmpty) {
+ if (staging.TryRemove (index, out bag) && !bag.IsEmpty)
+ staging.TryAdd (index, bag);
}
}
}