2 // BlockingCollection.cs
4 // Copyright (c) 2008 Jérémie "Garuma" Laval
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
7 // of this software and associated documentation files (the "Software"), to deal
8 // in the Software without restriction, including without limitation the rights
9 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 // copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
13 // The above copyright notice and this permission notice shall be included in
14 // all copies or substantial portions of the Software.
16 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
29 using System.Threading;
30 using System.Collections;
31 using System.Collections.Generic;
32 using System.Diagnostics;
33 using System.Runtime.InteropServices;
35 namespace System.Collections.Concurrent
38 [DebuggerDisplay ("Count={Count}")]
39 [DebuggerTypeProxy (typeof (CollectionDebuggerView<>))]
40 public class BlockingCollection<T> : IEnumerable<T>, ICollection, IEnumerable, IDisposable
42 const int spinCount = 5;
44 readonly IProducerConsumerCollection<T> underlyingColl;
45 readonly int upperBound;
47 AtomicBoolean isComplete;
50 /* The whole idea of the collection is to use these two long values in a transactional
51 * way to track and manage the actual data inside the underlying lock-free collection
52 * instead of directly working with it or using external locking.
54 * They are manipulated with CAS and are guaranteed to increase over time and use
55 * of the instance thus preventing ABA problems.
57 long addId = long.MinValue;
58 long removeId = long.MinValue;
60 /* These events are used solely for the purpose of having an optimized sleep cycle when
61 * the BlockingCollection has to wait on an external event (Add or Remove for instance)
63 ManualResetEventSlim mreAdd = new ManualResetEventSlim (true);
64 ManualResetEventSlim mreRemove = new ManualResetEventSlim (true);
66 /* For time-based operations, we share this single Stopwatch instance and base the calculation
67 on a time offset taken at the start of each of these method calls */
68 static Stopwatch watch = Stopwatch.StartNew ();
// Default constructor: backs the collection with a fresh ConcurrentQueue<T> and
// passes -1 as the capacity, the value for which TryAdd skips its capacity check.
public BlockingCollection () : this (new ConcurrentQueue<T> (), -1)
{
}
// Backs the collection with a fresh ConcurrentQueue<T> and forwards the
// caller-supplied capacity unchanged to the two-argument constructor.
public BlockingCollection (int boundedCapacity) : this (new ConcurrentQueue<T> (), boundedCapacity)
{
}
// Wraps the caller-supplied store and passes -1 as the capacity, the value for
// which TryAdd skips its capacity check.
public BlockingCollection (IProducerConsumerCollection<T> collection) : this (collection, -1)
{
}
// Wraps the given producer/consumer collection.
//   collection:      the store that items are added to and taken from; must not be null.
//   boundedCapacity: kept as the upper bound; -1 makes TryAdd skip its capacity check.
// Throws ArgumentNullException when collection is null.
public BlockingCollection (IProducerConsumerCollection<T> collection, int boundedCapacity)
{
	// Reject a null store here instead of failing with a NullReferenceException
	// on the first operation that touches it.
	if (collection == null)
		throw new ArgumentNullException ("collection");

	this.underlyingColl = collection;
	this.upperBound = boundedCapacity;
	this.isComplete = new AtomicBoolean ();
}
94 #region Add & Remove (+ Try)
// Adds an item without cancellation support by forwarding to the
// token-aware overload with CancellationToken.None.
public void Add (T item)
{
	CancellationToken noCancellation = CancellationToken.None;
	Add (item, noCancellation);
}
// Adds an item by running the core TryAdd with a timeout of -1; the core retry
// loop keeps going for as long as the timeout is -1.
public void Add (T item, CancellationToken cancellationToken)
{
	const int noDeadline = -1;

	// The boolean result is deliberately discarded, as in the original call.
	TryAdd (item, noDeadline, cancellationToken);
}
// Attempts to add an item using a zero timeout and no cancellation token.
// Returns whatever the core TryAdd overload reports.
public bool TryAdd (T item)
{
	bool added = TryAdd (item, 0, CancellationToken.None);
	return added;
}
110 public bool TryAdd (T item, int millisecondsTimeout, CancellationToken cancellationToken)
112 if (millisecondsTimeout < -1)
113 throw new ArgumentOutOfRangeException ("millisecondsTimeout");
115 long start = millisecondsTimeout == -1 ? 0 : watch.ElapsedMilliseconds;
116 SpinWait sw = new SpinWait ();
119 cancellationToken.ThrowIfCancellationRequested ();
121 long cachedAddId = addId;
122 long cachedRemoveId = removeId;
124 // If needed, we check and wait that the collection isn't full
125 if (upperBound != -1 && cachedAddId - cachedRemoveId > upperBound) {
126 if (millisecondsTimeout == 0)
129 if (sw.Count <= spinCount) {
133 if (cachedRemoveId != removeId || cachedAddId != addId) {
138 mreRemove.Wait (ComputeTimeout (millisecondsTimeout, start), cancellationToken);
144 // Check our transaction id against completed stored one
145 if (isComplete.Value && cachedAddId >= completeId)
146 ThrowCompleteException ();
148 // Validate the steps we have been doing until now
149 if (Interlocked.CompareExchange (ref addId, cachedAddId + 1, cachedAddId) != cachedAddId)
152 // We have a slot reserved in the underlying collection, try to take it
153 if (!underlyingColl.TryAdd (item))
154 throw new InvalidOperationException ("The underlying collection didn't accept the item.");
156 // Wake up process that may have been sleeping
160 } while (millisecondsTimeout == -1 || (watch.ElapsedMilliseconds - start) < millisecondsTimeout);
// Attempts to add an item, giving the timeout as a TimeSpan.
// The span is truncated to whole milliseconds and handed to the int overload.
public bool TryAdd (T item, TimeSpan timeout)
{
	int millisecondsTimeout = (int)timeout.TotalMilliseconds;
	return TryAdd (item, millisecondsTimeout);
}
// Attempts to add an item within the given number of milliseconds,
// forwarding to the core overload with CancellationToken.None.
public bool TryAdd (T item, int millisecondsTimeout)
{
	CancellationToken noCancellation = CancellationToken.None;
	return TryAdd (item, millisecondsTimeout, noCancellation);
}
177 return Take (CancellationToken.None);
180 public T Take (CancellationToken cancellationToken)
183 TryTake (out item, -1, cancellationToken, true);
// Attempts to take an item using a zero timeout and no cancellation token.
// Returns whatever the token-aware overload reports; item is set by that call.
public bool TryTake (out T item)
{
	bool taken = TryTake (out item, 0, CancellationToken.None);
	return taken;
}
// Attempts to take an item within the given timeout, honouring the token.
// Forwards to the private core overload with throwComplete switched off.
public bool TryTake (out T item, int millisecondsTimeout, CancellationToken cancellationToken)
{
	bool taken = TryTake (out item, millisecondsTimeout, cancellationToken, throwComplete: false);
	return taken;
}
198 bool TryTake (out T item, int milliseconds, CancellationToken cancellationToken, bool throwComplete)
200 if (milliseconds < -1)
201 throw new ArgumentOutOfRangeException ("milliseconds");
204 SpinWait sw = new SpinWait ();
205 long start = milliseconds == -1 ? 0 : watch.ElapsedMilliseconds;
208 cancellationToken.ThrowIfCancellationRequested ();
210 long cachedRemoveId = removeId;
211 long cachedAddId = addId;
214 if (cachedRemoveId == cachedAddId) {
215 if (milliseconds == 0)
220 ThrowCompleteException ();
225 if (sw.Count <= spinCount) {
229 if (cachedRemoveId != removeId || cachedAddId != addId) {
234 mreAdd.Wait (ComputeTimeout (milliseconds, start), cancellationToken);
240 if (Interlocked.CompareExchange (ref removeId, cachedRemoveId + 1, cachedRemoveId) != cachedRemoveId)
243 while (!underlyingColl.TryTake (out item));
249 } while (milliseconds == -1 || (watch.ElapsedMilliseconds - start) < milliseconds);
// Attempts to take an item, giving the timeout as a TimeSpan.
// The span is truncated to whole milliseconds and handed to the int overload.
public bool TryTake (out T item, TimeSpan timeout)
{
	int millisecondsTimeout = (int)timeout.TotalMilliseconds;
	return TryTake (out item, millisecondsTimeout);
}
259 public bool TryTake (out T item, int millisecondsTimeout)
263 return TryTake (out item, millisecondsTimeout, CancellationToken.None, false);
// Computes how long, in milliseconds, a single wait on one of the events may last.
//   millisecondsTimeout: the caller's overall timeout, or -1 for "no deadline".
//   start:               value of watch.ElapsedMilliseconds when the operation began.
// Returns 500 when there is no deadline, otherwise the time left before the
// deadline, never less than 1.
static int ComputeTimeout (int millisecondsTimeout, long start)
{
	if (millisecondsTimeout == -1)
		return 500;

	// Time left = budget - time already spent. The previous expression subtracted
	// the budget from the time spent, which is negative until the deadline passes
	// and therefore always collapsed to the 1 ms floor.
	long elapsed = watch.ElapsedMilliseconds - start;
	long remaining = millisecondsTimeout - elapsed;

	// remaining cannot exceed millisecondsTimeout, so the narrowing cast is safe.
	return (int)Math.Max (remaining, 1);
}
272 #region static methods
// Validates the array handed to the static *Any helpers.
// Throws ArgumentNullException for a null array, and ArgumentException for an
// empty array or one for which IsThereANullElement reports true.
static void CheckArray (BlockingCollection<T>[] collections)
{
	if (collections != null) {
		// The length test comes first so the element scan is skipped for empty arrays.
		bool unusable = collections.Length == 0 || IsThereANullElement (collections);
		if (!unusable)
			return;

		throw new ArgumentException ("The collections argument is a 0-length array or contains a null element.", "collections");
	}

	throw new ArgumentNullException ("collections");
}
281 static bool IsThereANullElement (BlockingCollection<T>[] collections)
283 foreach (BlockingCollection<T> e in collections)
289 public static int AddToAny (BlockingCollection<T>[] collections, T item)
291 CheckArray (collections);
293 foreach (var coll in collections) {
303 public static int AddToAny (BlockingCollection<T>[] collections, T item, CancellationToken cancellationToken)
305 CheckArray (collections);
307 foreach (var coll in collections) {
309 coll.Add (item, cancellationToken);
317 public static int TryAddToAny (BlockingCollection<T>[] collections, T item)
319 CheckArray (collections);
321 foreach (var coll in collections) {
322 if (coll.TryAdd (item))
329 public static int TryAddToAny (BlockingCollection<T>[] collections, T item, TimeSpan timeout)
331 CheckArray (collections);
333 foreach (var coll in collections) {
334 if (coll.TryAdd (item, timeout))
341 public static int TryAddToAny (BlockingCollection<T>[] collections, T item, int millisecondsTimeout)
343 CheckArray (collections);
345 foreach (var coll in collections) {
346 if (coll.TryAdd (item, millisecondsTimeout))
353 public static int TryAddToAny (BlockingCollection<T>[] collections, T item, int millisecondsTimeout,
354 CancellationToken cancellationToken)
356 CheckArray (collections);
358 foreach (var coll in collections) {
359 if (coll.TryAdd (item, millisecondsTimeout, cancellationToken))
366 public static int TakeFromAny (BlockingCollection<T>[] collections, out T item)
369 CheckArray (collections);
371 foreach (var coll in collections) {
381 public static int TakeFromAny (BlockingCollection<T>[] collections, out T item, CancellationToken cancellationToken)
384 CheckArray (collections);
386 foreach (var coll in collections) {
388 item = coll.Take (cancellationToken);
396 public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item)
400 CheckArray (collections);
402 foreach (var coll in collections) {
403 if (coll.TryTake (out item))
410 public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, TimeSpan timeout)
414 CheckArray (collections);
416 foreach (var coll in collections) {
417 if (coll.TryTake (out item, timeout))
424 public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, int millisecondsTimeout)
428 CheckArray (collections);
430 foreach (var coll in collections) {
431 if (coll.TryTake (out item, millisecondsTimeout))
438 public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, int millisecondsTimeout,
439 CancellationToken cancellationToken)
443 CheckArray (collections);
445 foreach (var coll in collections) {
446 if (coll.TryTake (out item, millisecondsTimeout, cancellationToken))
454 public void CompleteAdding ()
456 // No further add beside that point
458 isComplete.Value = true;
459 // Wakeup some operation in case this has an impact
// Always throws InvalidOperationException with the "marked as complete" message.
void ThrowCompleteException ()
{
	const string message = "The BlockingCollection<T> has been marked as complete with regards to additions.";
	throw new InvalidOperationException (message);
}
// Non-generic ICollection copy: delegates to the underlying collection's
// ICollection.CopyTo with the same array and start index.
void ICollection.CopyTo (Array array, int index)
{
	ICollection store = underlyingColl;
	store.CopyTo (array, index);
}
// Copies the contents of the underlying collection into array, starting at index.
public void CopyTo (T[] array, int index)
{
	IProducerConsumerCollection<T> store = underlyingColl;
	store.CopyTo (array, index);
}
// Returns the consuming enumerable without cancellation support by forwarding
// to the token-aware overload with CancellationToken.None.
public IEnumerable<T> GetConsumingEnumerable ()
{
	CancellationToken noCancellation = CancellationToken.None;
	return GetConsumingEnumerable (noCancellation);
}
485 public IEnumerable<T> GetConsumingEnumerable (CancellationToken cancellationToken)
488 T item = default (T);
491 item = Take (cancellationToken);
493 // Then the exception is perfectly normal
// Non-generic enumeration: hands out the underlying collection's own enumerator.
IEnumerator IEnumerable.GetEnumerator ()
{
	IEnumerable sequence = underlyingColl;
	return sequence.GetEnumerator ();
}
// Generic enumeration: hands out the underlying collection's own typed enumerator.
IEnumerator<T> IEnumerable<T>.GetEnumerator ()
{
	IEnumerable<T> sequence = underlyingColl;
	return sequence.GetEnumerator ();
}
514 public void Dispose ()
519 protected virtual void Dispose (bool disposing)
// Returns the array produced by the underlying collection's ToArray.
public T[] ToArray ()
{
	T[] snapshot = underlyingColl.ToArray ();
	return snapshot;
}
529 public int BoundedCapacity {
537 return underlyingColl.Count;
// Reports the current value of the completion flag.
public bool IsAddingCompleted {
	get {
		bool addingClosed = isComplete.Value;
		return addingClosed;
	}
}
// True only when the completion flag is set and the add and remove
// counters are equal.
public bool IsCompleted {
	get {
		// Read the flag first; the counters are only compared once it is set.
		if (!isComplete.Value)
			return false;

		return addId == removeId;
	}
}
// Exposes the underlying collection's SyncRoot unchanged.
object ICollection.SyncRoot {
	get {
		object root = underlyingColl.SyncRoot;
		return root;
	}
}
559 bool ICollection.IsSynchronized {
561 return underlyingColl.IsSynchronized;