2 // BlockingCollection.cs
4 // Copyright (c) 2008 Jérémie "Garuma" Laval
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
7 // of this software and associated documentation files (the "Software"), to deal
8 // in the Software without restriction, including without limitation the rights
9 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 // copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
13 // The above copyright notice and this permission notice shall be included in
14 // all copies or substantial portions of the Software.
16 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
29 using System.Threading;
30 using System.Collections;
31 using System.Collections.Generic;
32 using System.Diagnostics;
33 using System.Runtime.InteropServices;
35 namespace System.Collections.Concurrent
38 [DebuggerDisplay ("Count={Count}")]
39 [DebuggerTypeProxy (typeof (CollectionDebuggerView<>))]
40 public class BlockingCollection<T> : IEnumerable<T>, ICollection, IEnumerable, IDisposable
42 const int spinCount = 5;
44 readonly IProducerConsumerCollection<T> underlyingColl;
46 /* These events are used solely for the purpose of having an optimized sleep cycle when
47 * the BlockingCollection have to wait on an external event (Add or Remove for instance)
49 ManualResetEventSlim mreAdd = new ManualResetEventSlim (true);
50 ManualResetEventSlim mreRemove = new ManualResetEventSlim (true);
51 AtomicBoolean isComplete;
53 readonly int upperBound;
57 /* The whole idea of the collection is to use these two long values in a transactional
58 * way to track and manage the actual data inside the underlying lock-free collection
59 * instead of directly working with it or using external locking.
61 * They are manipulated with CAS and are guaranteed to increase over time and use
62 * of the instance thus preventing ABA problems.
64 int addId = int.MinValue;
65 int removeId = int.MinValue;
68 /* For time based operations, we share this instance of Stopwatch and base calculation
69 on a time offset at each of these method call */
70 static Stopwatch watch = Stopwatch.StartNew ();
73 public BlockingCollection ()
74 : this (new ConcurrentQueue<T> (), -1)
78 public BlockingCollection (int boundedCapacity)
79 : this (new ConcurrentQueue<T> (), boundedCapacity)
83 public BlockingCollection (IProducerConsumerCollection<T> collection)
84 : this (collection, -1)
88 public BlockingCollection (IProducerConsumerCollection<T> collection, int boundedCapacity)
90 this.underlyingColl = collection;
91 this.upperBound = boundedCapacity;
92 this.isComplete = new AtomicBoolean ();
96 #region Add & Remove (+ Try)
97 public void Add (T item)
99 Add (item, CancellationToken.None);
102 public void Add (T item, CancellationToken cancellationToken)
104 TryAdd (item, -1, cancellationToken);
107 public bool TryAdd (T item)
109 return TryAdd (item, 0, CancellationToken.None);
112 public bool TryAdd (T item, int millisecondsTimeout, CancellationToken cancellationToken)
114 if (millisecondsTimeout < -1)
115 throw new ArgumentOutOfRangeException ("millisecondsTimeout");
117 long start = millisecondsTimeout == -1 ? 0 : watch.ElapsedMilliseconds;
118 SpinWait sw = new SpinWait ();
121 cancellationToken.ThrowIfCancellationRequested ();
123 int cachedAddId = addId;
124 int cachedRemoveId = removeId;
125 int itemsIn = cachedAddId - cachedRemoveId;
127 // If needed, we check and wait that the collection isn't full
128 if (upperBound != -1 && itemsIn > upperBound) {
129 if (millisecondsTimeout == 0)
132 if (sw.Count <= spinCount) {
136 if (cachedRemoveId != removeId || cachedAddId != addId) {
141 mreRemove.Wait (ComputeTimeout (millisecondsTimeout, start), cancellationToken);
147 // Check our transaction id against completed stored one
148 if (isComplete.Value && cachedAddId >= completeId)
149 ThrowCompleteException ();
151 // Validate the steps we have been doing until now
152 if (Interlocked.CompareExchange (ref addId, cachedAddId + 1, cachedAddId) != cachedAddId)
155 // We have a slot reserved in the underlying collection, try to take it
156 if (!underlyingColl.TryAdd (item))
157 throw new InvalidOperationException ("The underlying collection didn't accept the item.");
159 // Wake up process that may have been sleeping
163 } while (millisecondsTimeout == -1 || (watch.ElapsedMilliseconds - start) < millisecondsTimeout);
168 public bool TryAdd (T item, TimeSpan timeout)
170 return TryAdd (item, (int)timeout.TotalMilliseconds);
173 public bool TryAdd (T item, int millisecondsTimeout)
175 return TryAdd (item, millisecondsTimeout, CancellationToken.None);
180 return Take (CancellationToken.None);
183 public T Take (CancellationToken cancellationToken)
186 TryTake (out item, -1, cancellationToken, true);
191 public bool TryTake (out T item)
193 return TryTake (out item, 0, CancellationToken.None);
196 public bool TryTake (out T item, int millisecondsTimeout, CancellationToken cancellationToken)
198 return TryTake (out item, millisecondsTimeout, cancellationToken, false);
201 bool TryTake (out T item, int milliseconds, CancellationToken cancellationToken, bool throwComplete)
203 if (milliseconds < -1)
204 throw new ArgumentOutOfRangeException ("milliseconds");
207 SpinWait sw = new SpinWait ();
208 long start = milliseconds == -1 ? 0 : watch.ElapsedMilliseconds;
211 cancellationToken.ThrowIfCancellationRequested ();
213 int cachedRemoveId = removeId;
214 int cachedAddId = addId;
217 if (cachedRemoveId == cachedAddId) {
218 if (milliseconds == 0)
223 ThrowCompleteException ();
228 if (sw.Count <= spinCount) {
232 if (cachedRemoveId != removeId || cachedAddId != addId) {
237 mreAdd.Wait (ComputeTimeout (milliseconds, start), cancellationToken);
243 if (Interlocked.CompareExchange (ref removeId, cachedRemoveId + 1, cachedRemoveId) != cachedRemoveId)
246 while (!underlyingColl.TryTake (out item));
252 } while (milliseconds == -1 || (watch.ElapsedMilliseconds - start) < milliseconds);
257 public bool TryTake (out T item, TimeSpan timeout)
259 return TryTake (out item, (int)timeout.TotalMilliseconds);
262 public bool TryTake (out T item, int millisecondsTimeout)
266 return TryTake (out item, millisecondsTimeout, CancellationToken.None, false);
269 static int ComputeTimeout (int millisecondsTimeout, long start)
271 return millisecondsTimeout == -1 ? 500 : (int)Math.Max (watch.ElapsedMilliseconds - start - millisecondsTimeout, 1);
275 #region static methods
276 static void CheckArray (BlockingCollection<T>[] collections)
278 if (collections == null)
279 throw new ArgumentNullException ("collections");
280 if (collections.Length == 0 || IsThereANullElement (collections))
281 throw new ArgumentException ("The collections argument is a 0-length array or contains a null element.", "collections");
284 static bool IsThereANullElement (BlockingCollection<T>[] collections)
286 foreach (BlockingCollection<T> e in collections)
292 public static int AddToAny (BlockingCollection<T>[] collections, T item)
294 CheckArray (collections);
296 foreach (var coll in collections) {
306 public static int AddToAny (BlockingCollection<T>[] collections, T item, CancellationToken cancellationToken)
308 CheckArray (collections);
310 foreach (var coll in collections) {
312 coll.Add (item, cancellationToken);
320 public static int TryAddToAny (BlockingCollection<T>[] collections, T item)
322 CheckArray (collections);
324 foreach (var coll in collections) {
325 if (coll.TryAdd (item))
332 public static int TryAddToAny (BlockingCollection<T>[] collections, T item, TimeSpan timeout)
334 CheckArray (collections);
336 foreach (var coll in collections) {
337 if (coll.TryAdd (item, timeout))
344 public static int TryAddToAny (BlockingCollection<T>[] collections, T item, int millisecondsTimeout)
346 CheckArray (collections);
348 foreach (var coll in collections) {
349 if (coll.TryAdd (item, millisecondsTimeout))
356 public static int TryAddToAny (BlockingCollection<T>[] collections, T item, int millisecondsTimeout,
357 CancellationToken cancellationToken)
359 CheckArray (collections);
361 foreach (var coll in collections) {
362 if (coll.TryAdd (item, millisecondsTimeout, cancellationToken))
369 public static int TakeFromAny (BlockingCollection<T>[] collections, out T item)
372 CheckArray (collections);
373 WaitHandle[] wait_table = null;
376 for (int i = 0; i < collections.Length; ++i) {
377 if (collections [i].TryTake (out item))
380 if (wait_table == null) {
381 wait_table = new WaitHandle [collections.Length];
382 for (int i = 0; i < collections.Length; ++i)
383 wait_table [i] = collections [i].mreRemove.WaitHandle;
385 WaitHandle.WaitAny (wait_table);
390 public static int TakeFromAny (BlockingCollection<T>[] collections, out T item, CancellationToken cancellationToken)
393 CheckArray (collections);
394 WaitHandle[] wait_table = null;
397 for (int i = 0; i < collections.Length; ++i) {
398 if (collections [i].TryTake (out item))
401 cancellationToken.ThrowIfCancellationRequested ();
402 if (wait_table == null) {
403 wait_table = new WaitHandle [collections.Length + 1];
404 for (int i = 0; i < collections.Length; ++i)
405 wait_table [i] = collections [i].mreRemove.WaitHandle;
406 wait_table [collections.Length] = cancellationToken.WaitHandle;
408 WaitHandle.WaitAny (wait_table);
409 cancellationToken.ThrowIfCancellationRequested ();
415 public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item)
419 CheckArray (collections);
421 foreach (var coll in collections) {
422 if (coll.TryTake (out item))
429 public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, TimeSpan timeout)
433 CheckArray (collections);
435 foreach (var coll in collections) {
436 if (coll.TryTake (out item, timeout))
443 public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, int millisecondsTimeout)
447 CheckArray (collections);
449 foreach (var coll in collections) {
450 if (coll.TryTake (out item, millisecondsTimeout))
457 public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, int millisecondsTimeout,
458 CancellationToken cancellationToken)
462 CheckArray (collections);
464 foreach (var coll in collections) {
465 if (coll.TryTake (out item, millisecondsTimeout, cancellationToken))
473 public void CompleteAdding ()
475 // No further add beside that point
477 isComplete.Value = true;
478 // Wakeup some operation in case this has an impact
483 void ThrowCompleteException ()
485 throw new InvalidOperationException ("The BlockingCollection<T> has"
486 + " been marked as complete with regards to additions.");
489 void ICollection.CopyTo (Array array, int index)
491 underlyingColl.CopyTo (array, index);
494 public void CopyTo (T[] array, int index)
496 underlyingColl.CopyTo (array, index);
499 public IEnumerable<T> GetConsumingEnumerable ()
501 return GetConsumingEnumerable (CancellationToken.None);
504 public IEnumerable<T> GetConsumingEnumerable (CancellationToken cancellationToken)
507 T item = default (T);
510 item = Take (cancellationToken);
512 // Then the exception is perfectly normal
523 IEnumerator IEnumerable.GetEnumerator ()
525 return ((IEnumerable)underlyingColl).GetEnumerator ();
528 IEnumerator<T> IEnumerable<T>.GetEnumerator ()
530 return ((IEnumerable<T>)underlyingColl).GetEnumerator ();
533 public void Dispose ()
538 protected virtual void Dispose (bool disposing)
543 public T[] ToArray ()
545 return underlyingColl.ToArray ();
548 public int BoundedCapacity {
556 return underlyingColl.Count;
560 public bool IsAddingCompleted {
562 return isComplete.Value;
566 public bool IsCompleted {
568 return isComplete.Value && addId == removeId;
572 object ICollection.SyncRoot {
574 return underlyingColl.SyncRoot;
578 bool ICollection.IsSynchronized {
580 return underlyingColl.IsSynchronized;