2 // BlockingCollection.cs
4 // Copyright (c) 2008 Jérémie "Garuma" Laval
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
7 // of this software and associated documentation files (the "Software"), to deal
8 // in the Software without restriction, including without limitation the rights
9 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 // copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
13 // The above copyright notice and this permission notice shall be included in
14 // all copies or substantial portions of the Software.
16 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26 #if NET_4_0 || BOOTSTRAP_NET_4_0
29 using System.Threading;
30 using System.Collections;
31 using System.Collections.Generic;
32 using System.Diagnostics;
33 using System.Runtime.InteropServices;
35 namespace System.Collections.Concurrent
38 [DebuggerDisplay ("Count={Count}")]
39 [DebuggerTypeProxy (typeof (CollectionDebuggerView<>))]
40 public class BlockingCollection<T> : IEnumerable<T>, ICollection, IEnumerable, IDisposable
42 const int spinCount = 5;
44 readonly IProducerConsumerCollection<T> underlyingColl;
45 readonly int upperBound;
47 AtomicBoolean isComplete;
50 /* The whole idea of the collection is to use these two long values in a transactional
51 * way to track and manage the actual data inside the underlying lock-free collection
52 * instead of working with it directly or using external locking.
54 * They are manipulated with CAS and are guaranteed to only increase over the lifetime
55 * of the instance, thus preventing ABA problems.
57 long addId = long.MinValue;
58 long removeId = long.MinValue;
60 /* These events are used solely for the purpose of having an optimized sleep cycle when
61 * the BlockingCollection has to wait on an external event (Add or Remove for instance)
63 ManualResetEventSlim mreAdd = new ManualResetEventSlim (true);
64 ManualResetEventSlim mreRemove = new ManualResetEventSlim (true);
// Creates a collection backed by a new ConcurrentQueue<T> and passes -1 as the upper bound.
// -1 is the value the Add paths test for (upperBound != -1) before applying any capacity check.
67 public BlockingCollection ()
68 : this (new ConcurrentQueue<T> (), -1)
// Creates a collection backed by a new ConcurrentQueue<T>, forwarding the caller's upperBound
// to the two-argument constructor.
72 public BlockingCollection (int upperBound)
73 : this (new ConcurrentQueue<T> (), upperBound)
// Wraps the supplied producer/consumer collection and passes -1 as the upper bound,
// the value for which the Add paths skip their capacity check.
77 public BlockingCollection (IProducerConsumerCollection<T> underlyingColl)
78 : this (underlyingColl, -1)
// Designated constructor: stores the backing collection and the upper bound, and creates
// the flag that records whether adding has been completed.
// NOTE(review): no argument validation is visible here (null underlyingColl, upperBound
// values below -1) — confirm whether the checks live elsewhere or are missing.
82 public BlockingCollection (IProducerConsumerCollection<T> underlyingColl, int upperBound)
84 this.underlyingColl = underlyingColl;
85 this.upperBound = upperBound;
86 this.isComplete = new AtomicBoolean ();
90 #region Add & Remove (+ Try)
// Adds item by forwarding to the cancellable overload with CancellationToken.None.
91 public void Add (T item)
93 Add (item, CancellationToken.None);
// Adds item: a slot is first claimed by advancing addId with a compare-and-swap, and only
// then is the item pushed into the underlying collection.
// Calls ThrowCompleteException when the completion check below matches, and
// token.ThrowIfCancellationRequested / mreRemove.Wait (token) make the call cancellable.
96 public void Add (T item, CancellationToken token)
98 SpinWait sw = new SpinWait ();
102 token.ThrowIfCancellationRequested ();
105 long cachedRemoveId = removeId;
// Capacity check applies only when a bound was supplied (-1 disables it).
107 if (upperBound != -1) {
// addId - removeId is the number of claimed-but-not-removed slots.
// NOTE(review): with '>' the gap may reach upperBound + 1 before this branch is taken —
// confirm whether '>=' was intended.
108 if (cachedAddId - cachedRemoveId > upperBound) {
// Branch taken while the spinner's count is still within spinCount.
109 if (sw.Count <= spinCount) {
// Re-read removeId: if it moved since it was cached, a remove happened in the meantime.
114 if (cachedRemoveId != removeId)
// Blocking wait on the remove event; cancellable through token.
117 mreRemove.Wait (token);
125 // Check our transaction id against completed stored one
126 if (isComplete.Value && cachedAddId >= completeId)
127 ThrowCompleteException ();
// Claim the slot: the CAS succeeds only if addId still equals the cached value.
128 if (Interlocked.CompareExchange (ref addId, cachedAddId + 1, cachedAddId) == cachedAddId)
// Same completion test repeated after the CAS.
132 if (isComplete.Value && cachedAddId >= completeId)
133 ThrowCompleteException ();
// Retry until the underlying collection accepts the item.
135 while (!underlyingColl.TryAdd (item));
143 return Take (CancellationToken.None);
// Removes an item: a removal is first claimed by advancing removeId with a compare-and-swap,
// then the item is pulled out of the underlying collection.
// Cancellable through token; calls ThrowCompleteException on the paths marked below
// (the conditions guarding those calls are not among the visible lines).
146 public T Take (CancellationToken token)
148 SpinWait sw = new SpinWait ();
151 token.ThrowIfCancellationRequested ();
153 long cachedRemoveId = removeId;
154 long cachedAddId = addId;
// Equal ids: every claimed add already has a matching claimed remove, so nothing is available.
157 if (cachedRemoveId == cachedAddId) {
159 ThrowCompleteException ();
// Branch taken while the spinner's count is still within spinCount.
161 if (sw.Count <= spinCount) {
// Re-read addId: if it moved since it was cached, an add happened in the meantime.
164 if (cachedAddId != addId)
167 ThrowCompleteException ();
// Claim the removal: the CAS succeeds only if removeId still equals the cached value.
176 if (Interlocked.CompareExchange (ref removeId, cachedRemoveId + 1, cachedRemoveId) == cachedRemoveId)
// Retry until the underlying collection hands over an item.
181 while (!underlyingColl.TryTake (out item));
// NOTE(review): the statement guarded by this test is not visible; presumably it sets
// mreRemove to wake adders blocked in mreRemove.Wait — confirm.
183 if (!mreRemove.IsSet)
// Attempts to add item without a timeout: the continuation predicate always returns false
// and no cancellation token is supplied.
189 public bool TryAdd (T item)
191 return TryAdd (item, () => false, CancellationToken.None);
// Core of every TryAdd overload. Repeats the attempt for as long as contFunc returns true
// (see the trailing 'while (contFunc ())'); checks token on each pass.
// Throws InvalidOperationException when the completion check below matches.
194 bool TryAdd (T item, Func<bool> contFunc, CancellationToken token)
197 token.ThrowIfCancellationRequested ();
199 long cachedAddId = addId;
200 long cachedRemoveId = removeId;
// Capacity check applies only when a bound was supplied (-1 disables it).
202 if (upperBound != -1)
// NOTE(review): with '>' the add/remove gap may reach upperBound + 1 — confirm whether
// '>=' was intended.
203 if (cachedAddId - cachedRemoveId > upperBound)
206 // Check our transaction id against completed stored one
207 if (isComplete.Value && cachedAddId >= completeId)
// NOTE(review): same message as ThrowCompleteException, duplicated inline here.
208 throw new InvalidOperationException ("The BlockingCollection<T> has"
209 + " been marked as complete with regards to additions.");
// Claim the slot; a result different from the cached value means another thread won the race.
211 if (Interlocked.CompareExchange (ref addId, cachedAddId + 1, cachedAddId) != cachedAddId)
// Retry until the underlying collection accepts the item.
214 while (!underlyingColl.TryAdd (item));
220 } while (contFunc ());
// TimeSpan convenience overload: converts ts to milliseconds and forwards to the int overload.
// The (int) cast drops the fractional part.
// NOTE(review): no range check on ts is visible — a value outside int range is not guarded here.
225 public bool TryAdd (T item, TimeSpan ts)
227 return TryAdd (item, (int)ts.TotalMilliseconds);
// Attempts to add item, letting the core loop continue while a stopwatch started here
// reports fewer than millisecondsTimeout elapsed milliseconds.
// NOTE(review): a negative millisecondsTimeout makes the predicate false immediately, so it
// behaves as a single attempt rather than an infinite wait — confirm the intended contract.
230 public bool TryAdd (T item, int millisecondsTimeout)
232 Stopwatch sw = Stopwatch.StartNew ();
233 return TryAdd (item, () => sw.ElapsedMilliseconds < millisecondsTimeout, CancellationToken.None);
// Same as the timeout overload, but also forwards token so the core loop can observe cancellation.
// NOTE(review): a negative millisecondsTimeout makes the predicate false immediately, so it
// behaves as a single attempt rather than an infinite wait — confirm the intended contract.
236 public bool TryAdd (T item, int millisecondsTimeout, CancellationToken token)
238 Stopwatch sw = Stopwatch.StartNew ();
239 return TryAdd (item, () => sw.ElapsedMilliseconds < millisecondsTimeout, token);
// Attempts to take an item without a timeout: the continuation predicate always returns false
// and no cancellation token is supplied.
242 public bool TryTake (out T item)
244 return TryTake (out item, () => false, CancellationToken.None);
// Core of every TryTake overload. Repeats the attempt for as long as contFunc returns true
// (see the trailing 'while (contFunc ())'); checks token on each pass.
247 bool TryTake (out T item, Func<bool> contFunc, CancellationToken token)
252 token.ThrowIfCancellationRequested ();
254 long cachedRemoveId = removeId;
255 long cachedAddId = addId;
// Equal ids: every claimed add already has a matching claimed remove, so nothing is available.
258 if (cachedRemoveId == cachedAddId) {
// Claim the removal; a result different from the cached value means another thread won the race.
265 if (Interlocked.CompareExchange (ref removeId, cachedRemoveId + 1, cachedRemoveId) != cachedRemoveId)
// Retry until the underlying collection hands over an item.
268 while (!underlyingColl.TryTake (out item));
// NOTE(review): the statement guarded by this test is not visible; presumably it sets
// mreRemove to wake adders blocked in mreRemove.Wait — confirm.
270 if (!mreRemove.IsSet)
273 } while (contFunc ());
// TimeSpan convenience overload: converts ts to milliseconds and forwards to the int overload.
// The (int) cast drops the fractional part.
// NOTE(review): no range check on ts is visible — a value outside int range is not guarded here.
278 public bool TryTake (out T item, TimeSpan ts)
280 return TryTake (out item, (int)ts.TotalMilliseconds);
// Attempts to take an item, letting the core loop continue while a stopwatch started here
// reports fewer than millisecondsTimeout elapsed milliseconds.
// NOTE(review): a negative millisecondsTimeout makes the predicate false immediately, so it
// behaves as a single attempt rather than an infinite wait — confirm the intended contract.
283 public bool TryTake (out T item, int millisecondsTimeout)
286 Stopwatch sw = Stopwatch.StartNew ();
288 return TryTake (out item, () => sw.ElapsedMilliseconds < millisecondsTimeout, CancellationToken.None);
// Same as the timeout overload, but also forwards token so the core loop can observe cancellation.
// NOTE(review): a negative millisecondsTimeout makes the predicate false immediately, so it
// behaves as a single attempt rather than an infinite wait — confirm the intended contract.
291 public bool TryTake (out T item, int millisecondsTimeout, CancellationToken token)
294 Stopwatch sw = Stopwatch.StartNew ();
296 return TryTake (out item, () => sw.ElapsedMilliseconds < millisecondsTimeout, token);
300 #region static methods
// Argument validation shared by the static *Any methods.
// Throws ArgumentNullException when collections is null, and ArgumentException when the
// array is empty or IsThereANullElement reports a null entry.
301 static void CheckArray (BlockingCollection<T>[] collections)
303 if (collections == null)
304 throw new ArgumentNullException ("collections");
305 if (collections.Length == 0 || IsThereANullElement (collections))
306 throw new ArgumentException ("The collections argument is a 0-length array or contains a null element.", "collections");
// Helper for CheckArray: walks every element of collections.
// NOTE(review): the per-element test and the return statements are not among the visible
// lines; presumably returns true on the first null element — confirm.
309 static bool IsThereANullElement (BlockingCollection<T>[] collections)
311 foreach (BlockingCollection<T> e in collections)
// Validates collections with CheckArray, then visits the collections in array order.
// NOTE(review): the loop body and the return value are not among the visible lines;
// presumably the result is the index of the collection that received item — confirm.
317 public static int AddToAny (BlockingCollection<T>[] collections, T item)
319 CheckArray (collections);
321 foreach (var coll in collections) {
// Validates collections with CheckArray, then visits the collections in array order and
// calls the cancellable Add on each one visited.
// NOTE(review): the surrounding loop statements and the return value are not among the
// visible lines; presumably the result is the index of the collection that received item — confirm.
331 public static int AddToAny (BlockingCollection<T>[] collections, T item, CancellationToken token)
333 CheckArray (collections);
335 foreach (var coll in collections) {
337 coll.Add (item, token);
// Validates collections with CheckArray, then tries a single-attempt TryAdd on each
// collection in array order.
// NOTE(review): the action taken on success and the fallback return value are not among
// the visible lines; presumably the index of the accepting collection, or a sentinel — confirm.
345 public static int TryAddToAny (BlockingCollection<T>[] collections, T item)
347 CheckArray (collections);
349 foreach (var coll in collections) {
350 if (coll.TryAdd (item))
// Validates collections with CheckArray, then tries TryAdd with the timeout ts on each
// collection in array order.
// The timeout is passed to every collection in turn, so the total wait can approach
// (number of collections × ts).
// NOTE(review): the return values are not among the visible lines — confirm.
357 public static int TryAddToAny (BlockingCollection<T>[] collections, T item, TimeSpan ts)
359 CheckArray (collections);
361 foreach (var coll in collections) {
362 if (coll.TryAdd (item, ts))
// Validates collections with CheckArray, then tries TryAdd with millisecondsTimeout on each
// collection in array order.
// The timeout is passed to every collection in turn, so the total wait can approach
// (number of collections × millisecondsTimeout).
// NOTE(review): the return values are not among the visible lines — confirm.
369 public static int TryAddToAny (BlockingCollection<T>[] collections, T item, int millisecondsTimeout)
371 CheckArray (collections);
373 foreach (var coll in collections) {
374 if (coll.TryAdd (item, millisecondsTimeout))
// Validates collections with CheckArray, then tries the cancellable, timed TryAdd on each
// collection in array order.
// The timeout is passed to every collection in turn, so the total wait can approach
// (number of collections × millisecondsTimeout).
// NOTE(review): the return values are not among the visible lines — confirm.
381 public static int TryAddToAny (BlockingCollection<T>[] collections, T item, int millisecondsTimeout,
382 CancellationToken token)
384 CheckArray (collections);
386 foreach (var coll in collections) {
387 if (coll.TryAdd (item, millisecondsTimeout, token))
// Validates collections with CheckArray, then visits the collections in array order.
// NOTE(review): the loop body, the assignment of item and the return value are not among
// the visible lines; presumably the result is the index of the collection taken from — confirm.
394 public static int TakeFromAny (BlockingCollection<T>[] collections, out T item)
397 CheckArray (collections);
399 foreach (var coll in collections) {
// Validates collections with CheckArray, then visits the collections in array order and
// assigns item from the cancellable Take of each one visited.
// NOTE(review): the surrounding loop statements and the return value are not among the
// visible lines; presumably the result is the index of the collection taken from — confirm.
409 public static int TakeFromAny (BlockingCollection<T>[] collections, out T item, CancellationToken token)
412 CheckArray (collections);
414 foreach (var coll in collections) {
416 item = coll.Take (token);
// Validates collections with CheckArray, then tries a single-attempt TryTake on each
// collection in array order.
// NOTE(review): the action taken on success and the fallback return value are not among
// the visible lines; presumably the index of the collection taken from, or a sentinel — confirm.
424 public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item)
428 CheckArray (collections);
430 foreach (var coll in collections) {
431 if (coll.TryTake (out item))
// Validates collections with CheckArray, then tries TryTake with the timeout ts on each
// collection in array order.
// The timeout is passed to every collection in turn, so the total wait can approach
// (number of collections × ts).
// NOTE(review): the return values are not among the visible lines — confirm.
438 public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, TimeSpan ts)
442 CheckArray (collections);
444 foreach (var coll in collections) {
445 if (coll.TryTake (out item, ts))
// Validates collections with CheckArray, then tries TryTake with millisecondsTimeout on each
// collection in array order.
// The timeout is passed to every collection in turn, so the total wait can approach
// (number of collections × millisecondsTimeout).
// NOTE(review): the return values are not among the visible lines — confirm.
452 public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, int millisecondsTimeout)
456 CheckArray (collections);
458 foreach (var coll in collections) {
459 if (coll.TryTake (out item, millisecondsTimeout))
// Validates collections with CheckArray, then tries the cancellable, timed TryTake on each
// collection in array order.
// The timeout is passed to every collection in turn, so the total wait can approach
// (number of collections × millisecondsTimeout).
// NOTE(review): the return values are not among the visible lines — confirm.
466 public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, int millisecondsTimeout,
467 CancellationToken token)
471 CheckArray (collections);
473 foreach (var coll in collections) {
474 if (coll.TryTake (out item, millisecondsTimeout, token))
// Marks the collection as complete for adding by setting the isComplete flag, which the
// Add/TryAdd paths read before claiming a slot.
482 public void CompleteAdding ()
484 // No further adds are accepted beyond this point
486 isComplete.Value = true;
487 // Wake up any waiting operation in case completion affects it
// Always throws InvalidOperationException with the "marked as complete" message.
// Called by Add and Take when they detect completion.
492 void ThrowCompleteException ()
494 throw new InvalidOperationException ("The BlockingCollection<T> has"
495 + " been marked as complete with regards to additions.");
// Explicit ICollection implementation: delegates the copy to the underlying collection.
498 void ICollection.CopyTo (Array array, int index)
500 underlyingColl.CopyTo (array, index);
// Copies the items into array starting at index by delegating to the underlying collection;
// nothing is removed through the Take path.
503 public void CopyTo (T[] array, int index)
505 underlyingColl.CopyTo (array, index);
// Returns a consuming sequence whose items are obtained through Take (passed as the getter).
508 public IEnumerable<T> GetConsumingEnumerable ()
510 return GetConsumingEnumerable (Take);
// Returns a consuming sequence whose items are obtained through Take (token), so each
// retrieval observes the supplied cancellation token.
513 public IEnumerable<T> GetConsumingEnumerable (CancellationToken token)
515 return GetConsumingEnumerable (() => Take (token));
// Shared implementation behind the public GetConsumingEnumerable overloads; getFunc is the
// Take variant they pass in.
// NOTE(review): the loop that calls getFunc and yields items is not among the visible lines.
518 IEnumerable<T> GetConsumingEnumerable (Func<T> getFunc)
521 T item = default (T);
// Non-generic enumeration: returns the underlying collection's own enumerator, so iterating
// this way does not go through Take.
533 IEnumerator IEnumerable.GetEnumerator ()
535 return ((IEnumerable)underlyingColl).GetEnumerator ();
// Generic enumeration: returns the underlying collection's own enumerator, so iterating
// this way does not go through Take.
538 IEnumerator<T> IEnumerable<T>.GetEnumerator ()
540 return ((IEnumerable<T>)underlyingColl).GetEnumerator ();
543 public void Dispose ()
548 protected virtual void Dispose (bool managedRes)
// Returns the array produced by the underlying collection's ToArray.
553 public T[] ToArray ()
555 return underlyingColl.ToArray ();
558 public int BoundedCapacity {
566 return underlyingColl.Count;
// True once the isComplete flag has been set (CompleteAdding sets it).
570 public bool IsAddingCompleted {
572 return isComplete.Value;
// True when adding has been marked complete and addId equals removeId, i.e. every claimed
// add has a matching claimed remove.
// NOTE(review): addId and removeId are read separately, so this is not a single atomic snapshot.
576 public bool IsCompleted {
578 return isComplete.Value && addId == removeId;
// Explicit ICollection implementation: exposes the underlying collection's SyncRoot.
582 object ICollection.SyncRoot {
584 return underlyingColl.SyncRoot;
// Explicit ICollection implementation: reports the underlying collection's IsSynchronized value.
588 bool ICollection.IsSynchronized {
590 return underlyingColl.IsSynchronized;