2 // BlockingCollection.cs
4 // Copyright (c) 2008 Jérémie "Garuma" Laval
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
7 // of this software and associated documentation files (the "Software"), to deal
8 // in the Software without restriction, including without limitation the rights
9 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 // copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
13 // The above copyright notice and this permission notice shall be included in
14 // all copies or substantial portions of the Software.
16 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26 #if NET_4_0 || BOOTSTRAP_NET_4_0
29 using System.Threading;
30 using System.Collections;
31 using System.Collections.Generic;
32 using System.Diagnostics;
33 using System.Runtime.InteropServices;
35 namespace System.Collections.Concurrent
38 [DebuggerDisplay ("Count={Count}")]
39 [DebuggerTypeProxy (typeof (CollectionDebuggerView<>))]
40 public class BlockingCollection<T> : IEnumerable<T>, ICollection, IEnumerable, IDisposable
42 readonly IProducerConsumerCollection<T> underlyingColl;
43 readonly int upperBound;
45 AtomicBoolean isComplete;
48 long addId = long.MinValue;
49 long removeId = long.MinValue;
52 public BlockingCollection ()
53 : this (new ConcurrentQueue<T> (), -1)
57 public BlockingCollection (int upperBound)
58 : this (new ConcurrentQueue<T> (), upperBound)
62 public BlockingCollection (IProducerConsumerCollection<T> underlyingColl)
63 : this (underlyingColl, -1)
67 public BlockingCollection (IProducerConsumerCollection<T> underlyingColl, int upperBound)
69 this.underlyingColl = underlyingColl;
70 this.upperBound = upperBound;
71 this.isComplete = new AtomicBoolean ();
75 #region Add & Remove (+ Try)
76 public void Add (T item)
81 public void Add (T item, CancellationToken token)
83 Add (item, () => token.IsCancellationRequested);
86 void Add (T item, Func<bool> cancellationFunc)
89 long cachedAddId = addId;
90 long cachedRemoveId = removeId;
92 if (upperBound != -1) {
93 if (cachedAddId - cachedRemoveId > upperBound) {
99 // Check our transaction id against completed stored one
100 if (isComplete.Value && cachedAddId >= completeId)
101 throw new InvalidOperationException ("The BlockingCollection<T> has"
102 + " been marked as complete with regards to additions.");
104 if (Interlocked.CompareExchange (ref addId, cachedAddId + 1, cachedAddId) == cachedAddId)
107 if (cancellationFunc != null && cancellationFunc ())
108 throw new OperationCanceledException ("CancellationToken triggered");
112 if (!underlyingColl.TryAdd (item))
113 throw new InvalidOperationException ("The underlying collection didn't accept the item.");
121 public T Take (CancellationToken token)
123 return Take (() => token.IsCancellationRequested);
126 T Take (Func<bool> cancellationFunc)
129 long cachedRemoveId = removeId;
130 long cachedAddId = addId;
133 if (cachedRemoveId == cachedAddId) {
135 throw new OperationCanceledException ("The BlockingCollection<T> has"
136 + " been marked as complete with regards to additions.");
142 if (Interlocked.CompareExchange (ref removeId, cachedRemoveId + 1, cachedRemoveId) == cachedRemoveId)
145 if (cancellationFunc != null && cancellationFunc ())
146 throw new OperationCanceledException ("The CancellationToken has had cancellation requested.");
150 while (!underlyingColl.TryTake (out item));
155 public bool TryAdd (T item)
157 return TryAdd (item, null, null);
160 bool TryAdd (T item, Func<bool> contFunc, CancellationToken? token)
163 if (token.HasValue && token.Value.IsCancellationRequested)
164 throw new OperationCanceledException ("The CancellationToken has had cancellation requested.");
166 long cachedAddId = addId;
167 long cachedRemoveId = removeId;
169 if (upperBound != -1) {
170 if (cachedAddId - cachedRemoveId > upperBound) {
175 // Check our transaction id against completed stored one
176 if (isComplete.Value && cachedAddId >= completeId)
177 throw new InvalidOperationException ("The BlockingCollection<T> has"
178 + " been marked as complete with regards to additions.");
180 if (Interlocked.CompareExchange (ref addId, cachedAddId + 1, cachedAddId) != cachedAddId)
183 if (!underlyingColl.TryAdd (item))
184 throw new InvalidOperationException ("The underlying collection didn't accept the item.");
187 } while (contFunc != null && contFunc ());
192 public bool TryAdd (T item, TimeSpan ts)
194 return TryAdd (item, (int)ts.TotalMilliseconds);
197 public bool TryAdd (T item, int millisecondsTimeout)
199 Stopwatch sw = Stopwatch.StartNew ();
200 return TryAdd (item, () => sw.ElapsedMilliseconds < millisecondsTimeout, null);
203 public bool TryAdd (T item, int millisecondsTimeout, CancellationToken token)
205 Stopwatch sw = Stopwatch.StartNew ();
206 return TryAdd (item, () => sw.ElapsedMilliseconds < millisecondsTimeout, token);
209 public bool TryTake (out T item)
211 return TryTake (out item, null, null);
214 bool TryTake (out T item, Func<bool> contFunc, CancellationToken? token)
219 if (token.HasValue && token.Value.IsCancellationRequested)
220 throw new OperationCanceledException ("The CancellationToken has had cancellation requested.");
222 long cachedRemoveId = removeId;
223 long cachedAddId = addId;
226 if (cachedRemoveId == cachedAddId) {
233 if (Interlocked.CompareExchange (ref removeId, cachedRemoveId + 1, cachedRemoveId) != cachedRemoveId)
236 return underlyingColl.TryTake (out item);
237 } while (contFunc != null && contFunc ());
242 public bool TryTake (out T item, TimeSpan ts)
244 return TryTake (out item, (int)ts.TotalMilliseconds);
247 public bool TryTake (out T item, int millisecondsTimeout)
250 Stopwatch sw = Stopwatch.StartNew ();
252 return TryTake (out item, () => sw.ElapsedMilliseconds < millisecondsTimeout, null);
255 public bool TryTake (out T item, int millisecondsTimeout, CancellationToken token)
258 Stopwatch sw = Stopwatch.StartNew ();
260 return TryTake (out item, () => sw.ElapsedMilliseconds < millisecondsTimeout, token);
264 #region static methods
265 static void CheckArray (BlockingCollection<T>[] collections)
267 if (collections == null)
268 throw new ArgumentNullException ("collections");
269 if (collections.Length == 0 || IsThereANullElement (collections))
270 throw new ArgumentException ("The collections argument is a 0-length array or contains a null element.", "collections");
273 static bool IsThereANullElement (BlockingCollection<T>[] collections)
275 foreach (BlockingCollection<T> e in collections)
281 public static int AddToAny (BlockingCollection<T>[] collections, T item)
283 CheckArray (collections);
285 foreach (var coll in collections) {
295 public static int AddToAny (BlockingCollection<T>[] collections, T item, CancellationToken token)
297 CheckArray (collections);
299 foreach (var coll in collections) {
301 coll.Add (item, token);
309 public static int TryAddToAny (BlockingCollection<T>[] collections, T item)
311 CheckArray (collections);
313 foreach (var coll in collections) {
314 if (coll.TryAdd (item))
321 public static int TryAddToAny (BlockingCollection<T>[] collections, T item, TimeSpan ts)
323 CheckArray (collections);
325 foreach (var coll in collections) {
326 if (coll.TryAdd (item, ts))
333 public static int TryAddToAny (BlockingCollection<T>[] collections, T item, int millisecondsTimeout)
335 CheckArray (collections);
337 foreach (var coll in collections) {
338 if (coll.TryAdd (item, millisecondsTimeout))
345 public static int TryAddToAny (BlockingCollection<T>[] collections, T item, int millisecondsTimeout,
346 CancellationToken token)
348 CheckArray (collections);
350 foreach (var coll in collections) {
351 if (coll.TryAdd (item, millisecondsTimeout, token))
358 public static int TakeFromAny (BlockingCollection<T>[] collections, out T item)
361 CheckArray (collections);
363 foreach (var coll in collections) {
373 public static int TakeFromAny (BlockingCollection<T>[] collections, out T item, CancellationToken token)
376 CheckArray (collections);
378 foreach (var coll in collections) {
380 item = coll.Take (token);
388 public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item)
392 CheckArray (collections);
394 foreach (var coll in collections) {
395 if (coll.TryTake (out item))
402 public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, TimeSpan ts)
406 CheckArray (collections);
408 foreach (var coll in collections) {
409 if (coll.TryTake (out item, ts))
416 public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, int millisecondsTimeout)
420 CheckArray (collections);
422 foreach (var coll in collections) {
423 if (coll.TryTake (out item, millisecondsTimeout))
430 public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, int millisecondsTimeout,
431 CancellationToken token)
435 CheckArray (collections);
437 foreach (var coll in collections) {
438 if (coll.TryTake (out item, millisecondsTimeout, token))
446 public void CompleteAdding ()
448 // No further add beside that point
450 isComplete.Value = true;
453 void ICollection.CopyTo (Array array, int index)
455 underlyingColl.CopyTo (array, index);
458 public void CopyTo (T[] array, int index)
460 underlyingColl.CopyTo (array, index);
463 public IEnumerable<T> GetConsumingEnumerable ()
465 return GetConsumingEnumerable (Take);
468 public IEnumerable<T> GetConsumingEnumerable (CancellationToken token)
470 return GetConsumingEnumerable (() => Take (token));
473 IEnumerable<T> GetConsumingEnumerable (Func<T> getFunc)
476 T item = default (T);
488 IEnumerator IEnumerable.GetEnumerator ()
490 return ((IEnumerable)underlyingColl).GetEnumerator ();
493 IEnumerator<T> IEnumerable<T>.GetEnumerator ()
495 return ((IEnumerable<T>)underlyingColl).GetEnumerator ();
498 public void Dispose ()
503 protected virtual void Dispose (bool managedRes)
508 public T[] ToArray ()
510 return underlyingColl.ToArray ();
516 // Method used to stall the thread for a limited period of time before retrying an operation
522 public int BoundedCapacity {
530 return underlyingColl.Count;
534 public bool IsAddingCompleted {
536 return isComplete.Value;
540 public bool IsCompleted {
542 return isComplete.Value && addId == removeId;
546 object ICollection.SyncRoot {
548 return underlyingColl.SyncRoot;
552 bool ICollection.IsSynchronized {
554 return underlyingColl.IsSynchronized;