//
//
-#if NET_4_0
+#if NET_4_0 || BOOTSTRAP_NET_4_0
using System;
using System.Threading;
{
readonly IProducerConsumerCollection<T> underlyingColl;
readonly int upperBound;
-
- readonly SpinWait sw = new SpinWait ();
-
+
AtomicBoolean isComplete;
long completeId;
long addId = long.MinValue;
long removeId = long.MinValue;
-
+
#region ctors
public BlockingCollection ()
: this (new ConcurrentQueue<T> (), -1)
{
}
-
+
public BlockingCollection (int upperBound)
: this (new ConcurrentQueue<T> (), upperBound)
{
}
-
+
public BlockingCollection (IProducerConsumerCollection<T> underlyingColl)
: this (underlyingColl, -1)
{
}
-
+
public BlockingCollection (IProducerConsumerCollection<T> underlyingColl, int upperBound)
{
this.underlyingColl = underlyingColl;
this.isComplete = new AtomicBoolean ();
}
#endregion
-
+
#region Add & Remove (+ Try)
public void Add (T item)
{
Add (item, null);
}
-
+
public void Add (T item, CancellationToken token)
{
Add (item, () => token.IsCancellationRequested);
}
-
+
void Add (T item, Func<bool> cancellationFunc)
{
while (true) {
long cachedAddId = addId;
long cachedRemoveId = removeId;
-
+
if (upperBound != -1) {
if (cachedAddId - cachedRemoveId > upperBound) {
Block ();
continue;
}
}
-
+
// Check our transaction id against completed stored one
if (isComplete.Value && cachedAddId >= completeId)
throw new InvalidOperationException ("The BlockingCollection<T> has"
+ " been marked as complete with regards to additions.");
-
+
if (Interlocked.CompareExchange (ref addId, cachedAddId + 1, cachedAddId) == cachedAddId)
break;
-
+
if (cancellationFunc != null && cancellationFunc ())
throw new OperationCanceledException ("CancellationToken triggered");
}
-
-
+
+
if (!underlyingColl.TryAdd (item))
throw new InvalidOperationException ("The underlying collection didn't accept the item.");
}
-
+
public T Take ()
{
return Take (null);
}
-
+
public T Take (CancellationToken token)
{
return Take (() => token.IsCancellationRequested);
}
-
+
T Take (Func<bool> cancellationFunc)
{
while (true) {
long cachedRemoveId = removeId;
long cachedAddId = addId;
-
+
// Empty case
if (cachedRemoveId == cachedAddId) {
- if (isComplete.Value && cachedRemoveId >= completeId)
+ if (IsCompleted)
throw new OperationCanceledException ("The BlockingCollection<T> has"
+ " been marked as complete with regards to additions.");
-
+
Block ();
continue;
}
-
+
if (Interlocked.CompareExchange (ref removeId, cachedRemoveId + 1, cachedRemoveId) == cachedRemoveId)
break;
-
+
if (cancellationFunc != null && cancellationFunc ())
throw new OperationCanceledException ("The CancellationToken has had cancellation requested.");
}
-
+
T item;
while (!underlyingColl.TryTake (out item));
-
+
return item;
}
-
+
public bool TryAdd (T item)
{
return TryAdd (item, null, null);
}
-
+
bool TryAdd (T item, Func<bool> contFunc, CancellationToken? token)
{
do {
if (token.HasValue && token.Value.IsCancellationRequested)
throw new OperationCanceledException ("The CancellationToken has had cancellation requested.");
-
+
long cachedAddId = addId;
long cachedRemoveId = removeId;
-
+
if (upperBound != -1) {
if (cachedAddId - cachedRemoveId > upperBound) {
continue;
}
}
-
+
// Check our transaction id against completed stored one
if (isComplete.Value && cachedAddId >= completeId)
throw new InvalidOperationException ("The BlockingCollection<T> has"
+ " been marked as complete with regards to additions.");
-
+
if (Interlocked.CompareExchange (ref addId, cachedAddId + 1, cachedAddId) != cachedAddId)
continue;
-
+
if (!underlyingColl.TryAdd (item))
throw new InvalidOperationException ("The underlying collection didn't accept the item.");
-
+
return true;
} while (contFunc != null && contFunc ());
-
+
return false;
}
-
+
public bool TryAdd (T item, TimeSpan ts)
{
return TryAdd (item, (int)ts.TotalMilliseconds);
}
-
+
public bool TryAdd (T item, int millisecondsTimeout)
{
Stopwatch sw = Stopwatch.StartNew ();
return TryAdd (item, () => sw.ElapsedMilliseconds < millisecondsTimeout, null);
}
-
+
public bool TryAdd (T item, int millisecondsTimeout, CancellationToken token)
{
Stopwatch sw = Stopwatch.StartNew ();
return TryAdd (item, () => sw.ElapsedMilliseconds < millisecondsTimeout, token);
}
-
+
public bool TryTake (out T item)
{
return TryTake (out item, null, null);
}
-
+
bool TryTake (out T item, Func<bool> contFunc, CancellationToken? token)
{
item = default (T);
-
+
do {
if (token.HasValue && token.Value.IsCancellationRequested)
throw new OperationCanceledException ("The CancellationToken has had cancellation requested.");
-
+
long cachedRemoveId = removeId;
long cachedAddId = addId;
-
+
// Empty case
if (cachedRemoveId == cachedAddId) {
- if (isComplete.Value && cachedRemoveId >= completeId)
- continue;
-
+ if (IsCompleted)
+ return false;
+
continue;
}
-
+
if (Interlocked.CompareExchange (ref removeId, cachedRemoveId + 1, cachedRemoveId) != cachedRemoveId)
continue;
-
+
return underlyingColl.TryTake (out item);
} while (contFunc != null && contFunc ());
-
+
return false;
}
-
+
public bool TryTake (out T item, TimeSpan ts)
{
return TryTake (out item, (int)ts.TotalMilliseconds);
}
-
+
public bool TryTake (out T item, int millisecondsTimeout)
{
item = default (T);
Stopwatch sw = Stopwatch.StartNew ();
-
+
return TryTake (out item, () => sw.ElapsedMilliseconds < millisecondsTimeout, null);
}
-
+
public bool TryTake (out T item, int millisecondsTimeout, CancellationToken token)
{
item = default (T);
Stopwatch sw = Stopwatch.StartNew ();
-
+
return TryTake (out item, () => sw.ElapsedMilliseconds < millisecondsTimeout, token);
}
#endregion
-
+
#region static methods
static void CheckArray (BlockingCollection<T>[] collections)
{
if (collections.Length == 0 || IsThereANullElement (collections))
throw new ArgumentException ("The collections argument is a 0-length array or contains a null element.", "collections");
}
-
+
static bool IsThereANullElement (BlockingCollection<T>[] collections)
{
foreach (BlockingCollection<T> e in collections)
return true;
return false;
}
-
+
public static int AddToAny (BlockingCollection<T>[] collections, T item)
{
CheckArray (collections);
}
return -1;
}
-
+
public static int AddToAny (BlockingCollection<T>[] collections, T item, CancellationToken token)
{
CheckArray (collections);
}
return -1;
}
-
+
public static int TryAddToAny (BlockingCollection<T>[] collections, T item)
{
CheckArray (collections);
}
return -1;
}
-
+
public static int TryAddToAny (BlockingCollection<T>[] collections, T item, TimeSpan ts)
{
CheckArray (collections);
}
return -1;
}
-
+
public static int TryAddToAny (BlockingCollection<T>[] collections, T item, int millisecondsTimeout)
{
CheckArray (collections);
}
return -1;
}
-
+
public static int TryAddToAny (BlockingCollection<T>[] collections, T item, int millisecondsTimeout,
CancellationToken token)
{
}
return -1;
}
-
+
public static int TakeFromAny (BlockingCollection<T>[] collections, out T item)
{
item = default (T);
}
return -1;
}
-
+
public static int TakeFromAny (BlockingCollection<T>[] collections, out T item, CancellationToken token)
{
item = default (T);
}
return -1;
}
-
+
public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item)
{
item = default (T);
-
+
CheckArray (collections);
int index = 0;
foreach (var coll in collections) {
}
return -1;
}
-
+
public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, TimeSpan ts)
{
item = default (T);
-
+
CheckArray (collections);
int index = 0;
foreach (var coll in collections) {
}
return -1;
}
-
+
public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, int millisecondsTimeout)
{
item = default (T);
-
+
CheckArray (collections);
int index = 0;
foreach (var coll in collections) {
}
return -1;
}
-
+
public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, int millisecondsTimeout,
CancellationToken token)
{
item = default (T);
-
+
CheckArray (collections);
int index = 0;
foreach (var coll in collections) {
return -1;
}
#endregion
-
+
public void CompleteAdding ()
{
// No further add beside that point
completeId = addId;
isComplete.Value = true;
}
-
+
void ICollection.CopyTo (Array array, int index)
{
underlyingColl.CopyTo (array, index);
}
-
+
public void CopyTo (T[] array, int index)
{
underlyingColl.CopyTo (array, index);
}
-
+
public IEnumerable<T> GetConsumingEnumerable ()
{
return GetConsumingEnumerable (Take);
}
-
+
public IEnumerable<T> GetConsumingEnumerable (CancellationToken token)
{
return GetConsumingEnumerable (() => Take (token));
}
-
+
IEnumerable<T> GetConsumingEnumerable (Func<T> getFunc)
{
while (true) {
T item = default (T);
-
+
try {
item = getFunc ();
} catch {
break;
}
-
+
yield return item;
}
}
-
+
IEnumerator IEnumerable.GetEnumerator ()
{
return ((IEnumerable)underlyingColl).GetEnumerator ();
}
-
+
IEnumerator<T> IEnumerable<T>.GetEnumerator ()
{
return ((IEnumerable<T>)underlyingColl).GetEnumerator ();
}
-
+
public void Dispose ()
{
-
+
}
-
+
protected virtual void Dispose (bool managedRes)
{
-
+
}
-
+
public T[] ToArray ()
{
return underlyingColl.ToArray ();
}
+ [ThreadStatic]
+ SpinWait sw;
+
// Method used to stall the thread for a limited period of time before retrying an operation
void Block ()
{
sw.SpinOnce ();
}
-
+
public int BoundedCapacity {
get {
return upperBound;
}
}
-
+
public int Count {
get {
return underlyingColl.Count;
}
}
-
+
public bool IsAddingCompleted {
get {
return isComplete.Value;
}
}
-
+
public bool IsCompleted {
get {
return isComplete.Value && addId == removeId;
}
}
-
+
object ICollection.SyncRoot {
get {
return underlyingColl.SyncRoot;
}
}
-
+
bool ICollection.IsSynchronized {
get {
return underlyingColl.IsSynchronized;
-#if NET_4_0
-#define USE_MONITOR
//
// EnumerablePartitioner.cs
//
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
+#if NET_4_0 || BOOTSTRAP_NET_4_0
+
using System;
+using System.Threading.Tasks;
using System.Collections.Generic;
namespace System.Collections.Concurrent
{
+ // Represent a chunk partitioner
internal class EnumerablePartitioner<T> : OrderablePartitioner<T>
{
IEnumerable<T> source;
-#if USE_MONITOR
- object syncLock = new object ();
-#endif
+
const int InitialPartitionSize = 1;
const int PartitionMultiplier = 2;
+ int initialPartitionSize;
+ int partitionMultiplier;
+
int index = 0;
+ readonly object syncLock = new object ();
- public EnumerablePartitioner (IEnumerable<T> source) : base (true, false, true)
+ public EnumerablePartitioner (IEnumerable<T> source)
+ : this (source, InitialPartitionSize, PartitionMultiplier)
+ {
+
+ }
+
+ // This is used to get striped partitionning (for Take and Skip for instance
+ public EnumerablePartitioner (IEnumerable<T> source, int initialPartitionSize, int partitionMultiplier)
+ : base (true, false, true)
{
this.source = source;
+ this.initialPartitionSize = initialPartitionSize;
+ this.partitionMultiplier = partitionMultiplier;
}
public override IList<IEnumerator<KeyValuePair<long, T>>> GetOrderablePartitions (int partitionCount)
IEnumerator<KeyValuePair<long, T>> GetPartitionEnumerator (IEnumerator<T> src)
{
- int count = InitialPartitionSize;
+ int count = initialPartitionSize;
List<T> list = new List<T> ();
while (true) {
}
}
+
+
for (int i = 0; i < list.Count; i++)
yield return new KeyValuePair<long, T> (ind + i, list[i]);
- count *= PartitionMultiplier;
+ count *= partitionMultiplier;
}
}
}
}
-#endif
\ No newline at end of file
+#endif
-#if NET_4_0
//
// ListPartitioner.cs
//
using System;
using System.Collections.Generic;
+#if NET_4_0 || BOOTSTRAP_NET_4_0
+
namespace System.Collections.Concurrent
{
+ // Represent a Range partitioner
internal class ListPartitioner<T> : OrderablePartitioner<T>
{
IList<T> source;
+ readonly bool chunking = Environment.GetEnvironmentVariable ("PLINQ_PARTITIONING_HINT") == "chunking";
public ListPartitioner (IList<T> source) : base (true, true, true)
{
IEnumerator<KeyValuePair<long, T>>[] enumerators
= new IEnumerator<KeyValuePair<long, T>>[partitionCount];
- int count = (source.Count >= partitionCount) ? source.Count / partitionCount : 1;
+ int count = GetBestCacheLineSize (source.Count, partitionCount);
for (int i = 0; i < enumerators.Length; i++) {
+ if (chunking) {
+ const int step = 64;
+ enumerators[i] = GetEnumeratorForRange (i * step, enumerators.Length, source.Count, step);
+ continue;
+ }
+
if (i != enumerators.Length - 1)
enumerators[i] = GetEnumeratorForRange (i * count, i * count + count);
else
return enumerators;
}
+ int GetBestCacheLineSize (int initialSize, int partitionCount)
+ {
+ const int numPartition = 20;
+ uint size = (uint)(initialSize / partitionCount / numPartition);
+ int count = 0;
+
+ if (size <= 1)
+ return 1;
+
+ while ((size <<= 1) < 0x80000040)
+ ++count;
+ return (int)(0x80000040 >> (count + 1));
+ }
+
IEnumerator<KeyValuePair<long, T>> GetEnumeratorForRange (int startIndex, int lastIndex)
{
if (startIndex >= source.Count)
return GetEnumeratorForRangeInternal (startIndex, lastIndex);
}
+
+ IEnumerator<KeyValuePair<long, T>> GetEnumeratorForRange (int startIndex, int stride, int count, int step)
+ {
+ if (startIndex >= source.Count)
+ return GetEmpty ();
+
+ return GetEnumeratorForRangeInternal (startIndex, stride, count, step);
+ }
IEnumerator<KeyValuePair<long, T>> GetEmpty ()
- {
+ {
yield break;
- }
+ }
IEnumerator<KeyValuePair<long, T>> GetEnumeratorForRangeInternal (int startIndex, int lastIndex)
{
yield return new KeyValuePair<long, T> (i, source[i]);
}
}
+
+ IEnumerator<KeyValuePair<long, T>> GetEnumeratorForRangeInternal (int startIndex, int stride, int count, int step)
+ {
+ for (int i = startIndex; i < count; i += stride * step) {
+ for (int j = i; j < i + step && j < count; j++) {
+ yield return new KeyValuePair<long, T> (j, source[j]);
+ }
+ }
+ }
}
}
-#endif
\ No newline at end of file
+#endif