2 // BlockingCollection.cs
4 // Copyright (c) 2008 Jérémie "Garuma" Laval
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
7 // of this software and associated documentation files (the "Software"), to deal
8 // in the Software without restriction, including without limitation the rights
9 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 // copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
13 // The above copyright notice and this permission notice shall be included in
14 // all copies or substantial portions of the Software.
16 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26 #if NET_4_0 || BOOTSTRAP_NET_4_0
29 using System.Threading;
30 using System.Collections;
31 using System.Collections.Generic;
32 using System.Diagnostics;
33 using System.Runtime.InteropServices;
35 namespace System.Collections.Concurrent
// Blocking wrapper around an IProducerConsumerCollection<T>, with an optional upper bound.
// Slots are reserved through the addId/removeId counters before the underlying
// collection is touched (see Add/Take and their Try* variants).
38 [DebuggerDisplay ("Count={Count}")]
39 [DebuggerTypeProxy (typeof (CollectionDebuggerView<>))]
40 public class BlockingCollection<T> : IEnumerable<T>, ICollection, IEnumerable, IDisposable
// Threshold compared against SpinWait.Count in Add and Take.
// NOTE(review): presumably the callers spin while at or below it; the spin call itself is not visible in this excerpt.
42 const int spinCount = 5;
// Backing store; items are stored and retrieved through its TryAdd/TryTake.
44 readonly IProducerConsumerCollection<T> underlyingColl;
// Capacity bound; -1 disables the bound check (see the `upperBound != -1` tests in Add/TryAdd).
45 readonly int upperBound;
// Set to true by CompleteAdding; read by the add paths and by IsAddingCompleted/IsCompleted.
47 AtomicBoolean isComplete;
50 /* The whole idea of the collection is to use these two long values in a transactional
51 * way to track and manage the actual data inside the underlying lock-free collection
52 * instead of directly working with it or using external locking.
54 * They are manipulated with CAS and are guaranteed to increase over time and use
55 * of the instance thus preventing ABA problems.
57 long addId = long.MinValue;
58 long removeId = long.MinValue;
60 /* These events are used solely for the purpose of having an optimized sleep cycle when
61 * the BlockingCollection has to wait on an external event (Add or Remove for instance)
63 ManualResetEventSlim mreAdd = new ManualResetEventSlim (true);
64 ManualResetEventSlim mreRemove = new ManualResetEventSlim (true);
// Creates a collection backed by a new ConcurrentQueue<T> with no upper bound (-1).
67 public BlockingCollection ()
68 : this (new ConcurrentQueue<T> (), -1)
// Creates a collection backed by a new ConcurrentQueue<T> and limited by upperBound.
72 public BlockingCollection (int upperBound)
73 : this (new ConcurrentQueue<T> (), upperBound)
// Wraps the supplied producer/consumer collection with no upper bound (-1).
77 public BlockingCollection (IProducerConsumerCollection<T> underlyingColl)
78 : this (underlyingColl, -1)
// Primary constructor: stores the backing collection and the bound, and creates the
// completion flag. All other constructors chain to this one.
// NOTE(review): no argument validation is visible in this excerpt (null underlyingColl,
// upperBound of 0 or below -1) — confirm whether checks exist on lines not shown.
82 public BlockingCollection (IProducerConsumerCollection<T> underlyingColl, int upperBound)
84 this.underlyingColl = underlyingColl;
85 this.upperBound = upperBound;
86 this.isComplete = new AtomicBoolean ();
90 #region Add & Remove (+ Try)
// Adds item without cancellation support by forwarding to Add (T, CancellationToken)
// with CancellationToken.None.
91 public void Add (T item)
93 Add (item, CancellationToken.None);
// Adds item, waiting on mreRemove while the bound check fails, and calls
// ThrowCompleteException once adding has been marked complete for this transaction id.
// token is polled with ThrowIfCancellationRequested and also passed to the wait.
// NOTE(review): several lines of this method are not visible in this excerpt (the loop
// header, the declarations of cachedAddId and completeId, the spin/continue statements);
// the comments below describe only the statements that are shown.
96 public void Add (T item, CancellationToken token)
98 SpinWait sw = new SpinWait ();
102 token.ThrowIfCancellationRequested ();
// Snapshot of the remove counter used by the bound check below.
105 long cachedRemoveId = removeId;
// -1 means "no bound": the capacity check is skipped entirely.
107 if (upperBound != -1) {
// addId - removeId counts slots reserved by adders and not yet claimed by takers.
// NOTE(review): `>` (rather than `>=`) appears to admit upperBound + 1 items — confirm this is intended.
108 if (cachedAddId - cachedRemoveId > upperBound) {
109 if (sw.Count <= spinCount) {
// removeId moved since the snapshot, so the bound check above is stale.
114 if (cachedRemoveId != removeId)
// Block on the remove event; the wait observes token.
117 mreRemove.Wait (token);
125 // Check our transaction id against completed stored one
126 if (isComplete.Value && cachedAddId >= completeId)
127 ThrowCompleteException ();
// Reserve slot cachedAddId by CAS-advancing addId; the CAS succeeds only if no other adder got there first.
128 if (Interlocked.CompareExchange (ref addId, cachedAddId + 1, cachedAddId) == cachedAddId)
// Same completion test repeated after the CAS attempt.
132 if (isComplete.Value && cachedAddId >= completeId)
133 ThrowCompleteException ();
// Retry until the underlying collection accepts the item.
135 while (!underlyingColl.TryAdd (item));
// Take without cancellation support: forwards to Take (CancellationToken) with CancellationToken.None.
143 return Take (CancellationToken.None);
// Removes and returns one item. token is polled with ThrowIfCancellationRequested.
// ThrowCompleteException is called on the empty path (its guarding conditions are not shown).
// NOTE(review): the loop header, the guards in front of the ThrowCompleteException
// calls, the wait on the add side and the final return are not visible in this excerpt;
// the comments below describe only the statements that are shown.
146 public T Take (CancellationToken token)
148 SpinWait sw = new SpinWait ();
151 token.ThrowIfCancellationRequested ();
// Snapshot both counters; equal values mean every reserved slot has already been claimed.
153 long cachedRemoveId = removeId;
154 long cachedAddId = addId;
157 if (cachedRemoveId == cachedAddId) {
// NOTE(review): the condition guarding this throw is on a line not shown (presumably a completion check) — confirm.
159 ThrowCompleteException ();
161 if (sw.Count <= spinCount) {
// addId moved since the snapshot, so the emptiness check above is stale.
164 if (cachedAddId != addId)
167 ThrowCompleteException ();
// Claim slot cachedRemoveId by CAS-advancing removeId.
176 if (Interlocked.CompareExchange (ref removeId, cachedRemoveId + 1, cachedRemoveId) == cachedRemoveId)
// Retry until the underlying collection yields an item (the reserving adder may not have stored it yet).
181 while (!underlyingColl.TryTake (out item));
// NOTE(review): the statement governed by this test is not shown; presumably it signals mreRemove so blocked adders wake up.
183 if (!mreRemove.IsSet)
// Single-attempt add: the continuation predicate always returns false and no
// cancellation token is used.
189 public bool TryAdd (T item)
191 return TryAdd (item, () => false, CancellationToken.None);
// Core of every TryAdd overload. Runs the attempt inside a do/while that repeats for as
// long as contFunc returns true; token is polled with ThrowIfCancellationRequested.
// Throws InvalidOperationException when adding has been marked complete for this id.
// NOTE(review): the loop header, the statements governed by the bound check and by the
// CAS test, and the return statements are not visible in this excerpt.
194 bool TryAdd (T item, Func<bool> contFunc, CancellationToken token)
197 token.ThrowIfCancellationRequested ();
// Snapshot both counters for the bound check and the CAS below.
199 long cachedAddId = addId;
200 long cachedRemoveId = removeId;
// -1 means "no bound".
// NOTE(review): `>` (rather than `>=`) appears to admit upperBound + 1 items — confirm this is intended.
202 if (upperBound != -1)
203 if (cachedAddId - cachedRemoveId > upperBound)
206 // Check our transaction id against completed stored one
207 if (isComplete.Value && cachedAddId >= completeId)
// NOTE(review): same message as ThrowCompleteException; the helper could be called here instead.
208 throw new InvalidOperationException ("The BlockingCollection<T> has"
209 + " been marked as complete with regards to additions.");
// Reserve slot cachedAddId; a result different from cachedAddId means another adder won the race.
211 if (Interlocked.CompareExchange (ref addId, cachedAddId + 1, cachedAddId) != cachedAddId)
// Retry until the underlying collection accepts the item.
214 while (!underlyingColl.TryAdd (item));
220 } while (contFunc ());
// TimeSpan overload: converts ts to whole milliseconds and forwards to TryAdd (T, int).
// NOTE(review): the (int) cast truncates fractional milliseconds and is not range-checked
// for very large TimeSpan values — confirm this is acceptable.
225 public bool TryAdd (T item, TimeSpan ts)
227 return TryAdd (item, (int)ts.TotalMilliseconds);
// Timed add without cancellation: keeps retrying while the stopwatch reads less than
// millisecondsTimeout.
// NOTE(review): with a negative timeout (e.g. -1) the predicate is false from the start,
// so only the first attempt is made — confirm whether an infinite timeout should be supported.
230 public bool TryAdd (T item, int millisecondsTimeout)
232 Stopwatch sw = Stopwatch.StartNew ();
233 return TryAdd (item, () => sw.ElapsedMilliseconds < millisecondsTimeout, CancellationToken.None);
// Timed add with cancellation: keeps retrying while the stopwatch reads less than
// millisecondsTimeout; token is forwarded to the core TryAdd.
// NOTE(review): with a negative timeout (e.g. -1) the predicate is false from the start,
// so only the first attempt is made — confirm whether an infinite timeout should be supported.
236 public bool TryAdd (T item, int millisecondsTimeout, CancellationToken token)
238 Stopwatch sw = Stopwatch.StartNew ();
239 return TryAdd (item, () => sw.ElapsedMilliseconds < millisecondsTimeout, token);
// Single-attempt take: the continuation predicate always returns false and no
// cancellation token is used.
242 public bool TryTake (out T item)
244 return TryTake (out item, () => false, CancellationToken.None);
// Core of every TryTake overload. Runs the attempt inside a do/while that repeats for as
// long as contFunc returns true; token is polled with ThrowIfCancellationRequested.
// NOTE(review): the loop header, the initial assignment of item, the statements governed
// by the emptiness and CAS tests, and the return statements are not visible in this excerpt.
247 bool TryTake (out T item, Func<bool> contFunc, CancellationToken token)
252 token.ThrowIfCancellationRequested ();
// Snapshot both counters; equal values mean every reserved slot has already been claimed.
254 long cachedRemoveId = removeId;
255 long cachedAddId = addId;
258 if (cachedRemoveId == cachedAddId) {
// Claim slot cachedRemoveId; a result different from cachedRemoveId means another taker won the race.
265 if (Interlocked.CompareExchange (ref removeId, cachedRemoveId + 1, cachedRemoveId) != cachedRemoveId)
// Retry until the underlying collection yields an item.
268 while (!underlyingColl.TryTake (out item));
// NOTE(review): the statement governed by this test is not shown; presumably it signals mreRemove.
270 if (!mreRemove.IsSet)
272 } while (contFunc ());
// TimeSpan overload: converts ts to whole milliseconds and forwards to TryTake (out T, int).
// NOTE(review): the (int) cast truncates fractional milliseconds and is not range-checked
// for very large TimeSpan values — confirm this is acceptable.
277 public bool TryTake (out T item, TimeSpan ts)
279 return TryTake (out item, (int)ts.TotalMilliseconds);
// Timed take without cancellation: keeps retrying while the stopwatch reads less than
// millisecondsTimeout.
// NOTE(review): with a negative timeout (e.g. -1) the predicate is false from the start,
// so only the first attempt is made — confirm whether an infinite timeout should be supported.
282 public bool TryTake (out T item, int millisecondsTimeout)
285 Stopwatch sw = Stopwatch.StartNew ();
287 return TryTake (out item, () => sw.ElapsedMilliseconds < millisecondsTimeout, CancellationToken.None);
// Timed take with cancellation: keeps retrying while the stopwatch reads less than
// millisecondsTimeout; token is forwarded to the core TryTake.
// NOTE(review): with a negative timeout (e.g. -1) the predicate is false from the start,
// so only the first attempt is made — confirm whether an infinite timeout should be supported.
290 public bool TryTake (out T item, int millisecondsTimeout, CancellationToken token)
293 Stopwatch sw = Stopwatch.StartNew ();
295 return TryTake (out item, () => sw.ElapsedMilliseconds < millisecondsTimeout, token);
299 #region static methods
// Validates the array argument shared by the *ToAny / *FromAny helpers.
// Throws ArgumentNullException when collections is null, and ArgumentException when it
// is empty or IsThereANullElement reports a null entry.
300 static void CheckArray (BlockingCollection<T>[] collections)
302 if (collections == null)
303 throw new ArgumentNullException ("collections");
304 if (collections.Length == 0 || IsThereANullElement (collections))
305 throw new ArgumentException ("The collections argument is a 0-length array or contains a null element.", "collections");
// Iterates over collections; used by CheckArray to detect null entries.
// NOTE(review): the null test and the return statements are not visible in this excerpt;
// presumably returns true on the first null element and false otherwise.
308 static bool IsThereANullElement (BlockingCollection<T>[] collections)
310 foreach (BlockingCollection<T> e in collections)
// Validates collections with CheckArray, then iterates over them to add item.
// NOTE(review): the loop body and the returned value are not visible in this excerpt;
// presumably the result is the index of the collection that accepted the item.
316 public static int AddToAny (BlockingCollection<T>[] collections, T item)
318 CheckArray (collections);
320 foreach (var coll in collections) {
// Validates collections with CheckArray, then iterates over them calling the
// cancellable Add on each visited collection.
// NOTE(review): the surrounding control flow (exception handling, index tracking, return)
// is not visible in this excerpt; presumably the result is the index of the collection
// that accepted the item.
330 public static int AddToAny (BlockingCollection<T>[] collections, T item, CancellationToken token)
332 CheckArray (collections);
334 foreach (var coll in collections) {
336 coll.Add (item, token);
// Validates collections with CheckArray, then makes one TryAdd attempt per collection
// in array order.
// NOTE(review): the return statements are not visible in this excerpt; presumably the
// index of the first collection whose TryAdd succeeded, or a sentinel when none did.
344 public static int TryAddToAny (BlockingCollection<T>[] collections, T item)
346 CheckArray (collections);
348 foreach (var coll in collections) {
349 if (coll.TryAdd (item))
// Validates collections with CheckArray, then calls TryAdd (item, ts) on each collection
// in array order.
// NOTE(review): ts is passed whole to every collection, so the total wait may reach
// collections.Length times ts — confirm whether the timeout is meant to be shared.
// The return statements are not visible in this excerpt.
356 public static int TryAddToAny (BlockingCollection<T>[] collections, T item, TimeSpan ts)
358 CheckArray (collections);
360 foreach (var coll in collections) {
361 if (coll.TryAdd (item, ts))
// Validates collections with CheckArray, then calls TryAdd (item, millisecondsTimeout)
// on each collection in array order.
// NOTE(review): the timeout is passed whole to every collection, so the total wait may
// reach collections.Length times millisecondsTimeout — confirm whether it is meant to be
// shared. The return statements are not visible in this excerpt.
368 public static int TryAddToAny (BlockingCollection<T>[] collections, T item, int millisecondsTimeout)
370 CheckArray (collections);
372 foreach (var coll in collections) {
373 if (coll.TryAdd (item, millisecondsTimeout))
// Validates collections with CheckArray, then calls the timed, cancellable TryAdd on
// each collection in array order.
// NOTE(review): the timeout is passed whole to every collection, so the total wait may
// reach collections.Length times millisecondsTimeout — confirm whether it is meant to be
// shared. The return statements are not visible in this excerpt.
380 public static int TryAddToAny (BlockingCollection<T>[] collections, T item, int millisecondsTimeout,
381 CancellationToken token)
383 CheckArray (collections);
385 foreach (var coll in collections) {
386 if (coll.TryAdd (item, millisecondsTimeout, token))
// Validates collections with CheckArray, then iterates over them to take an item.
// NOTE(review): the loop body, the assignment of item and the returned value are not
// visible in this excerpt; presumably the result is the index of the collection the item
// came from.
393 public static int TakeFromAny (BlockingCollection<T>[] collections, out T item)
396 CheckArray (collections);
398 foreach (var coll in collections) {
// Validates collections with CheckArray, then iterates over them calling the
// cancellable Take on each visited collection and storing the result in item.
// NOTE(review): the surrounding control flow (exception handling, index tracking, return)
// is not visible in this excerpt.
408 public static int TakeFromAny (BlockingCollection<T>[] collections, out T item, CancellationToken token)
411 CheckArray (collections);
413 foreach (var coll in collections) {
415 item = coll.Take (token);
// Validates collections with CheckArray, then makes one TryTake attempt per collection
// in array order.
// NOTE(review): the return statements are not visible in this excerpt; presumably the
// index of the first collection whose TryTake succeeded, or a sentinel when none did.
423 public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item)
427 CheckArray (collections);
429 foreach (var coll in collections) {
430 if (coll.TryTake (out item))
// Validates collections with CheckArray, then calls TryTake (out item, ts) on each
// collection in array order.
// NOTE(review): ts is passed whole to every collection, so the total wait may reach
// collections.Length times ts — confirm whether the timeout is meant to be shared.
// The return statements are not visible in this excerpt.
437 public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, TimeSpan ts)
441 CheckArray (collections);
443 foreach (var coll in collections) {
444 if (coll.TryTake (out item, ts))
// Validates collections with CheckArray, then calls TryTake (out item, millisecondsTimeout)
// on each collection in array order.
// NOTE(review): the timeout is passed whole to every collection, so the total wait may
// reach collections.Length times millisecondsTimeout — confirm whether it is meant to be
// shared. The return statements are not visible in this excerpt.
451 public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, int millisecondsTimeout)
455 CheckArray (collections);
457 foreach (var coll in collections) {
458 if (coll.TryTake (out item, millisecondsTimeout))
// Validates collections with CheckArray, then calls the timed, cancellable TryTake on
// each collection in array order.
// NOTE(review): the timeout is passed whole to every collection, so the total wait may
// reach collections.Length times millisecondsTimeout — confirm whether it is meant to be
// shared. The return statements are not visible in this excerpt.
465 public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, int millisecondsTimeout,
466 CancellationToken token)
470 CheckArray (collections);
472 foreach (var coll in collections) {
473 if (coll.TryTake (out item, millisecondsTimeout, token))
// Marks the collection as complete for adding by setting the isComplete flag.
// NOTE(review): the statements that follow each comment below are only partly visible in
// this excerpt (presumably completeId is recorded and the wait events are signalled).
481 public void CompleteAdding ()
483 // No further additions are accepted beyond this point
485 isComplete.Value = true;
486 // Wake up any waiting operation in case this has an impact on it
// Throws the InvalidOperationException used when an operation hits a collection that has
// been marked complete for adding. Called from Add and Take.
491 void ThrowCompleteException ()
493 throw new InvalidOperationException ("The BlockingCollection<T> has"
494 + " been marked as complete with regards to additions.");
// Explicit ICollection implementation: forwards the copy to the underlying collection.
497 void ICollection.CopyTo (Array array, int index)
499 underlyingColl.CopyTo (array, index);
// Copies the current contents into array starting at index by forwarding to the
// underlying collection; nothing is removed by this call.
502 public void CopyTo (T[] array, int index)
504 underlyingColl.CopyTo (array, index);
// Returns a consuming sequence whose items are obtained through the parameterless Take.
507 public IEnumerable<T> GetConsumingEnumerable ()
509 return GetConsumingEnumerable (Take);
// Returns a consuming sequence whose items are obtained through Take (token), so each
// retrieval observes the supplied cancellation token.
512 public IEnumerable<T> GetConsumingEnumerable (CancellationToken token)
514 return GetConsumingEnumerable (() => Take (token));
// Shared implementation behind the public GetConsumingEnumerable overloads; getFunc is
// the take operation supplied by the caller.
// NOTE(review): the loop, termination condition and yield statements are not visible in
// this excerpt.
517 IEnumerable<T> GetConsumingEnumerable (Func<T> getFunc)
520 T item = default (T);
// Non-generic enumeration: returns the underlying collection's enumerator directly.
// Take is not called, so enumerating this way does not go through the take path.
532 IEnumerator IEnumerable.GetEnumerator ()
534 return ((IEnumerable)underlyingColl).GetEnumerator ();
// Generic enumeration: returns the underlying collection's enumerator directly.
// Take is not called, so enumerating this way does not go through the take path.
537 IEnumerator<T> IEnumerable<T>.GetEnumerator ()
539 return ((IEnumerable<T>)underlyingColl).GetEnumerator ();
// Public dispose entry point (the class declares IDisposable).
// NOTE(review): the body is not visible in this excerpt; presumably it forwards to Dispose (bool).
542 public void Dispose ()
// Overridable dispose hook taking a managedRes flag.
// NOTE(review): the body is not visible in this excerpt — confirm whether mreAdd and
// mreRemove (both ManualResetEventSlim, which is disposable) are released here.
547 protected virtual void Dispose (bool managedRes)
// Returns the array produced by the underlying collection's ToArray; nothing is removed.
552 public T[] ToArray ()
554 return underlyingColl.ToArray ();
// Capacity bound of this collection.
// NOTE(review): the getter body is not visible in this excerpt; presumably it returns upperBound.
557 public int BoundedCapacity {
// Reports the underlying collection's own count (not addId - removeId).
565 return underlyingColl.Count;
// True once CompleteAdding has set the isComplete flag.
569 public bool IsAddingCompleted {
571 return isComplete.Value;
// True when adding has been marked complete and the add and remove counters are equal,
// i.e. every reserved slot has also been claimed by a taker.
575 public bool IsCompleted {
577 return isComplete.Value && addId == removeId;
// Explicit ICollection member: exposes the underlying collection's SyncRoot unchanged.
581 object ICollection.SyncRoot {
583 return underlyingColl.SyncRoot;
// Explicit ICollection member: reports the underlying collection's IsSynchronized value unchanged.
587 bool ICollection.IsSynchronized {
589 return underlyingColl.IsSynchronized;