2 // BlockingCollection.cs
4 // Copyright (c) 2008 Jérémie "Garuma" Laval
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
7 // of this software and associated documentation files (the "Software"), to deal
8 // in the Software without restriction, including without limitation the rights
9 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 // copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
13 // The above copyright notice and this permission notice shall be included in
14 // all copies or substantial portions of the Software.
16 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
29 using System.Threading;
30 using System.Collections;
31 using System.Collections.Generic;
32 using System.Diagnostics;
33 using System.Runtime.InteropServices;
35 namespace System.Collections.Concurrent
/// <summary>
/// Producer/consumer collection that wraps an <see cref="IProducerConsumerCollection{T}"/>
/// (a ConcurrentQueue when none is supplied) and layers optional bounding and
/// "adding completed" semantics on top of it.
/// </summary>
38 [DebuggerDisplay ("Count={Count}")]
39 [DebuggerTypeProxy (typeof (CollectionDebuggerView<>))]
40 public class BlockingCollection<T> : IEnumerable<T>, ICollection, IEnumerable, IDisposable
// Collection that actually stores the items.
readonly IProducerConsumerCollection<T> underlyingColl;
// Capacity requested at construction time; -1 disables the bound check.
readonly int upperBound;

// Deliberately NOT readonly: SpinWait is a mutable struct, and calling
// SpinOnce () through a readonly field operates on a temporary copy, so the
// spin counter would never advance.
SpinWait sw = new SpinWait ();

// Becomes true once additions have been declared complete.
AtomicBoolean isComplete;

