3 // Copyright (c) Microsoft Corporation. All rights reserved.
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
8 // BlockingCollection.cs
10 // <OWNER>Microsoft</OWNER>
12 // A class that implements the bounding and blocking functionality while abstracting away
13 // the underlying storage mechanism. This file also contains BlockingCollection's
14 // associated debugger view type.
16 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
17 #pragma warning disable 0420
19 using System.Collections.Generic;
20 using System.Collections;
21 using System.Diagnostics;
22 using System.Globalization;
23 using System.Security.Permissions;
24 using System.Runtime.InteropServices;
25 using System.Threading;
27 namespace System.Collections.Concurrent
30 /// Provides blocking and bounding capabilities for thread-safe collections that
31 /// implement <see cref="T:System.Collections.Concurrent.IProducerConsumerCollection{T}"/>.
34 /// <see cref="T:System.Collections.Concurrent.IProducerConsumerCollection{T}"/> represents a collection
35 /// that allows for thread-safe adding and removing of data.
36 /// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> is used as a wrapper
37 /// for an <see cref="T:System.Collections.Concurrent.IProducerConsumerCollection{T}"/> instance, allowing
38 /// removal attempts from the collection to block until data is available to be removed. Similarly,
39 /// a <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> can be created to enforce
40 /// an upper-bound on the number of data elements allowed in the
41 /// <see cref="T:System.Collections.Concurrent.IProducerConsumerCollection{T}"/>; addition attempts to the
42 /// collection may then block until space is available to store the added items. In this manner,
43 /// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> is similar to a traditional
44 /// blocking queue data structure, except that the underlying data storage mechanism is abstracted
45 /// away as an <see cref="T:System.Collections.Concurrent.IProducerConsumerCollection{T}"/>.
47 /// <typeparam name="T">Specifies the type of elements in the collection.</typeparam>
50 #pragma warning disable 0618
51 [HostProtection(SecurityAction.LinkDemand, Synchronization = true, ExternalThreading = true)]
52 #pragma warning restore 0618
54 [DebuggerTypeProxy(typeof(SystemThreadingCollections_BlockingCollectionDebugView<>))]
55 [DebuggerDisplay("Count = {Count}, Type = {m_collection}")]
56 public class BlockingCollection<T> : IEnumerable<T>, ICollection, IDisposable, IReadOnlyCollection<T>
58 private IProducerConsumerCollection<T> m_collection;
59 private int m_boundedCapacity;
60 private const int NON_BOUNDED = -1;
61 private SemaphoreSlim m_freeNodes;
62 private SemaphoreSlim m_occupiedNodes;
63 private bool m_isDisposed;
64 private CancellationTokenSource m_ConsumersCancellationTokenSource;
65 private CancellationTokenSource m_ProducersCancellationTokenSource;
67 private volatile int m_currentAdders;
68 private const int COMPLETE_ADDING_ON_MASK = unchecked((int)0x80000000);
71 /// <summary>Gets the bounded capacity of this <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</summary>
72 /// <value>The bounded capacity of this collection, or int.MaxValue if no bound was supplied.</value>
73 /// <exception cref="T:System.ObjectDisposedException">The <see
74 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
75 public int BoundedCapacity
80 return m_boundedCapacity;
84 /// <summary>Gets whether this <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been marked as complete for adding.</summary>
85 /// <value>Whether this collection has been marked as complete for adding.</value>
86 /// <exception cref="T:System.ObjectDisposedException">The <see
87 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
88 public bool IsAddingCompleted
93 return (m_currentAdders == COMPLETE_ADDING_ON_MASK);
97 /// <summary>Gets whether this <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been marked as complete for adding and is empty.</summary>
98 /// <value>Whether this collection has been marked as complete for adding and is empty.</value>
99 /// <exception cref="T:System.ObjectDisposedException">The <see
100 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
101 public bool IsCompleted
106 return (IsAddingCompleted && (m_occupiedNodes.CurrentCount == 0));
110 /// <summary>Gets the number of items contained in the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.</summary>
111 /// <value>The number of items contained in the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.</value>
112 /// <exception cref="T:System.ObjectDisposedException">The <see
113 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
119 return m_occupiedNodes.CurrentCount;
123 /// <summary>Gets a value indicating whether access to the <see cref="T:System.Collections.ICollection"/> is synchronized.</summary>
124 /// <exception cref="T:System.ObjectDisposedException">The <see
125 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
126 bool ICollection.IsSynchronized
136 /// Gets an object that can be used to synchronize access to the <see
137 /// cref="T:System.Collections.ICollection"/>. This property is not supported.
139 /// <exception cref="T:System.NotSupportedException">The SyncRoot property is not supported.</exception>
140 object ICollection.SyncRoot
144 throw new NotSupportedException(SR.GetString(SR.ConcurrentCollection_SyncRoot_NotSupported));
150 /// <summary>Initializes a new instance of the
151 /// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>
152 /// class without an upper-bound.
155 /// The default underlying collection is a <see cref="System.Collections.Concurrent.ConcurrentQueue{T}">ConcurrentQueue<T></see>.
157 public BlockingCollection()
158 : this(new ConcurrentQueue<T>())
162 /// <summary>Initializes a new instance of the <see
163 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>
164 /// class with the specified upper-bound.
166 /// <param name="boundedCapacity">The bounded size of the collection.</param>
167 /// <exception cref="T:System.ArgumentOutOfRangeException">The <paramref name="boundedCapacity"/> is
168 /// not a positive value.</exception>
170 /// The default underlying collection is a <see cref="System.Collections.Concurrent.ConcurrentQueue{T}">ConcurrentQueue<T></see>.
172 public BlockingCollection(int boundedCapacity)
173 : this(new ConcurrentQueue<T>(), boundedCapacity)
177 /// <summary>Initializes a new instance of the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>
178 /// class with the specified upper-bound and using the provided
179 /// <see cref="T:System.Collections.Concurrent.IProducerConsumerCollection{T}"/> as its underlying data store.</summary>
180 /// <param name="collection">The collection to use as the underlying data store.</param>
181 /// <param name="boundedCapacity">The bounded size of the collection.</param>
182 /// <exception cref="T:System.ArgumentNullException">The <paramref name="collection"/> argument is
183 /// null.</exception>
184 /// <exception cref="T:System.ArgumentOutOfRangeException">The <paramref name="boundedCapacity"/> is not a positive value.</exception>
185 /// <exception cref="System.ArgumentException">The supplied <paramref name="collection"/> contains more values
186 /// than is permitted by <paramref name="boundedCapacity"/>.</exception>
187 public BlockingCollection(IProducerConsumerCollection<T> collection, int boundedCapacity)
189 if (boundedCapacity < 1)
191 throw new ArgumentOutOfRangeException(
192 "boundedCapacity", boundedCapacity,
193 SR.GetString(SR.BlockingCollection_ctor_BoundedCapacityRange));
195 if (collection == null)
197 throw new ArgumentNullException("collection");
199 int count = collection.Count;
200 if (count > boundedCapacity)
202 throw new ArgumentException(SR.GetString(SR.BlockingCollection_ctor_CountMoreThanCapacity));
204 Initialize(collection, boundedCapacity, count);
207 /// <summary>Initializes a new instance of the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>
208 /// class without an upper-bound and using the provided
209 /// <see cref="T:System.Collections.Concurrent.IProducerConsumerCollection{T}"/> as its underlying data store.</summary>
210 /// <param name="collection">The collection to use as the underlying data store.</param>
211 /// <exception cref="T:System.ArgumentNullException">The <paramref name="collection"/> argument is
212 /// null.</exception>
213 public BlockingCollection(IProducerConsumerCollection<T> collection)
215 if (collection == null)
217 throw new ArgumentNullException("collection");
219 Initialize(collection, NON_BOUNDED, collection.Count);
222 /// <summary>Initializes the BlockingCollection instance.</summary>
223 /// <param name="collection">The collection to use as the underlying data store.</param>
224 /// <param name="boundedCapacity">The bounded size of the collection.</param>
225 /// <param name="collectionCount">The number of items currently in the underlying collection.</param>
226 private void Initialize(IProducerConsumerCollection<T> collection, int boundedCapacity, int collectionCount)
228 Debug.Assert(boundedCapacity > 0 || boundedCapacity == NON_BOUNDED);
230 m_collection = collection;
231 m_boundedCapacity = boundedCapacity; ;
232 m_isDisposed = false;
233 m_ConsumersCancellationTokenSource = new CancellationTokenSource();
234 m_ProducersCancellationTokenSource = new CancellationTokenSource();
236 if (boundedCapacity == NON_BOUNDED)
242 Debug.Assert(boundedCapacity > 0);
243 m_freeNodes = new SemaphoreSlim(boundedCapacity - collectionCount);
247 m_occupiedNodes = new SemaphoreSlim(collectionCount);
252 /// Adds the item to the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.
254 /// <param name="item">The item to be added to the collection. The value can be a null reference.</param>
255 /// <exception cref="T:System.InvalidOperationException">The <see
256 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been marked
257 /// as complete with regards to additions.</exception>
258 /// <exception cref="T:System.ObjectDisposedException">The <see
259 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
260 /// <exception cref="T:System.InvalidOperationException">The underlying collection didn't accept the item.</exception>
262 /// If a bounded capacity was specified when this instance of
263 /// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> was initialized,
264 /// a call to Add may block until space is available to store the provided item.
266 public void Add(T item)
269 bool tryAddReturnValue =
271 TryAddWithNoTimeValidation(item, Timeout.Infinite, new CancellationToken());
273 Debug.Assert(tryAddReturnValue, "TryAdd() was expected to return true.");
278 /// Adds the item to the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.
279 /// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
282 /// <param name="item">The item to be added to the collection. The value can be a null reference.</param>
283 /// <param name="cancellationToken">A cancellation token to observe.</param>
284 /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
285 /// <exception cref="T:System.InvalidOperationException">The <see
286 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been marked
287 /// as complete with regards to additions.</exception>
288 /// <exception cref="T:System.ObjectDisposedException">The <see
289 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
290 /// <exception cref="T:System.InvalidOperationException">The underlying collection didn't accept the item.</exception>
292 /// If a bounded capacity was specified when this instance of
293 /// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> was initialized,
294 /// a call to <see cref="Add(T,System.Threading.CancellationToken)"/> may block until space is available to store the provided item.
296 public void Add(T item, CancellationToken cancellationToken)
299 bool tryAddReturnValue =
301 TryAddWithNoTimeValidation(item, Timeout.Infinite, cancellationToken);
303 Debug.Assert(tryAddReturnValue, "TryAdd() was expected to return true.");
308 /// Attempts to add the specified item to the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.
310 /// <param name="item">The item to be added to the collection.</param>
311 /// <returns>true if the <paramref name="item"/> could be added; otherwise, false.</returns>
312 /// <exception cref="T:System.InvalidOperationException">The <see
313 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been marked
314 /// as complete with regards to additions.</exception>
315 /// <exception cref="T:System.ObjectDisposedException">The <see
316 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
317 /// <exception cref="T:System.InvalidOperationException">The underlying collection didn't accept the item.</exception>
318 public bool TryAdd(T item)
320 return TryAddWithNoTimeValidation(item, 0, new CancellationToken());
324 /// Attempts to add the specified item to the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.
326 /// <param name="item">The item to be added to the collection.</param>
327 /// <param name="timeout">A <see cref="System.TimeSpan"/> that represents the number of milliseconds
328 /// to wait, or a <see cref="System.TimeSpan"/> that represents -1 milliseconds to wait indefinitely.
330 /// <returns>true if the <paramref name="item"/> could be added to the collection within
331 /// the alloted time; otherwise, false.</returns>
332 /// <exception cref="T:System.InvalidOperationException">The <see
333 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been marked
334 /// as complete with regards to additions.</exception>
335 /// <exception cref="T:System.ObjectDisposedException">The <see
336 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
337 /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout"/> is a negative number
338 /// other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than
339 /// <see cref="System.Int32.MaxValue"/>.</exception>
340 /// <exception cref="T:System.InvalidOperationException">The underlying collection didn't accept the item.</exception>
341 public bool TryAdd(T item, TimeSpan timeout)
343 ValidateTimeout(timeout);
344 return TryAddWithNoTimeValidation(item, (int)timeout.TotalMilliseconds, new CancellationToken());
348 /// Attempts to add the specified item to the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.
350 /// <param name="item">The item to be added to the collection.</param>
351 /// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see
352 /// cref="System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
353 /// <returns>true if the <paramref name="item"/> could be added to the collection within
354 /// the alloted time; otherwise, false.</returns>
355 /// <exception cref="T:System.InvalidOperationException">The <see
356 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been marked
357 /// as complete with regards to additions.</exception>
358 /// <exception cref="T:System.ObjectDisposedException">The <see
359 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
360 /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is a
361 /// negative number other than -1, which represents an infinite time-out.</exception>
362 /// <exception cref="T:System.InvalidOperationException">The underlying collection didn't accept the item.</exception>
363 public bool TryAdd(T item, int millisecondsTimeout)
365 ValidateMillisecondsTimeout(millisecondsTimeout);
366 return TryAddWithNoTimeValidation(item, millisecondsTimeout, new CancellationToken());
370 /// Attempts to add the specified item to the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.
371 /// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
374 /// <param name="item">The item to be added to the collection.</param>
375 /// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see
376 /// cref="System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
377 /// <param name="cancellationToken">A cancellation token to observe.</param>
378 /// <returns>true if the <paramref name="item"/> could be added to the collection within
379 /// the alloted time; otherwise, false.</returns>
380 /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
381 /// <exception cref="T:System.InvalidOperationException">The <see
382 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been marked
383 /// as complete with regards to additions.</exception>
384 /// <exception cref="T:System.ObjectDisposedException">The <see
385 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
386 /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is a
387 /// negative number other than -1, which represents an infinite time-out.</exception>
388 /// <exception cref="T:System.InvalidOperationException">The underlying collection didn't accept the item.</exception>
389 public bool TryAdd(T item, int millisecondsTimeout, CancellationToken cancellationToken)
391 ValidateMillisecondsTimeout(millisecondsTimeout);
392 return TryAddWithNoTimeValidation(item, millisecondsTimeout, cancellationToken);
395 /// <summary>Adds an item into the underlying data store using its IProducerConsumerCollection<T>.Add
396 /// method. If a bounded capacity was specified and the collection was full,
397 /// this method will wait for, at most, the timeout period trying to add the item.
398 /// If the timeout period was exhaused before successfully adding the item this method will
399 /// return false.</summary>
400 /// <param name="item">The item to be added to the collection.</param>
401 /// <param name="millisecondsTimeout">The number of milliseconds to wait for the collection to accept the item,
402 /// or Timeout.Infinite to wait indefinitely.</param>
403 /// <param name="cancellationToken">A cancellation token to observe.</param>
404 /// <returns>False if the collection remained full till the timeout period was exhausted.True otherwise.</returns>
405 /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
406 /// <exception cref="System.InvalidOperationException">the collection has already been marked
407 /// as complete with regards to additions.</exception>
408 /// <exception cref="System.ObjectDisposedException">If the collection has been disposed.</exception>
409 /// <exception cref="T:System.InvalidOperationException">The underlying collection didn't accept the item.</exception>
410 private bool TryAddWithNoTimeValidation(T item, int millisecondsTimeout, CancellationToken cancellationToken)
414 if (cancellationToken.IsCancellationRequested)
415 throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), cancellationToken);
417 if (IsAddingCompleted)
419 throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_Completed));
422 bool waitForSemaphoreWasSuccessful = true;
424 if (m_freeNodes != null)
426 //If the m_freeNodes semaphore threw OperationCanceledException then this means that CompleteAdding()
427 //was called concurrently with Adding which is not supported by BlockingCollection.
428 CancellationTokenSource linkedTokenSource = null;
431 waitForSemaphoreWasSuccessful = m_freeNodes.Wait(0);
432 if (waitForSemaphoreWasSuccessful == false && millisecondsTimeout != 0)
434 linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(
435 cancellationToken, m_ProducersCancellationTokenSource.Token);
436 waitForSemaphoreWasSuccessful = m_freeNodes.Wait(millisecondsTimeout, linkedTokenSource.Token);
439 catch (OperationCanceledException)
441 //if cancellation was via external token, throw an OCE
442 if (cancellationToken.IsCancellationRequested)
443 throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), cancellationToken);
445 //if cancellation was via internal token, this indicates invalid use, hence InvalidOpEx.
446 //Contract.Assert(m_ProducersCancellationTokenSource.Token.IsCancellationRequested);
448 throw new InvalidOperationException
449 (SR.GetString(SR.BlockingCollection_Add_ConcurrentCompleteAdd));
453 if (linkedTokenSource != null)
455 linkedTokenSource.Dispose();
459 if (waitForSemaphoreWasSuccessful)
461 // Update the adders count if the complete adding was not requested, otherwise
462 // spins until all adders finish then throw IOE
463 // The idea behind to spin untill all adders finish, is to avoid to return to the caller with IOE while there are still some adders have
464 // not been finished yet
465 SpinWait spinner = new SpinWait();
468 int observedAdders = m_currentAdders;
469 if ((observedAdders & COMPLETE_ADDING_ON_MASK) != 0)
472 // CompleteAdding is requested, spin then throw
473 while (m_currentAdders != COMPLETE_ADDING_ON_MASK) spinner.SpinOnce();
474 throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_Completed));
476 if (Interlocked.CompareExchange(ref m_currentAdders, observedAdders + 1, observedAdders) == observedAdders)
478 Debug.Assert((observedAdders + 1) <= (~COMPLETE_ADDING_ON_MASK), "The number of concurrent adders thread excceeded the maximum limit.");
484 // This outer try/finally to workaround of repeating the decrement adders code 3 times, because we should decrement the adders if:
485 // 1- m_collection.TryAdd threw an exception
486 // 2- m_collection.TryAdd succeeded
487 // 3- m_collection.TryAdd returned false
488 // so we put the decrement code in the finally block
492 //TryAdd is guaranteed to find a place to add the element. Its return value depends
493 //on the semantics of the underlying store. Some underlying stores will not add an already
494 //existing item and thus TryAdd returns false indicating that the size of the underlying
495 //store did not increase.
498 bool addingSucceeded = false;
501 //The token may have been canceled before the collection had space available, so we need a check after the wait has completed.
502 //This fixes bug #702328, case 2 of 2.
503 cancellationToken.ThrowIfCancellationRequested();
504 addingSucceeded = m_collection.TryAdd(item);
508 //TryAdd did not result in increasing the size of the underlying store and hence we need
509 //to increment back the count of the m_freeNodes semaphore.
510 if (m_freeNodes != null)
512 m_freeNodes.Release();
518 //After adding an element to the underlying storage, signal to the consumers
519 //waiting on m_occupiedNodes that there is a new item added ready to be consumed.
520 m_occupiedNodes.Release();
524 throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_Add_Failed));
529 // decrement the adders count
530 Debug.Assert((m_currentAdders & ~COMPLETE_ADDING_ON_MASK) > 0);
531 Interlocked.Decrement(ref m_currentAdders);
536 return waitForSemaphoreWasSuccessful;
539 /// <summary>Takes an item from the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.</summary>
540 /// <returns>The item removed from the collection.</returns>
541 /// <exception cref="T:System.OperationCanceledException">The <see
542 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> is empty and has been marked
543 /// as complete with regards to additions.</exception>
544 /// <exception cref="T:System.ObjectDisposedException">The <see
545 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
546 /// <exception cref="T:System.InvalidOperationException">The underlying collection was modified
547 /// outside of this <see
548 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
549 /// <remarks>A call to <see cref="Take()"/> may block until an item is available to be removed.</remarks>
554 if (!TryTake(out item, Timeout.Infinite, CancellationToken.None))
556 throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_CantTakeWhenDone));
562 /// <summary>Takes an item from the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.</summary>
563 /// <returns>The item removed from the collection.</returns>
564 /// <exception cref="T:System.OperationCanceledException">If the <see cref="CancellationToken"/> is
565 /// canceled or the <see
566 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> is empty and has been marked
567 /// as complete with regards to additions.</exception>
568 /// <exception cref="T:System.ObjectDisposedException">The <see
569 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
570 /// <exception cref="T:System.InvalidOperationException">The underlying collection was modified
571 /// outside of this <see
572 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
573 /// <remarks>A call to <see cref="Take(CancellationToken)"/> may block until an item is available to be removed.</remarks>
574 public T Take(CancellationToken cancellationToken)
578 if (!TryTake(out item, Timeout.Infinite, cancellationToken))
580 throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_CantTakeWhenDone));
587 /// Attempts to remove an item from the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.
589 /// <param name="item">The item removed from the collection.</param>
590 /// <returns>true if an item could be removed; otherwise, false.</returns>
591 /// <exception cref="T:System.ObjectDisposedException">The <see
592 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
593 /// <exception cref="T:System.InvalidOperationException">The underlying collection was modified
594 /// outside of this <see
595 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
596 public bool TryTake(out T item)
598 return TryTake(out item, 0, CancellationToken.None);
602 /// Attempts to remove an item from the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.
604 /// <param name="item">The item removed from the collection.</param>
605 /// <param name="timeout">A <see cref="System.TimeSpan"/> that represents the number of milliseconds
606 /// to wait, or a <see cref="System.TimeSpan"/> that represents -1 milliseconds to wait indefinitely.
608 /// <returns>true if an item could be removed from the collection within
609 /// the alloted time; otherwise, false.</returns>
610 /// <exception cref="T:System.ObjectDisposedException">The <see
611 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
612 /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout"/> is a negative number
613 /// other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than
614 /// <see cref="System.Int32.MaxValue"/>.</exception>
615 /// <exception cref="T:System.InvalidOperationException">The underlying collection was modified
616 /// outside of this <see
617 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
618 public bool TryTake(out T item, TimeSpan timeout)
620 ValidateTimeout(timeout);
621 return TryTakeWithNoTimeValidation(out item, (int)timeout.TotalMilliseconds, CancellationToken.None, null);
625 /// Attempts to remove an item from the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.
627 /// <param name="item">The item removed from the collection.</param>
628 /// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see
629 /// cref="System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
630 /// <returns>true if an item could be removed from the collection within
631 /// the alloted time; otherwise, false.</returns>
632 /// <exception cref="T:System.ObjectDisposedException">The <see
633 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
634 /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is a
635 /// negative number other than -1, which represents an infinite time-out.</exception>
636 /// <exception cref="T:System.InvalidOperationException">The underlying collection was modified
637 /// outside of this <see
638 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
639 public bool TryTake(out T item, int millisecondsTimeout)
641 ValidateMillisecondsTimeout(millisecondsTimeout);
642 return TryTakeWithNoTimeValidation(out item, millisecondsTimeout, CancellationToken.None, null);
646 /// Attempts to remove an item from the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.
647 /// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
650 /// <param name="item">The item removed from the collection.</param>
651 /// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see
652 /// cref="System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
653 /// <param name="cancellationToken">A cancellation token to observe.</param>
654 /// <returns>true if an item could be removed from the collection within
655 /// the alloted time; otherwise, false.</returns>
656 /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
657 /// <exception cref="T:System.ObjectDisposedException">The <see
658 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
659 /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is a
660 /// negative number other than -1, which represents an infinite time-out.</exception>
661 /// <exception cref="T:System.InvalidOperationException">The underlying collection was modified
662 /// outside of this <see
663 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
664 public bool TryTake(out T item, int millisecondsTimeout, CancellationToken cancellationToken)
666 ValidateMillisecondsTimeout(millisecondsTimeout);
667 return TryTakeWithNoTimeValidation(out item, millisecondsTimeout, cancellationToken, null);
670 /// <summary>Takes an item from the underlying data store using its IProducerConsumerCollection<T>.Take
671 /// method. If the collection was empty, this method will wait for, at most, the timeout period (if AddingIsCompleted is false)
672 /// trying to remove an item. If the timeout period was exhaused before successfully removing an item
673 /// this method will return false.
674 /// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
677 /// <param name="item">The item removed from the collection.</param>
678 /// <param name="millisecondsTimeout">The number of milliseconds to wait for the collection to have an item available
679 /// for removal, or Timeout.Infinite to wait indefinitely.</param>
680 /// <param name="cancellationToken">A cancellation token to observe.</param>
681 /// <param name="combinedTokenSource">A combined cancellation token if created, it is only created by GetConsumingEnumerable to avoid creating the linked token
682 /// multiple times.</param>
683 /// <returns>False if the collection remained empty till the timeout period was exhausted. True otherwise.</returns>
684 /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
685 /// <exception cref="System.ObjectDisposedException">If the collection has been disposed.</exception>
686 private bool TryTakeWithNoTimeValidation(out T item, int millisecondsTimeout, CancellationToken cancellationToken, CancellationTokenSource combinedTokenSource)
691 if (cancellationToken.IsCancellationRequested)
692 throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), cancellationToken);
694 //If the collection is completed then there is no need to wait.
699 bool waitForSemaphoreWasSuccessful = false;
701 // set the combined token source to the combinedToken paramater if it is not null (came from GetConsumingEnumerable)
702 CancellationTokenSource linkedTokenSource = combinedTokenSource;
705 waitForSemaphoreWasSuccessful = m_occupiedNodes.Wait(0);
706 if (waitForSemaphoreWasSuccessful == false && millisecondsTimeout != 0)
708 // create the linked token if it is not created yet
709 if (combinedTokenSource == null)
710 linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken,
711 m_ConsumersCancellationTokenSource.Token);
712 waitForSemaphoreWasSuccessful = m_occupiedNodes.Wait(millisecondsTimeout, linkedTokenSource.Token);
715 //The collection became completed while waiting on the semaphore.
716 catch (OperationCanceledException)
718 if (cancellationToken.IsCancellationRequested)
719 throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), cancellationToken);
725 // only dispose the combined token source if we created it here, otherwise the caller (GetConsumingEnumerable) is responsible for disposing it
726 if (linkedTokenSource != null && combinedTokenSource == null)
728 linkedTokenSource.Dispose();
732 if (waitForSemaphoreWasSuccessful)
734 bool removeSucceeded = false;
735 bool removeFaulted = true;
738 //The token may have been canceled before an item arrived, so we need a check after the wait has completed.
739 //This fixes bug #702328, case 1 of 2.
740 cancellationToken.ThrowIfCancellationRequested();
742 //If an item was successfully removed from the underlying collection.
743 removeSucceeded = m_collection.TryTake(out item);
744 removeFaulted = false;
745 if (!removeSucceeded)
747 // Check if the collection is empty which means that the collection was modified outside BlockingCollection
748 throw new InvalidOperationException
749 (SR.GetString(SR.BlockingCollection_Take_CollectionModified));
754 // removeFaulted implies !removeSucceeded, but the reverse is not true.
757 if (m_freeNodes != null)
759 Debug.Assert(m_boundedCapacity != NON_BOUNDED);
760 m_freeNodes.Release();
763 else if (removeFaulted)
765 m_occupiedNodes.Release();
767 //Last remover will detect that it has actually removed the last item from the
768 //collection and that CompleteAdding() was called previously. Thus, it will cancel the semaphores
769 //so that any thread waiting on them wakes up. Note several threads may call CancelWaitingConsumers
770 //but this is not a problem.
773 CancelWaitingConsumers();
777 return waitForSemaphoreWasSuccessful;
783 /// Adds the specified item to any one of the specified
784 /// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
786 /// <param name="collections">The array of collections.</param>
787 /// <param name="item">The item to be added to one of the collections.</param>
788 /// <returns>The index of the collection in the <paramref name="collections"/> array to which the item was added.</returns>
789 /// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
790 /// null.</exception>
791 /// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
792 /// a 0-length array or contains a null element, or at least one of collections has been
793 /// marked as complete for adding.</exception>
794 /// <exception cref="T:System.ObjectDisposedException">At least one of the <see
795 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
796 /// <exception cref="T:System.InvalidOperationException">At least one underlying collection didn't accept the item.</exception>
797 /// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
798 /// 62 for STA and 63 for MTA.</exception>
800 /// If a bounded capacity was specified when all of the
801 /// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances were initialized,
802 /// a call to AddToAny may block until space is available in one of the collections
803 /// to store the provided item.
805 public static int AddToAny(BlockingCollection<T>[] collections, T item)
808 int tryAddAnyReturnValue =
812 TryAddToAny(collections, item, Timeout.Infinite, CancellationToken.None);
814 Debug.Assert((tryAddAnyReturnValue >= 0 && tryAddAnyReturnValue < collections.Length)
815 , "TryAddToAny() was expected to return an index within the bounds of the collections array.");
816 return tryAddAnyReturnValue;
821 /// Adds the specified item to any one of the specified
822 /// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
823 /// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
826 /// <param name="collections">The array of collections.</param>
827 /// <param name="item">The item to be added to one of the collections.</param>
828 /// <param name="cancellationToken">A cancellation token to observe.</param>
829 /// <returns>The index of the collection in the <paramref name="collections"/> array to which the item was added.</returns>
830 /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
831 /// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
832 /// null.</exception>
833 /// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
834 /// a 0-length array or contains a null element, or at least one of collections has been
835 /// marked as complete for adding.</exception>
836 /// <exception cref="T:System.ObjectDisposedException">At least one of the <see
837 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
838 /// <exception cref="T:System.InvalidOperationException">At least one underlying collection didn't accept the item.</exception>
839 /// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
840 /// 62 for STA and 63 for MTA.</exception>
842 /// If a bounded capacity was specified when all of the
843 /// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances were initialized,
844 /// a call to AddToAny may block until space is available in one of the collections
845 /// to store the provided item.
847 public static int AddToAny(BlockingCollection<T>[] collections, T item, CancellationToken cancellationToken)
850 int tryAddAnyReturnValue =
854 TryAddToAny(collections, item, Timeout.Infinite, cancellationToken);
856 Debug.Assert((tryAddAnyReturnValue >= 0 && tryAddAnyReturnValue < collections.Length)
857 , "TryAddToAny() was expected to return an index within the bounds of the collections array.");
858 return tryAddAnyReturnValue;
863 /// Attempts to add the specified item to any one of the specified
864 /// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
866 /// <param name="collections">The array of collections.</param>
867 /// <param name="item">The item to be added to one of the collections.</param>
868 /// <returns>The index of the collection in the <paramref name="collections"/>
869 /// array to which the item was added, or -1 if the item could not be added.</returns>
870 /// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
871 /// null.</exception>
872 /// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
873 /// a 0-length array or contains a null element, or at least one of collections has been
874 /// marked as complete for adding.</exception>
875 /// <exception cref="T:System.ObjectDisposedException">At least one of the <see
876 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
877 /// <exception cref="T:System.InvalidOperationException">At least one underlying collection didn't accept the item.</exception>
878 /// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
879 /// 62 for STA and 63 for MTA.</exception>
880 public static int TryAddToAny(BlockingCollection<T>[] collections, T item)
882 return TryAddToAny(collections, item, 0, CancellationToken.None);
886 /// Attempts to add the specified item to any one of the specified
887 /// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
889 /// <param name="collections">The array of collections.</param>
890 /// <param name="item">The item to be added to one of the collections.</param>
891 /// <param name="timeout">A <see cref="System.TimeSpan"/> that represents the number of milliseconds
892 /// to wait, or a <see cref="System.TimeSpan"/> that represents -1 milliseconds to wait indefinitely.
894 /// <returns>The index of the collection in the <paramref name="collections"/>
895 /// array to which the item was added, or -1 if the item could not be added.</returns>
896 /// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
897 /// null.</exception>
898 /// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
899 /// a 0-length array or contains a null element, or at least one of collections has been
900 /// marked as complete for adding.</exception>
901 /// <exception cref="T:System.ObjectDisposedException">At least one of the <see
902 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
903 /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout"/> is a negative number
904 /// other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than
905 /// <see cref="System.Int32.MaxValue"/>.</exception>
906 /// <exception cref="T:System.InvalidOperationException">At least one underlying collection didn't accept the item.</exception>
907 /// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
908 /// 62 for STA and 63 for MTA.</exception>
909 public static int TryAddToAny(BlockingCollection<T>[] collections, T item, TimeSpan timeout)
911 ValidateTimeout(timeout);
912 return TryAddToAnyCore(collections, item, (int)timeout.TotalMilliseconds, CancellationToken.None);
916 /// Attempts to add the specified item to any one of the specified
917 /// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
919 /// <param name="collections">The array of collections.</param>
920 /// <param name="item">The item to be added to one of the collections.</param>
921 /// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see
922 /// cref="System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param> /// <returns>The index of the collection in the <paramref name="collections"/>
923 /// array to which the item was added, or -1 if the item could not be added.</returns>
924 /// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
925 /// null.</exception>
926 /// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
927 /// a 0-length array or contains a null element, or at least one of collections has been
928 /// marked as complete for adding.</exception>
929 /// <exception cref="T:System.ObjectDisposedException">At least one of the <see
930 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
931 /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is a
932 /// negative number other than -1, which represents an infinite time-out.</exception>
933 /// <exception cref="T:System.InvalidOperationException">At least one underlying collection didn't accept the item.</exception>
934 /// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
935 /// 62 for STA and 63 for MTA.</exception>
936 public static int TryAddToAny(BlockingCollection<T>[] collections, T item, int millisecondsTimeout)
938 ValidateMillisecondsTimeout(millisecondsTimeout);
939 return TryAddToAnyCore(collections, item, millisecondsTimeout, CancellationToken.None);
943 /// Attempts to add the specified item to any one of the specified
944 /// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
945 /// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
948 /// <param name="collections">The array of collections.</param>
949 /// <param name="item">The item to be added to one of the collections.</param>
950 /// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see
951 /// cref="System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
952 /// <returns>The index of the collection in the <paramref name="collections"/>
953 /// array to which the item was added, or -1 if the item could not be added.</returns>
954 /// <param name="cancellationToken">A cancellation token to observe.</param>
955 /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
956 /// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
957 /// null.</exception>
958 /// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
959 /// a 0-length array or contains a null element, or at least one of collections has been
960 /// marked as complete for adding.</exception>
961 /// <exception cref="T:System.ObjectDisposedException">At least one of the <see
962 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
963 /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is a
964 /// negative number other than -1, which represents an infinite time-out.</exception>
965 /// <exception cref="T:System.InvalidOperationException">At least one underlying collection didn't accept the item.</exception>
966 /// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
967 /// 62 for STA and 63 for MTA.</exception>
968 public static int TryAddToAny(BlockingCollection<T>[] collections, T item, int millisecondsTimeout, CancellationToken cancellationToken)
970 ValidateMillisecondsTimeout(millisecondsTimeout);
971 return TryAddToAnyCore(collections, item, millisecondsTimeout, cancellationToken);
974 /// <summary>Adds an item to anyone of the specified collections.
975 /// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
978 /// <param name="collections">The collections into which the item can be added.</param>
979 /// <param name="item">The item to be added .</param>
980 /// <param name="millisecondsTimeout">The number of milliseconds to wait for a collection to accept the
981 /// operation, or -1 to wait indefinitely.</param>
982 /// <param name="externalCancellationToken">A cancellation token to observe.</param>
983 /// <returns>The index into collections for the collection which accepted the
984 /// adding of the item; -1 if the item could not be added.</returns>
985 /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
986 /// <exception cref="System.ArgumentNullException">If the collections argument is null.</exception>
987 /// <exception cref="System.ArgumentException">If the collections argument is a 0-length array or contains a
988 /// null element. Also, if atleast one of the collections has been marked complete for adds.</exception>
989 /// <exception cref="System.ObjectDisposedException">If atleast one of the collections has been disposed.</exception>
990 private static int TryAddToAnyCore(BlockingCollection<T>[] collections, T item, int millisecondsTimeout, CancellationToken externalCancellationToken)
992 ValidateCollectionsArray(collections, true);
993 const int OPERATION_FAILED = -1;
995 // Copy the wait time to another local variable to update it
996 int timeout = millisecondsTimeout;
999 if (millisecondsTimeout != Timeout.Infinite)
1001 startTime = (uint)Environment.TickCount;
1004 // Fast path for adding if there is at least one unbounded collection
1005 int index = TryAddToAnyFast(collections, item);
1010 // Get wait handles and the tokens for all collections,
1011 // and construct a single combined token from all the tokens,
1012 // add the combined token handle to the handles list
1013 // call WaitAny for all handles
1014 // After WaitAny returns check if the token is cancelled and that caused the WaitAny to return or not
1015 // If the combined token is cancelled, this mean either the external token is cancelled then throw OCE
1016 // or one if the collection is AddingCompleted then throw AE
1017 CancellationToken[] collatedCancellationTokens;
1018 List<WaitHandle> handles = GetHandles(collections, externalCancellationToken, true, out collatedCancellationTokens);
1020 //Loop until one of these conditions is met:
1021 // 1- The operation is succeeded
1022 // 2- The timeout expired for try* versions
1023 // 3- The external token is cancelled, throw
1024 // 4- There is at least one collection marked as adding completed then throw
1025 while (millisecondsTimeout == Timeout.Infinite || timeout >= 0)
1029 using (CancellationTokenSource linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(collatedCancellationTokens))
1031 handles.Add(linkedTokenSource.Token.WaitHandle); // add the combined token to the handles list
1033 //Wait for any collection to become available.
1034 index = WaitHandle.WaitAny(handles.ToArray(), timeout, false);
1036 handles.RemoveAt(handles.Count - 1); //remove the linked token
1038 if (linkedTokenSource.IsCancellationRequested)
1040 if (externalCancellationToken.IsCancellationRequested) //case#3
1041 throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), externalCancellationToken);
1043 throw new ArgumentException(SR.GetString(SR.BlockingCollection_CantAddAnyWhenCompleted), "collections");
1047 Debug.Assert((index == WaitHandle.WaitTimeout) || (index >= 0 && index < handles.Count));
1049 if (index == WaitHandle.WaitTimeout) //case#2
1050 return OPERATION_FAILED;
1052 //If the timeout period was not exhausted and the appropriate operation succeeded.
1053 if (collections[index].TryAdd(item)) //case#1
1056 // Update the timeout
1057 if (millisecondsTimeout != Timeout.Infinite)
1058 timeout = UpdateTimeOut(startTime, millisecondsTimeout);
1062 return OPERATION_FAILED;
1066 /// Fast path for TryAddToAny to find a non bounded collection and add the items in it
1068 /// <param name="collections">The collections list</param>
1069 /// <param name="item">The item to be added</param>
1070 /// <returns>The index which the item has been added, -1 if failed</returns>
1071 private static int TryAddToAnyFast(BlockingCollection<T>[] collections, T item)
1073 for (int i = 0; i < collections.Length; i++)
1075 if (collections[i].m_freeNodes == null)
1080 collections[i].TryAdd(item);
1082 Debug.Assert(result);
1090 /// Local static method, used by TryAddTakeAny to get the wait handles for the collection, with exclude option to exclude the Compeleted collections
1092 /// <param name="collections">The blocking collections</param>
1093 /// <param name="externalCancellationToken">The original CancellationToken</param>
1094 /// <param name="isAddOperation">True if Add or TryAdd, false if Take or TryTake</param>
1095 /// <param name="cancellationTokens">Complete list of cancellationTokens to observe</param>
1096 /// <returns>The collections wait handles</returns>
1097 private static List<WaitHandle> GetHandles(BlockingCollection<T>[] collections, CancellationToken externalCancellationToken, bool isAddOperation, out CancellationToken[] cancellationTokens)
1100 Debug.Assert(collections != null);
1101 List<WaitHandle> handlesList = new List<WaitHandle>(collections.Length + 1); // + 1 for the external token handle to be added
1102 List<CancellationToken> tokensList = new List<CancellationToken>(collections.Length + 1); // + 1 for the external token
1103 tokensList.Add(externalCancellationToken);
1105 //Read the appropriate WaitHandle based on the operation mode.
1109 for (int i = 0; i < collections.Length; i++)
1111 if (collections[i].m_freeNodes != null)
1113 handlesList.Add(collections[i].m_freeNodes.AvailableWaitHandle);
1114 tokensList.Add(collections[i].m_ProducersCancellationTokenSource.Token);
1120 for (int i = 0; i < collections.Length; i++)
1122 if (collections[i].IsCompleted) //exclude Completed collections if it is take operation
1125 handlesList.Add(collections[i].m_occupiedNodes.AvailableWaitHandle);
1126 tokensList.Add(collections[i].m_ConsumersCancellationTokenSource.Token);
1130 cancellationTokens = tokensList.ToArray();
1135 /// Helper function to measure and update the wait time
1137 /// <param name="startTime"> The first time (in milliseconds) observed when the wait started</param>
1138 /// <param name="originalWaitMillisecondsTimeout">The orginal wait timeoutout in milliseconds</param>
1139 /// <returns>The new wait time in milliseconds, -1 if the time expired</returns>
1140 private static int UpdateTimeOut(uint startTime, int originalWaitMillisecondsTimeout)
1142 if (originalWaitMillisecondsTimeout == 0)
1146 // The function must be called in case the time out is not infinite
1147 Debug.Assert(originalWaitMillisecondsTimeout != Timeout.Infinite);
1149 uint elapsedMilliseconds = (uint)Environment.TickCount - startTime;
1151 // Check the elapsed milliseconds is greater than max int because this property is uint
1152 if (elapsedMilliseconds > int.MaxValue)
1157 // Subtract the elapsed time from the current wait time
1158 int currentWaitTimeout = originalWaitMillisecondsTimeout - (int)elapsedMilliseconds; ;
1159 if (currentWaitTimeout <= 0)
1164 return currentWaitTimeout;
1167 /// Takes an item from any one of the specified
1168 /// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
1170 /// <param name="collections">The array of collections.</param>
1171 /// <param name="item">The item removed from one of the collections.</param>
1172 /// <returns>The index of the collection in the <paramref name="collections"/> array from which
1173 /// the item was removed, or -1 if an item could not be removed.</returns>
1174 /// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
1175 /// null.</exception>
1176 /// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
1177 /// a 0-length array or contains a null element.</exception>
1178 /// <exception cref="T:System.ObjectDisposedException">At least one of the <see
1179 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
1180 /// <exception cref="T:System.InvalidOperationException">At least one of the underlying collections was modified
1181 /// outside of its <see
1182 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
1183 /// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
1184 /// 62 for STA and 63 for MTA.</exception>
1185 /// <remarks>A call to TakeFromAny may block until an item is available to be removed.</remarks>
1186 public static int TakeFromAny(BlockingCollection<T>[] collections, out T item)
1188 return TakeFromAny(collections, out item, CancellationToken.None);
1192 /// Takes an item from any one of the specified
1193 /// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
1194 /// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
1197 /// <param name="collections">The array of collections.</param>
1198 /// <param name="item">The item removed from one of the collections.</param>
1199 /// <param name="cancellationToken">A cancellation token to observe.</param>
1200 /// <returns>The index of the collection in the <paramref name="collections"/> array from which
1201 /// the item was removed, or -1 if an item could not be removed.</returns>
1202 /// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
1203 /// null.</exception>
1204 /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
1205 /// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
1206 /// a 0-length array or contains a null element.</exception>
1207 /// <exception cref="T:System.ObjectDisposedException">At least one of the <see
1208 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
1209 /// <exception cref="T:System.InvalidOperationException">At least one of the underlying collections was modified
1210 /// outside of its <see
1211 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
1212 /// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
1213 /// 62 for STA and 63 for MTA.</exception>
1214 /// <remarks>A call to TakeFromAny may block until an item is available to be removed.</remarks>
1215 public static int TakeFromAny(BlockingCollection<T>[] collections, out T item, CancellationToken cancellationToken)
1217 int returnValue = TryTakeFromAnyCore(collections, out item, Timeout.Infinite, true, cancellationToken);
1218 Debug.Assert((returnValue >= 0 && returnValue < collections.Length)
1219 , "TryTakeFromAny() was expected to return an index within the bounds of the collections array.");
1225 /// Attempts to remove an item from any one of the specified
1226 /// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
1228 /// <param name="collections">The array of collections.</param>
1229 /// <param name="item">The item removed from one of the collections.</param>
1230 /// <returns>The index of the collection in the <paramref name="collections"/> array from which
1231 /// the item was removed, or -1 if an item could not be removed.</returns>
1232 /// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
1233 /// null.</exception>
1234 /// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
1235 /// a 0-length array or contains a null element.</exception>
1236 /// <exception cref="T:System.ObjectDisposedException">At least one of the <see
1237 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
1238 /// <exception cref="T:System.InvalidOperationException">At least one of the underlying collections was modified
1239 /// outside of its <see
1240 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
1241 /// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
1242 /// 62 for STA and 63 for MTA.</exception>
1243 /// <remarks>A call to TryTakeFromAny may block until an item is available to be removed.</remarks>
1244 public static int TryTakeFromAny(BlockingCollection<T>[] collections, out T item)
1246 return TryTakeFromAny(collections, out item, 0);
1250 /// Attempts to remove an item from any one of the specified
1251 /// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
1253 /// <param name="collections">The array of collections.</param>
1254 /// <param name="item">The item removed from one of the collections.</param>
1255 /// <param name="timeout">A <see cref="System.TimeSpan"/> that represents the number of milliseconds
1256 /// to wait, or a <see cref="System.TimeSpan"/> that represents -1 milliseconds to wait indefinitely.
1258 /// <returns>The index of the collection in the <paramref name="collections"/> array from which
1259 /// the item was removed, or -1 if an item could not be removed.</returns>
1260 /// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
1261 /// null.</exception>
1262 /// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
1263 /// a 0-length array or contains a null element.</exception>
1264 /// <exception cref="T:System.ObjectDisposedException">At least one of the <see
1265 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
1266 /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout"/> is a negative number
1267 /// other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than
1268 /// <see cref="System.Int32.MaxValue"/>.</exception>
1269 /// <exception cref="T:System.InvalidOperationException">At least one of the underlying collections was modified
1270 /// outside of its <see
1271 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
1272 /// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
1273 /// 62 for STA and 63 for MTA.</exception>
1274 /// <remarks>A call to TryTakeFromAny may block until an item is available to be removed.</remarks>
1275 public static int TryTakeFromAny(BlockingCollection<T>[] collections, out T item, TimeSpan timeout)
1277 ValidateTimeout(timeout);
1278 return TryTakeFromAnyCore(collections, out item, (int)timeout.TotalMilliseconds, false, CancellationToken.None);
1282 /// Attempts to remove an item from any one of the specified
1283 /// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
1285 /// <param name="collections">The array of collections.</param>
1286 /// <param name="item">The item removed from one of the collections.</param>
1287 /// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see
1288 /// cref="System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
1289 /// <returns>The index of the collection in the <paramref name="collections"/> array from which
1290 /// the item was removed, or -1 if an item could not be removed.</returns>
1291 /// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
1292 /// null.</exception>
1293 /// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
1294 /// a 0-length array or contains a null element.</exception>
1295 /// <exception cref="T:System.ObjectDisposedException">At least one of the <see
1296 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
1297 /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is a
1298 /// negative number other than -1, which represents an infinite time-out.</exception>
1299 /// <exception cref="T:System.InvalidOperationException">At least one of the underlying collections was modified
1300 /// outside of its <see
1301 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
1302 /// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
1303 /// 62 for STA and 63 for MTA.</exception>
1304 /// <remarks>A call to TryTakeFromAny may block until an item is available to be removed.</remarks>
1305 public static int TryTakeFromAny(BlockingCollection<T>[] collections, out T item, int millisecondsTimeout)
1307 ValidateMillisecondsTimeout(millisecondsTimeout);
1308 return TryTakeFromAnyCore(collections, out item, millisecondsTimeout, false, CancellationToken.None);
1312 /// Attempts to remove an item from any one of the specified
1313 /// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
1314 /// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
1317 /// <param name="collections">The array of collections.</param>
1318 /// <param name="item">The item removed from one of the collections.</param>
1319 /// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see
1320 /// cref="System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
1321 /// <param name="cancellationToken">A cancellation token to observe.</param>
1322 /// <returns>The index of the collection in the <paramref name="collections"/> array from which
1323 /// the item was removed, or -1 if an item could not be removed.</returns>
1324 /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
1325 /// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
1326 /// null.</exception>
1327 /// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
1328 /// a 0-length array or contains a null element.</exception>
1329 /// <exception cref="T:System.ObjectDisposedException">At least one of the <see
1330 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
1331 /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is a
1332 /// negative number other than -1, which represents an infinite time-out.</exception>
1333 /// <exception cref="T:System.InvalidOperationException">At least one of the underlying collections was modified
1334 /// outside of its <see
1335 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
1336 /// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
1337 /// 62 for STA and 63 for MTA.</exception>
1338 /// <remarks>A call to TryTakeFromAny may block until an item is available to be removed.</remarks>
1339 public static int TryTakeFromAny(BlockingCollection<T>[] collections, out T item, int millisecondsTimeout, CancellationToken cancellationToken)
1341 ValidateMillisecondsTimeout(millisecondsTimeout);
1342 return TryTakeFromAnyCore(collections, out item, millisecondsTimeout, false, cancellationToken);
1345 /// <summary>Takes an item from anyone of the specified collections.
1346 /// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
1349 /// <param name="collections">The collections from which the item can be removed.</param>
1350 /// <param name="item">The item removed and returned to the caller.</param>
1351 /// <param name="millisecondsTimeout">The number of milliseconds to wait for a collection to accept the
1352 /// operation, or -1 to wait indefinitely.</param>
1353 /// <param name="isTakeOperation">True if Take, false if TryTake.</param>
1354 /// <param name="externalCancellationToken">A cancellation token to observe.</param>
1355 /// <returns>The index into collections for the collection which accepted the
1356 /// removal of the item; -1 if the item could not be removed.</returns>
1357 /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
1358 /// <exception cref="System.ArgumentNullException">If the collections argument is null.</exception>
1359 /// <exception cref="System.ArgumentException">If the collections argument is a 0-length array or contains a
1360 /// null element. Also, if atleast one of the collections has been marked complete for adds.</exception>
1361 /// <exception cref="System.ObjectDisposedException">If atleast one of the collections has been disposed.</exception>
1362 private static int TryTakeFromAnyCore(BlockingCollection<T>[] collections, out T item, int millisecondsTimeout, bool isTakeOperation, CancellationToken externalCancellationToken)
1365 ValidateCollectionsArray(collections, false);
1367 //try the fast path first
1368 for (int i = 0; i < collections.Length; i++)
1370 // Check if the collection is not completed, and potentially has at least one element by checking the semaphore count
1371 if (!collections[i].IsCompleted && collections[i].m_occupiedNodes.CurrentCount > 0 && collections[i].TryTake(out item))
1375 //Fast path failed, try the slow path
1376 return TryTakeFromAnyCoreSlow(collections, out item, millisecondsTimeout, isTakeOperation, externalCancellationToken);
1380 /// <summary>Takes an item from anyone of the specified collections.
1381 /// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
1384 /// <param name="collections">The collections copy from which the item can be removed.</param>
1385 /// <param name="item">The item removed and returned to the caller.</param>
1386 /// <param name="millisecondsTimeout">The number of milliseconds to wait for a collection to accept the
1387 /// operation, or -1 to wait indefinitely.</param>
1388 /// <param name="isTakeOperation">True if Take, false if TryTake.</param>
1389 /// <param name="externalCancellationToken">A cancellation token to observe.</param>
1390 /// <returns>The index into collections for the collection which accepted the
1391 /// removal of the item; -1 if the item could not be removed.</returns>
1392 /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
1393 /// <exception cref="System.ArgumentNullException">If the collections argument is null.</exception>
1394 /// <exception cref="System.ArgumentException">If the collections argument is a 0-length array or contains a
1395 /// null element. Also, if atleast one of the collections has been marked complete for adds.</exception>
1396 /// <exception cref="System.ObjectDisposedException">If atleast one of the collections has been disposed.</exception>
1397 private static int TryTakeFromAnyCoreSlow(BlockingCollection<T>[] collections, out T item, int millisecondsTimeout, bool isTakeOperation, CancellationToken externalCancellationToken)
1400 const int OPERATION_FAILED = -1;
1402 // Copy the wait time to another local variable to update it
1403 int timeout = millisecondsTimeout;
1406 if (millisecondsTimeout != Timeout.Infinite)
1408 startTime = (uint)Environment.TickCount;
1412 //Loop until one of these conditions is met:
1413 // 1- The operation is succeeded
1414 // 2- The timeout expired for try* versions
1415 // 3- The external token is cancelled, throw
1416 // 4- The operation is TryTake and all collections are marked as completed, return false
1417 // 5- The operation is Take and all collection are marked as completed, throw
1418 while (millisecondsTimeout == Timeout.Infinite || timeout >= 0)
1420 // Get wait handles and the tokens for all collections,
1421 // and construct a single combined token from all the tokens,
1422 // add the combined token handle to the handles list
1423 // call WaitAny for all handles
1424 // After WaitAny returns check if the token is cancelled and that caused the WaitAny to return or not
1425 // If the combined token is cancelled, this mean either the external token is cancelled then throw OCE
1426 // or one if the collection is Completed then exclude it and retry
1427 CancellationToken[] collatedCancellationTokens;
1428 List<WaitHandle> handles = GetHandles(collections, externalCancellationToken, false, out collatedCancellationTokens);
1430 if (handles.Count == 0 && isTakeOperation) //case#5
1431 throw new ArgumentException(SR.GetString(SR.BlockingCollection_CantTakeAnyWhenAllDone), "collections");
1433 else if (handles.Count == 0) //case#4
1437 //Wait for any collection to become available.
1438 using (CancellationTokenSource linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(collatedCancellationTokens))
1440 handles.Add(linkedTokenSource.Token.WaitHandle); // add the combined token to the handles list
1441 int index = WaitHandle.WaitAny(handles.ToArray(), timeout, false);
1443 if (linkedTokenSource.IsCancellationRequested && externalCancellationToken.IsCancellationRequested)//case#3
1444 throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), externalCancellationToken);
1447 else if (!linkedTokenSource.IsCancellationRequested)// if no eiter internal or external cancellation trquested
1449 Debug.Assert((index == WaitHandle.WaitTimeout) || (index >= 0 && index < handles.Count));
1450 if (index == WaitHandle.WaitTimeout) //case#2
1453 // adjust the index in case one or more handles removed because they are completed
1454 if (collections.Length != handles.Count - 1) // -1 because of the combined token handle
1456 for (int i = 0; i < collections.Length; i++)
1458 if (collections[i].m_occupiedNodes.AvailableWaitHandle == handles[index])
1466 if (collections[index].TryTake(out item)) //case#1
1471 // Update the timeout
1472 if (millisecondsTimeout != Timeout.Infinite)
1473 timeout = UpdateTimeOut(startTime, millisecondsTimeout);
1476 item = default(T); //case#2
1477 return OPERATION_FAILED;
1481 /// Marks the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances
1482 /// as not accepting any more additions.
1485 /// After a collection has been marked as complete for adding, adding to the collection is not permitted
1486 /// and attempts to remove from the collection will not wait when the collection is empty.
1488 /// <exception cref="T:System.ObjectDisposedException">The <see
1489 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
1490 public void CompleteAdding()
1494 if (IsAddingCompleted)
1497 SpinWait spinner = new SpinWait();
1500 int observedAdders = m_currentAdders;
1501 if ((observedAdders & COMPLETE_ADDING_ON_MASK) != 0)
1504 // If there is another COmpleteAdding in progress waiting the current adders, then spin until it finishes
1505 while (m_currentAdders != COMPLETE_ADDING_ON_MASK) spinner.SpinOnce();
1509 if (Interlocked.CompareExchange(ref m_currentAdders, observedAdders | COMPLETE_ADDING_ON_MASK, observedAdders) == observedAdders)
1512 while (m_currentAdders != COMPLETE_ADDING_ON_MASK) spinner.SpinOnce();
1516 CancelWaitingConsumers();
1519 // We should always wake waiting producers, and have them throw exceptions as
1520 // Add&CompleteAdding should not be used concurrently.
1521 CancelWaitingProducers();
1529 /// <summary>Cancels the semaphores.</summary>
1530 private void CancelWaitingConsumers()
1532 m_ConsumersCancellationTokenSource.Cancel();
1535 private void CancelWaitingProducers()
1537 m_ProducersCancellationTokenSource.Cancel();
1542 /// Releases resources used by the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.
1544 public void Dispose()
1547 GC.SuppressFinalize(this);
1551 /// Releases resources used by the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.
1553 /// <param name="disposing">Whether being disposed explicitly (true) or due to a finalizer (false).</param>
1554 protected virtual void Dispose(bool disposing)
1558 if (m_freeNodes != null)
1560 m_freeNodes.Dispose();
1563 m_occupiedNodes.Dispose();
1565 m_isDisposed = true;
1569 /// <summary>Copies the items from the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance into a new array.</summary>
1570 /// <returns>An array containing copies of the elements of the collection.</returns>
1571 /// <exception cref="T:System.ObjectDisposedException">The <see
1572 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
1574 /// The copied elements are not removed from the collection.
1576 public T[] ToArray()
1579 return m_collection.ToArray();
1582 /// <summary>Copies all of the items in the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance
1583 /// to a compatible one-dimensional array, starting at the specified index of the target array.
1585 /// <param name="array">The one-dimensional array that is the destination of the elements copied from
1586 /// the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance. The array must have zero-based indexing.</param>
1587 /// <param name="index">The zero-based index in <paramref name="array"/> at which copying begins.</param>
1588 /// <exception cref="T:System.ArgumentNullException">The <paramref name="array"/> argument is
1589 /// null.</exception>
1590 /// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="index"/> argument is less than zero.</exception>
1591 /// <exception cref="System.ArgumentException">The <paramref name="index"/> argument is equal to or greater
1592 /// than the length of the <paramref name="array"/>.</exception>
1593 /// <exception cref="T:System.ObjectDisposedException">The <see
1594 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
1595 public void CopyTo(T[] array, int index)
1597 ((ICollection)this).CopyTo(array, index);
1600 /// <summary>Copies all of the items in the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance
1601 /// to a compatible one-dimensional array, starting at the specified index of the target array.
1603 /// <param name="array">The one-dimensional array that is the destination of the elements copied from
1604 /// the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance. The array must have zero-based indexing.</param>
1605 /// <param name="index">The zero-based index in <paramref name="array"/> at which copying begins.</param>
1606 /// <exception cref="T:System.ArgumentNullException">The <paramref name="array"/> argument is
1607 /// null.</exception>
1608 /// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="index"/> argument is less than zero.</exception>
1609 /// <exception cref="System.ArgumentException">The <paramref name="index"/> argument is equal to or greater
1610 /// than the length of the <paramref name="array"/>, the array is multidimensional, or the type parameter for the collection
1611 /// cannot be cast automatically to the type of the destination array.</exception>
1612 /// <exception cref="T:System.ObjectDisposedException">The <see
1613 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
1614 void ICollection.CopyTo(Array array, int index)
1618 //We don't call m_collection.CopyTo() directly because we rely on Array.Copy method to customize
1619 //all array exceptions.
1620 T[] collectionSnapShot = m_collection.ToArray();
1624 Array.Copy(collectionSnapShot, 0, array, index, collectionSnapShot.Length);
1626 catch (ArgumentNullException)
1628 throw new ArgumentNullException("array");
1630 catch (ArgumentOutOfRangeException)
1632 throw new ArgumentOutOfRangeException("index", index, SR.GetString(SR.BlockingCollection_CopyTo_NonNegative));
1634 catch (ArgumentException)
1636 throw new ArgumentException(SR.GetString(SR.BlockingCollection_CopyTo_TooManyElems), "index");
1638 catch (RankException)
1640 throw new ArgumentException(SR.GetString(SR.BlockingCollection_CopyTo_MultiDim), "array");
1642 catch (InvalidCastException)
1644 throw new ArgumentException(SR.GetString(SR.BlockingCollection_CopyTo_IncorrectType), "array");
1646 catch (ArrayTypeMismatchException)
1648 throw new ArgumentException(SR.GetString(SR.BlockingCollection_CopyTo_IncorrectType), "array");
1652 /// <summary>Provides a consuming <see cref="T:System.Collections.Generics.IEnumerable{T}"/> for items in the collection.</summary>
1653 /// <returns>An <see cref="T:System.Collections.Generics.IEnumerable{T}"/> that removes and returns items from the collection.</returns>
1654 /// <exception cref="T:System.ObjectDisposedException">The <see
1655 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
1656 public IEnumerable<T> GetConsumingEnumerable()
1658 return GetConsumingEnumerable(CancellationToken.None);
1661 /// <summary>Provides a consuming <see cref="T:System.Collections.Generics.IEnumerable{T}"/> for items in the collection.
1662 /// Calling MoveNext on the returned enumerable will block if there is no data available, or will
1663 /// throw an <see cref="System.OperationCanceledException"/> if the <see cref="CancellationToken"/> is canceled.
1665 /// <param name="cancellationToken">A cancellation token to observe.</param>
1666 /// <returns>An <see cref="T:System.Collections.Generics.IEnumerable{T}"/> that removes and returns items from the collection.</returns>
1667 /// <exception cref="T:System.ObjectDisposedException">The <see
1668 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
1669 /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
1670 public IEnumerable<T> GetConsumingEnumerable(CancellationToken cancellationToken)
1672 CancellationTokenSource linkedTokenSource = null;
1675 linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, m_ConsumersCancellationTokenSource.Token);
1676 while (!IsCompleted)
1679 if (TryTakeWithNoTimeValidation(out item, Timeout.Infinite, cancellationToken, linkedTokenSource))
1687 if (linkedTokenSource != null)
1689 linkedTokenSource.Dispose();
1694 /// <summary>Provides an <see cref="T:System.Collections.Generics.IEnumerator{T}"/> for items in the collection.</summary>
1695 /// <returns>An <see cref="T:System.Collections.Generics.IEnumerator{T}"/> for the items in the collection.</returns>
1696 /// <exception cref="T:System.ObjectDisposedException">The <see
1697 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
1698 IEnumerator<T> IEnumerable<T>.GetEnumerator()
1701 return m_collection.GetEnumerator();
1705 /// <summary>Provides an <see cref="T:System.Collections.IEnumerator"/> for items in the collection.</summary>
1706 /// <returns>An <see cref="T:System.Collections.IEnumerator"/> for the items in the collection.</returns>
1707 /// <exception cref="T:System.ObjectDisposedException">The <see
1708 /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
1709 IEnumerator IEnumerable.GetEnumerator()
1711 return ((IEnumerable<T>)this).GetEnumerator();
1714 /// <summary>Centralizes the logic for validating the BlockingCollections array passed to TryAddToAny()
1715 /// and TryTakeFromAny().</summary>
1716 /// <param name="collections">The collections to/from which an item should be added/removed.</param>
1717 /// <param name="operationMode">Indicates whether this method is called to Add or Take.</param>
1718 /// <exception cref="System.ArgumentNullException">If the collections argument is null.</exception>
1719 /// <exception cref="System.ArgumentException">If the collections argument is a 0-length array or contains a
1720 /// null element. Also, if at least one of the collections has been marked complete for adds.</exception>
1721 /// <exception cref="System.ObjectDisposedException">If at least one of the collections has been disposed.</exception>
1722 private static void ValidateCollectionsArray(BlockingCollection<T>[] collections, bool isAddOperation)
1724 if (collections == null)
1726 throw new ArgumentNullException("collections");
1728 else if (collections.Length < 1)
1730 throw new ArgumentException(
1731 SR.GetString(SR.BlockingCollection_ValidateCollectionsArray_ZeroSize), "collections");
1733 else if ((!IsSTAThread && collections.Length > 63) || (IsSTAThread && collections.Length > 62))
1734 //The number of WaitHandles must be <= 64 for MTA, and <=63 for STA, and we reserve one for CancellationToken
1736 throw new ArgumentOutOfRangeException(
1737 "collections", SR.GetString(SR.BlockingCollection_ValidateCollectionsArray_LargeSize));
1740 for (int i = 0; i < collections.Length; ++i)
1742 if (collections[i] == null)
1744 throw new ArgumentException(
1745 SR.GetString(SR.BlockingCollection_ValidateCollectionsArray_NullElems), "collections");
1748 if (collections[i].m_isDisposed)
1749 throw new ObjectDisposedException(
1750 "collections", SR.GetString(SR.BlockingCollection_ValidateCollectionsArray_DispElems));
1752 if (isAddOperation && collections[i].IsAddingCompleted)
1754 throw new ArgumentException(
1755 SR.GetString(SR.BlockingCollection_CantAddAnyWhenCompleted), "collections");
1760 private static bool IsSTAThread
1765 return Thread.CurrentThread.GetApartmentState() == ApartmentState.STA;
1774 /// <summary>Centeralizes the logic of validating the timeout input argument.</summary>
1775 /// <param name="timeout">The TimeSpan to wait for to successfully complete an operation on the collection.</param>
1776 /// <exception cref="System.ArgumentOutOfRangeException">If the number of millseconds represented by the timeout
1777 /// TimeSpan is less than 0 or is larger than Int32.MaxValue and not Timeout.Infinite</exception>
1778 private static void ValidateTimeout(TimeSpan timeout)
1780 long totalMilliseconds = (long)timeout.TotalMilliseconds;
1781 if ((totalMilliseconds < 0 || totalMilliseconds > Int32.MaxValue) && (totalMilliseconds != Timeout.Infinite))
1783 throw new ArgumentOutOfRangeException("timeout", timeout,
1784 String.Format(CultureInfo.InvariantCulture, SR.GetString(SR.BlockingCollection_TimeoutInvalid), Int32.MaxValue));
1788 /// <summary>Centralizes the logic of validating the millisecondsTimeout input argument.</summary>
1789 /// <param name="millisecondsTimeout">The number of milliseconds to wait for to successfully complete an
1790 /// operation on the collection.</param>
1791 /// <exception cref="System.ArgumentOutOfRangeException">If the number of millseconds is less than 0 and not
1792 /// equal to Timeout.Infinite.</exception>
1793 private static void ValidateMillisecondsTimeout(int millisecondsTimeout)
1795 if ((millisecondsTimeout < 0) && (millisecondsTimeout != Timeout.Infinite))
1797 throw new ArgumentOutOfRangeException("millisecondsTimeout", millisecondsTimeout,
1798 String.Format(CultureInfo.InvariantCulture, SR.GetString(SR.BlockingCollection_TimeoutInvalid), Int32.MaxValue));
1802 /// <summary>Throws a System.ObjectDisposedException if the collection was disposed</summary>
1803 /// <exception cref="System.ObjectDisposedException">If the collection has been disposed.</exception>
1804 private void CheckDisposed()
1808 throw new ObjectDisposedException("BlockingCollection", SR.GetString(SR.BlockingCollection_Disposed));
1816 /// <summary>A debugger view of the blocking collection that makes it simple to browse the
1817 /// collection's contents at a point in time.</summary>
1818 /// <typeparam name="T">The type of element that the BlockingCollection will hold.</typeparam>
1819 internal sealed class SystemThreadingCollections_BlockingCollectionDebugView<T>
1821 private BlockingCollection<T> m_blockingCollection; // The collection being viewed.
1823 /// <summary>Constructs a new debugger view object for the provided blocking collection object.</summary>
1824 /// <param name="collection">A blocking collection to browse in the debugger.</param>
1825 public SystemThreadingCollections_BlockingCollectionDebugView(BlockingCollection<T> collection)
1827 if (collection == null)
1829 throw new ArgumentNullException("collection");
1832 m_blockingCollection = collection;
1835 /// <summary>Returns a snapshot of the underlying collection's elements.</summary>
1836 [DebuggerBrowsable(DebuggerBrowsableState.RootHidden)]
1841 return m_blockingCollection.ToArray();
1847 #pragma warning restore 0420