// Transaction counters: addId is bumped (via CompareExchange) for every
// reserved addition, removeId for every reserved removal. Their difference
// is what the bound and emptiness checks compare.
long addId = long.MinValue;
long removeId = long.MinValue;
/// <summary>
/// Creates a collection without a capacity bound, backed by a new
/// <see cref="ConcurrentQueue{T}"/>.
/// </summary>
public BlockingCollection () : this (new ConcurrentQueue<T> (), -1)
{
	// All initialisation happens in the two-argument constructor.
}
/// <summary>
/// Creates a collection backed by a new <see cref="ConcurrentQueue{T}"/> and
/// limited by <paramref name="upperBound"/>.
/// </summary>
/// <param name="upperBound">Capacity bound forwarded to the main constructor.</param>
public BlockingCollection (int upperBound) : this (new ConcurrentQueue<T> (), upperBound)
{
	// All initialisation happens in the two-argument constructor.
}
/// <summary>
/// Creates a collection without a capacity bound on top of the supplied
/// storage collection.
/// </summary>
/// <param name="underlyingColl">Collection used to store the items.</param>
public BlockingCollection (IProducerConsumerCollection<T> underlyingColl) : this (underlyingColl, -1)
{
	// All initialisation happens in the two-argument constructor.
}
/// <summary>
/// Creates a collection on top of <paramref name="underlyingColl"/>, limited by
/// <paramref name="upperBound"/>.
/// </summary>
/// <param name="underlyingColl">Collection used to store the items.</param>
/// <param name="upperBound">Capacity bound, or -1 for no bound.</param>
/// <exception cref="ArgumentNullException"><paramref name="underlyingColl"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="upperBound"/> is lower than -1.</exception>
public BlockingCollection (IProducerConsumerCollection<T> underlyingColl, int upperBound)
{
	// Fail here rather than with a NullReferenceException on first use.
	if (underlyingColl == null)
		throw new ArgumentNullException ("underlyingColl");
	// -1 is the "unbounded" sentinel used by the other constructors; anything
	// lower would make the "count > upperBound" check permanently true.
	if (upperBound < -1)
		throw new ArgumentOutOfRangeException ("upperBound");

	this.underlyingColl = underlyingColl;
	this.upperBound = upperBound;
	this.isComplete = new AtomicBoolean ();
}
77 #region Add & Remove (+ Try)
/// <summary>Adds <paramref name="item"/> to the collection.</summary>
// NOTE(review): the body is not visible in this excerpt; presumably it forwards
// to the private Add (T, Func<bool>) overload without a cancellation predicate — confirm.
78 public void Add (T item)
/// <summary>
/// Adds <paramref name="item"/>, giving up when cancellation is requested on
/// <paramref name="token"/>.
/// </summary>
public void Add (T item, CancellationToken token)
{
	// The private overload polls this predicate instead of the token itself.
	Func<bool> cancellationRequested = () => token.IsCancellationRequested;
	Add (item, cancellationRequested);
}
// Core implementation of Add: reserves an addition slot by advancing addId and
// then hands the item to the underlying collection.
// cancellationFunc may be null; when it is non-null and returns true an
// OperationCanceledException is thrown.
// NOTE(review): the retry loop and the waiting branch are not visible in this excerpt.
88 void Add (T item, Func<bool> cancellationFunc)
// Snapshot both counters so every check below works on one consistent pair.
91 long cachedAddId = addId;
92 long cachedRemoveId = removeId;
// upperBound == -1 means no capacity limit is enforced.
94 if (upperBound != -1) {
// NOTE(review): '>' lets the reserved count reach upperBound + 1 before this
// branch is taken — confirm whether '>=' was intended.
95 if (cachedAddId - cachedRemoveId > upperBound) {
101 // Compare our transaction id with the id recorded when adding was completed
102 if (isComplete.Value && cachedAddId >= completeId)
103 throw new InvalidOperationException ("The BlockingCollection<T> has"
104 + " been marked as complete with regards to additions.");
// Reserve the slot atomically; the exchange only succeeds if no other thread
// moved addId since the snapshot.
106 if (Interlocked.CompareExchange (ref addId, cachedAddId + 1, cachedAddId) == cachedAddId)
109 if (cancellationFunc != null && cancellationFunc ())
110 throw new OperationCanceledException ("CancellationToken triggered");
// A refusal by the underlying collection is reported as an error.
114 if (!underlyingColl.TryAdd (item))
115 throw new InvalidOperationException ("The underlying collection didn't accept the item.");
/// <summary>
/// Removes and returns an item, giving up when cancellation is requested on
/// <paramref name="token"/>.
/// </summary>
public T Take (CancellationToken token)
{
	// The private overload polls this predicate instead of the token itself.
	Func<bool> cancellationRequested = () => token.IsCancellationRequested;
	return Take (cancellationRequested);
}
// Core implementation of Take: reserves a removal slot by advancing removeId
// and then pulls an item out of the underlying collection.
// cancellationFunc may be null; when it is non-null and returns true an
// OperationCanceledException is thrown.
// NOTE(review): the retry loop, the waiting branch and the return statement are
// not visible in this excerpt.
128 T Take (Func<bool> cancellationFunc)
// Snapshot both counters so the emptiness check works on one consistent pair.
131 long cachedRemoveId = removeId;
132 long cachedAddId = addId;
// Equal counters mean every reserved addition has already been claimed.
135 if (cachedRemoveId == cachedAddId) {
// NOTE(review): Microsoft's BlockingCollection<T>.Take documents
// InvalidOperationException for this situation — confirm the intended exception type.
136 if (isComplete.Value && cachedRemoveId >= completeId)
137 throw new OperationCanceledException ("The BlockingCollection<T> has"
138 + " been marked as complete with regards to additions.");
// Reserve the removal atomically; succeeds only if removeId is unchanged since the snapshot.
144 if (Interlocked.CompareExchange (ref removeId, cachedRemoveId + 1, cachedRemoveId) == cachedRemoveId)
147 if (cancellationFunc != null && cancellationFunc ())
148 throw new OperationCanceledException ("The CancellationToken has had cancellation requested.");
// Busy-loop until the underlying collection hands over an item.
152 while (!underlyingColl.TryTake (out item));
/// <summary>Attempts to add <paramref name="item"/> without retrying.</summary>
/// <returns>The result of the private TryAdd overload.</returns>
public bool TryAdd (T item)
{
	// No retry predicate and no cancellation token: the private overload's
	// loop condition is false after the first pass.
	bool added = TryAdd (item, null, null);
	return added;
}
// Core implementation of TryAdd. Each pass tries to reserve an addition slot and
// store the item; the pass is repeated while contFunc is non-null and returns true.
// token, when it has a value, is checked for cancellation inside the loop.
// NOTE(review): the return statements and the "collection is full" branch body
// are not visible in this excerpt.
162 bool TryAdd (T item, Func<bool> contFunc, CancellationToken? token)
165 if (token.HasValue && token.Value.IsCancellationRequested)
166 throw new OperationCanceledException ("The CancellationToken has had cancellation requested.");
// Snapshot both counters so every check below works on one consistent pair.
168 long cachedAddId = addId;
169 long cachedRemoveId = removeId;
// upperBound == -1 means no capacity limit is enforced.
171 if (upperBound != -1) {
// NOTE(review): '>' lets the reserved count reach upperBound + 1 before this
// branch is taken — confirm whether '>=' was intended.
172 if (cachedAddId - cachedRemoveId > upperBound) {
177 // Compare our transaction id with the id recorded when adding was completed
178 if (isComplete.Value && cachedAddId >= completeId)
179 throw new InvalidOperationException ("The BlockingCollection<T> has"
180 + " been marked as complete with regards to additions.");
// Another thread moved addId since the snapshot, so the reservation failed.
182 if (Interlocked.CompareExchange (ref addId, cachedAddId + 1, cachedAddId) != cachedAddId)
// A refusal by the underlying collection is reported as an error.
185 if (!underlyingColl.TryAdd (item))
186 throw new InvalidOperationException ("The underlying collection didn't accept the item.");
// With a null contFunc the loop body runs exactly once.
189 } while (contFunc != null && contFunc ());
/// <summary>
/// Attempts to add <paramref name="item"/>, retrying for at most
/// <paramref name="ts"/>.
/// </summary>
/// <param name="item">Item to add.</param>
/// <param name="ts">Maximum time to keep retrying.</param>
/// <returns>The result of the millisecond-based TryAdd overload.</returns>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="ts"/> does not fit in a 32-bit millisecond count.
/// </exception>
public bool TryAdd (T item, TimeSpan ts)
{
	double totalMilliseconds = ts.TotalMilliseconds;
	// An unchecked double-to-int cast of an out-of-range value yields an
	// unspecified result, which would silently turn a very long timeout
	// into an arbitrary one.
	if (totalMilliseconds > int.MaxValue || totalMilliseconds < int.MinValue)
		throw new ArgumentOutOfRangeException ("ts");

	return TryAdd (item, (int)totalMilliseconds);
}
/// <summary>
/// Attempts to add <paramref name="item"/>, retrying while fewer than
/// <paramref name="millisecondsTimeout"/> milliseconds have elapsed.
/// </summary>
public bool TryAdd (T item, int millisecondsTimeout)
{
	Stopwatch elapsed = Stopwatch.StartNew ();
	// Retry predicate: true while the time budget has not been used up.
	Func<bool> withinBudget = () => elapsed.ElapsedMilliseconds < millisecondsTimeout;

	return TryAdd (item, withinBudget, null);
}
/// <summary>
/// Attempts to add <paramref name="item"/>, retrying while fewer than
/// <paramref name="millisecondsTimeout"/> milliseconds have elapsed and
/// observing <paramref name="token"/> for cancellation.
/// </summary>
public bool TryAdd (T item, int millisecondsTimeout, CancellationToken token)
{
	Stopwatch elapsed = Stopwatch.StartNew ();
	// Retry predicate: true while the time budget has not been used up.
	Func<bool> withinBudget = () => elapsed.ElapsedMilliseconds < millisecondsTimeout;

	return TryAdd (item, withinBudget, token);
}
/// <summary>Attempts to remove an item without retrying.</summary>
/// <param name="item">Receives the removed item.</param>
/// <returns>The result of the private TryTake overload.</returns>
public bool TryTake (out T item)
{
	// No retry predicate and no cancellation token: the private overload's
	// loop condition is false after the first pass.
	bool taken = TryTake (out item, null, null);
	return taken;
}
// Core implementation of TryTake. Each pass tries to reserve a removal slot and
// pull an item from the underlying collection; the pass is repeated while
// contFunc is non-null and returns true.
// token, when it has a value, is checked for cancellation inside the loop.
// NOTE(review): the initialisation of 'item', the branch bodies and the final
// return are not visible in this excerpt.
216 bool TryTake (out T item, Func<bool> contFunc, CancellationToken? token)
221 if (token.HasValue && token.Value.IsCancellationRequested)
222 throw new OperationCanceledException ("The CancellationToken has had cancellation requested.");
// Snapshot both counters so the emptiness check works on one consistent pair.
224 long cachedRemoveId = removeId;
225 long cachedAddId = addId;
// Equal counters mean every reserved addition has already been claimed.
228 if (cachedRemoveId == cachedAddId) {
229 if (isComplete.Value && cachedRemoveId >= completeId)
// Another thread moved removeId since the snapshot, so the reservation failed.
235 if (Interlocked.CompareExchange (ref removeId, cachedRemoveId + 1, cachedRemoveId) != cachedRemoveId)
238 return underlyingColl.TryTake (out item);
// With a null contFunc the loop body runs exactly once.
239 } while (contFunc != null && contFunc ());
/// <summary>
/// Attempts to remove an item, retrying for at most <paramref name="ts"/>.
/// </summary>
/// <param name="item">Receives the removed item.</param>
/// <param name="ts">Maximum time to keep retrying.</param>
/// <returns>The result of the millisecond-based TryTake overload.</returns>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="ts"/> does not fit in a 32-bit millisecond count.
/// </exception>
public bool TryTake (out T item, TimeSpan ts)
{
	double totalMilliseconds = ts.TotalMilliseconds;
	// An unchecked double-to-int cast of an out-of-range value yields an
	// unspecified result, which would silently turn a very long timeout
	// into an arbitrary one.
	if (totalMilliseconds > int.MaxValue || totalMilliseconds < int.MinValue)
		throw new ArgumentOutOfRangeException ("ts");

	return TryTake (out item, (int)totalMilliseconds);
}
/// <summary>
/// Attempts to remove an item, retrying while fewer than
/// <paramref name="millisecondsTimeout"/> milliseconds have elapsed.
/// </summary>
// NOTE(review): lines between the visible statements are missing from this excerpt.
249 public bool TryTake (out T item, int millisecondsTimeout)
// Started before the first attempt so the predicate measures total elapsed time.
252 Stopwatch sw = Stopwatch.StartNew ();
// No cancellation token is supplied (third argument is null).
254 return TryTake (out item, () => sw.ElapsedMilliseconds < millisecondsTimeout, null);
/// <summary>
/// Attempts to remove an item, retrying while fewer than
/// <paramref name="millisecondsTimeout"/> milliseconds have elapsed and
/// passing <paramref name="token"/> on for cancellation checks.
/// </summary>
// NOTE(review): lines between the visible statements are missing from this excerpt.
257 public bool TryTake (out T item, int millisecondsTimeout, CancellationToken token)
// Started before the first attempt so the predicate measures total elapsed time.
260 Stopwatch sw = Stopwatch.StartNew ();
262 return TryTake (out item, () => sw.ElapsedMilliseconds < millisecondsTimeout, token);
266 #region static methods
// Validates the array handed to the *ToAny / *FromAny helpers.
// Throws ArgumentNullException when the array itself is null and
// ArgumentException when it is empty or holds a null element.
static void CheckArray (BlockingCollection<T>[] collections)
{
	if (collections == null)
		throw new ArgumentNullException ("collections");

	// The element scan is skipped for an empty array (short-circuit).
	bool unusable = collections.Length == 0 || IsThereANullElement (collections);
	if (unusable)
		throw new ArgumentException ("The collections argument is a 0-length array or contains a null element.", "collections");
}
// Walks every element of the array; used by CheckArray to detect null entries.
// NOTE(review): the loop body and return statements are not visible in this
// excerpt; presumably returns true on the first null element — confirm.
275 static bool IsThereANullElement (BlockingCollection<T>[] collections)
277 foreach (BlockingCollection<T> e in collections)
/// <summary>
/// Validates <paramref name="collections"/> and then walks them in order to
/// add <paramref name="item"/>.
/// </summary>
// NOTE(review): the loop body and the returned value are not visible in this
// excerpt; presumably the index of the accepting collection is returned — confirm.
283 public static int AddToAny (BlockingCollection<T>[] collections, T item)
// Throws on a null array, an empty array or a null element.
285 CheckArray (collections);
287 foreach (var coll in collections) {
/// <summary>
/// Validates <paramref name="collections"/> and then walks them in order,
/// calling Add (item, token) on each visited collection.
/// </summary>
// NOTE(review): the code surrounding the Add call and the returned value are
// not visible in this excerpt.
297 public static int AddToAny (BlockingCollection<T>[] collections, T item, CancellationToken token)
// Throws on a null array, an empty array or a null element.
299 CheckArray (collections);
301 foreach (var coll in collections) {
303 coll.Add (item, token);
/// <summary>
/// Validates <paramref name="collections"/> and then tries TryAdd (item) on
/// each collection in order.
/// </summary>
// NOTE(review): the return statements are not visible in this excerpt;
// presumably the index of the first accepting collection, or a failure
// marker when none accepts — confirm.
311 public static int TryAddToAny (BlockingCollection<T>[] collections, T item)
// Throws on a null array, an empty array or a null element.
313 CheckArray (collections);
315 foreach (var coll in collections) {
316 if (coll.TryAdd (item))
/// <summary>
/// Validates <paramref name="collections"/> and then tries TryAdd (item, ts)
/// on each collection in order; the timeout is passed to every attempt.
/// </summary>
// NOTE(review): the return statements are not visible in this excerpt.
323 public static int TryAddToAny (BlockingCollection<T>[] collections, T item, TimeSpan ts)
// Throws on a null array, an empty array or a null element.
325 CheckArray (collections);
327 foreach (var coll in collections) {
328 if (coll.TryAdd (item, ts))
/// <summary>
/// Validates <paramref name="collections"/> and then tries
/// TryAdd (item, millisecondsTimeout) on each collection in order; the timeout
/// is passed to every attempt.
/// </summary>
// NOTE(review): the return statements are not visible in this excerpt.
335 public static int TryAddToAny (BlockingCollection<T>[] collections, T item, int millisecondsTimeout)
// Throws on a null array, an empty array or a null element.
337 CheckArray (collections);
339 foreach (var coll in collections) {
340 if (coll.TryAdd (item, millisecondsTimeout))
/// <summary>
/// Validates <paramref name="collections"/> and then tries
/// TryAdd (item, millisecondsTimeout, token) on each collection in order.
/// </summary>
// NOTE(review): the return statements are not visible in this excerpt.
347 public static int TryAddToAny (BlockingCollection<T>[] collections, T item, int millisecondsTimeout,
348 CancellationToken token)
// Throws on a null array, an empty array or a null element.
350 CheckArray (collections);
352 foreach (var coll in collections) {
353 if (coll.TryAdd (item, millisecondsTimeout, token))
/// <summary>
/// Validates <paramref name="collections"/> and then walks them in order to
/// take an item into <paramref name="item"/>.
/// </summary>
// NOTE(review): the loop body and the returned value are not visible in this excerpt.
360 public static int TakeFromAny (BlockingCollection<T>[] collections, out T item)
// Throws on a null array, an empty array or a null element.
363 CheckArray (collections);
365 foreach (var coll in collections) {
/// <summary>
/// Validates <paramref name="collections"/> and then walks them in order,
/// assigning the result of Take (token) to <paramref name="item"/>.
/// </summary>
// NOTE(review): the code surrounding the Take call and the returned value are
// not visible in this excerpt.
375 public static int TakeFromAny (BlockingCollection<T>[] collections, out T item, CancellationToken token)
// Throws on a null array, an empty array or a null element.
378 CheckArray (collections);
380 foreach (var coll in collections) {
382 item = coll.Take (token);
/// <summary>
/// Validates <paramref name="collections"/> and then tries TryTake (out item)
/// on each collection in order.
/// </summary>
// NOTE(review): the return statements are not visible in this excerpt;
// presumably the index of the first collection that yields an item — confirm.
390 public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item)
// Throws on a null array, an empty array or a null element.
394 CheckArray (collections);
396 foreach (var coll in collections) {
397 if (coll.TryTake (out item))
/// <summary>
/// Validates <paramref name="collections"/> and then tries
/// TryTake (out item, ts) on each collection in order; the timeout is passed
/// to every attempt.
/// </summary>
// NOTE(review): the return statements are not visible in this excerpt.
404 public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, TimeSpan ts)
// Throws on a null array, an empty array or a null element.
408 CheckArray (collections);
410 foreach (var coll in collections) {
411 if (coll.TryTake (out item, ts))
/// <summary>
/// Validates <paramref name="collections"/> and then tries
/// TryTake (out item, millisecondsTimeout) on each collection in order; the
/// timeout is passed to every attempt.
/// </summary>
// NOTE(review): the return statements are not visible in this excerpt.
418 public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, int millisecondsTimeout)
// Throws on a null array, an empty array or a null element.
422 CheckArray (collections);
424 foreach (var coll in collections) {
425 if (coll.TryTake (out item, millisecondsTimeout))
/// <summary>
/// Validates <paramref name="collections"/> and then tries
/// TryTake (out item, millisecondsTimeout, token) on each collection in order.
/// </summary>
// NOTE(review): the return statements are not visible in this excerpt.
432 public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, int millisecondsTimeout,
433 CancellationToken token)
// Throws on a null array, an empty array or a null element.
437 CheckArray (collections);
439 foreach (var coll in collections) {
440 if (coll.TryTake (out item, millisecondsTimeout, token))
/// <summary>Marks the collection as complete with regards to additions.</summary>
// NOTE(review): a statement between the comment and the flag assignment is not
// visible in this excerpt; presumably it records completeId, which the add and
// take paths compare against — confirm.
448 public void CompleteAdding ()
450 // No further additions are accepted beyond this point
452 isComplete.Value = true;
// Explicit ICollection implementation: the copy is carried out entirely by
// the underlying collection.
void ICollection.CopyTo (Array array, int index)
{
	ICollection storage = underlyingColl;
	storage.CopyTo (array, index);
}
/// <summary>
/// Copies the items of the underlying collection into <paramref name="array"/>,
/// starting at <paramref name="index"/>.
/// </summary>
public void CopyTo (T[] array, int index)
{
	// Argument validation and the copy itself are left to the underlying collection.
	this.underlyingColl.CopyTo (array, index);
}
/// <summary>
/// Returns a sequence whose items are obtained by calling Take ().
/// </summary>
public IEnumerable<T> GetConsumingEnumerable ()
{
	// Method-group conversion selects the parameterless Take overload.
	Func<T> takeNext = Take;
	return GetConsumingEnumerable (takeNext);
}
/// <summary>
/// Returns a sequence whose items are obtained by calling Take (token).
/// </summary>
public IEnumerable<T> GetConsumingEnumerable (CancellationToken token)
{
	// Each element request is forwarded to the cancellable Take overload.
	Func<T> takeNext = () => Take (token);
	return GetConsumingEnumerable (takeNext);
}
// Shared implementation behind both public GetConsumingEnumerable overloads;
// getFunc is the delegate used to obtain each item.
// NOTE(review): the iteration logic is not visible in this excerpt.
475 IEnumerable<T> GetConsumingEnumerable (Func<T> getFunc)
478 T item = default (T);
// Non-generic enumeration is delegated to the underlying collection.
IEnumerator IEnumerable.GetEnumerator ()
{
	IEnumerable storage = (IEnumerable)underlyingColl;
	return storage.GetEnumerator ();
}
// Generic enumeration is delegated to the underlying collection.
IEnumerator<T> IEnumerable<T>.GetEnumerator ()
{
	IEnumerable<T> storage = (IEnumerable<T>)underlyingColl;
	return storage.GetEnumerator ();
}
/// <summary>IDisposable entry point.</summary>
// NOTE(review): the body is not visible in this excerpt; presumably it forwards
// to Dispose (bool) — confirm.
500 public void Dispose ()
/// <summary>Overridable disposal hook for derived classes.</summary>
// NOTE(review): the body is not visible in this excerpt; managedRes presumably
// indicates whether managed resources should be released — confirm.
505 protected virtual void Dispose (bool managedRes)
/// <summary>Returns the array produced by the underlying collection's ToArray.</summary>
public T[] ToArray ()
{
	T[] items = underlyingColl.ToArray ();
	return items;
}
515 // Stalls the calling thread for a short period of time before it retries an operation
/// <summary>Capacity bound of this collection.</summary>
// NOTE(review): the getter body is not visible in this excerpt; presumably it
// returns upperBound — confirm.
521 public int BoundedCapacity {
// NOTE(review): this return appears to belong to a separate Count property
// (referenced by the DebuggerDisplay attribute) whose header is missing from
// this excerpt; it reports the underlying collection's Count.
529 return underlyingColl.Count;
/// <summary>True once the completion flag has been set.</summary>
public bool IsAddingCompleted
{
	get { return isComplete.Value; }
}
/// <summary>
/// True when the completion flag is set and the addition and removal counters
/// are equal.
/// </summary>
public bool IsCompleted
{
	get {
		// The counters are only compared once the completion flag is set.
		if (!isComplete.Value)
			return false;

		return addId == removeId;
	}
}
// Explicit ICollection implementation: exposes the underlying collection's SyncRoot.
object ICollection.SyncRoot
{
	get {
		ICollection storage = underlyingColl;
		return storage.SyncRoot;
	}
}
// Explicit ICollection implementation: reports the underlying collection's
// IsSynchronized value.
bool ICollection.IsSynchronized
{
	get {
		ICollection storage = underlyingColl;
		return storage.IsSynchronized;
	}
}