Updates referencesource to .NET 4.7
[mono.git] / mcs / class / referencesource / System / sys / system / collections / concurrent / BlockingCollection.cs
1 // ==++==
2 //
3 //   Copyright (c) Microsoft Corporation.  All rights reserved.
4 // 
5 // ==--==
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
7 //
8 // BlockingCollection.cs
9 //
10 // <OWNER>Microsoft</OWNER>
11 //
12 // A class that implements the bounding and blocking functionality while abstracting away
13 // the underlying storage mechanism. This file also contains BlockingCollection's 
14 // associated debugger view type.
15 //
16 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
17 #pragma warning disable 0420
18 using System;
19 using System.Collections.Generic;
20 using System.Collections;
21 using System.Diagnostics;
22 using System.Globalization;
23 using System.Security.Permissions;
24 using System.Runtime.InteropServices;
25 using System.Threading;
26
27 namespace System.Collections.Concurrent
28 {
29     /// <summary> 
30     /// Provides blocking and bounding capabilities for thread-safe collections that 
31     /// implement <see cref="T:System.Collections.Concurrent.IProducerConsumerCollection{T}"/>. 
32     /// </summary>
33     /// <remarks>
34     /// <see cref="T:System.Collections.Concurrent.IProducerConsumerCollection{T}"/> represents a collection
35     /// that allows for thread-safe adding and removing of data. 
36     /// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> is used as a wrapper
37     /// for an <see cref="T:System.Collections.Concurrent.IProducerConsumerCollection{T}"/> instance, allowing
38     /// removal attempts from the collection to block until data is available to be removed.  Similarly,
39     /// a <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> can be created to enforce
40     /// an upper-bound on the number of data elements allowed in the 
41     /// <see cref="T:System.Collections.Concurrent.IProducerConsumerCollection{T}"/>; addition attempts to the
42     /// collection may then block until space is available to store the added items.  In this manner,
43     /// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> is similar to a traditional
44     /// blocking queue data structure, except that the underlying data storage mechanism is abstracted
45     /// away as an <see cref="T:System.Collections.Concurrent.IProducerConsumerCollection{T}"/>. 
46     /// </remarks>
47     /// <typeparam name="T">Specifies the type of elements in the collection.</typeparam>
48     [ComVisible(false)]
49 #if !FEATURE_NETCORE
50 #pragma warning disable 0618
51     [HostProtection(SecurityAction.LinkDemand, Synchronization = true, ExternalThreading = true)]
52 #pragma warning restore 0618
53 #endif
54     [DebuggerTypeProxy(typeof(SystemThreadingCollections_BlockingCollectionDebugView<>))]
55     [DebuggerDisplay("Count = {Count}, Type = {m_collection}")]
56     public class BlockingCollection<T> : IEnumerable<T>, ICollection, IDisposable, IReadOnlyCollection<T>
57     {
58         private IProducerConsumerCollection<T> m_collection;
59         private int m_boundedCapacity;
60         private const int NON_BOUNDED = -1;
61         private SemaphoreSlim m_freeNodes;
62         private SemaphoreSlim m_occupiedNodes;
63         private bool m_isDisposed;
64         private CancellationTokenSource m_ConsumersCancellationTokenSource;
65         private CancellationTokenSource m_ProducersCancellationTokenSource;
66
67         private volatile int m_currentAdders;
68         private const int COMPLETE_ADDING_ON_MASK = unchecked((int)0x80000000);
69
70         #region Properties
71         /// <summary>Gets the bounded capacity of this <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</summary>
72         /// <value>The bounded capacity of this collection, or int.MaxValue if no bound was supplied.</value>
73         /// <exception cref="T:System.ObjectDisposedException">The <see
74         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
75         public int BoundedCapacity
76         {
77             get
78             {
79                 CheckDisposed();
80                 return m_boundedCapacity;
81             }
82         }
83
84         /// <summary>Gets whether this <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been marked as complete for adding.</summary>
85         /// <value>Whether this collection has been marked as complete for adding.</value>
86         /// <exception cref="T:System.ObjectDisposedException">The <see
87         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
88         public bool IsAddingCompleted
89         {
90             get
91             {
92                 CheckDisposed();
93                 return (m_currentAdders == COMPLETE_ADDING_ON_MASK);
94             }
95         }
96
97         /// <summary>Gets whether this <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been marked as complete for adding and is empty.</summary>
98         /// <value>Whether this collection has been marked as complete for adding and is empty.</value>
99         /// <exception cref="T:System.ObjectDisposedException">The <see
100         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
101         public bool IsCompleted
102         {
103             get
104             {
105                 CheckDisposed();
106                 return (IsAddingCompleted && (m_occupiedNodes.CurrentCount == 0));
107             }
108         }
109
110         /// <summary>Gets the number of items contained in the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.</summary>
111         /// <value>The number of items contained in the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.</value>
112         /// <exception cref="T:System.ObjectDisposedException">The <see
113         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
114         public int Count
115         {
116             get
117             {
118                 CheckDisposed();
119                 return m_occupiedNodes.CurrentCount;
120             }
121         }
122
123         /// <summary>Gets a value indicating whether access to the <see cref="T:System.Collections.ICollection"/> is synchronized.</summary>
124         /// <exception cref="T:System.ObjectDisposedException">The <see
125         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
126         bool ICollection.IsSynchronized
127         {
128             get
129             {
130                 CheckDisposed();
131                 return false;
132             }
133         }
134
135         /// <summary>
136         /// Gets an object that can be used to synchronize access to the <see
137         /// cref="T:System.Collections.ICollection"/>. This property is not supported.
138         /// </summary>
139         /// <exception cref="T:System.NotSupportedException">The SyncRoot property is not supported.</exception>
140         object ICollection.SyncRoot
141         {
142             get
143             {
144                 throw new NotSupportedException(SR.GetString(SR.ConcurrentCollection_SyncRoot_NotSupported));
145             }
146         }
147         #endregion
148
149
150         /// <summary>Initializes a new instance of the 
151         /// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>
152         /// class without an upper-bound.
153         /// </summary>
154         /// <remarks>
155         /// The default underlying collection is a <see cref="System.Collections.Concurrent.ConcurrentQueue{T}">ConcurrentQueue&lt;T&gt;</see>.
156         /// </remarks>
157         public BlockingCollection()
158             : this(new ConcurrentQueue<T>())
159         {
160         }
161
162         /// <summary>Initializes a new instance of the <see
163         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>
164         /// class with the specified upper-bound.
165         /// </summary>
166         /// <param name="boundedCapacity">The bounded size of the collection.</param>
167         /// <exception cref="T:System.ArgumentOutOfRangeException">The <paramref name="boundedCapacity"/> is
168         /// not a positive value.</exception>
169         /// <remarks>
170         /// The default underlying collection is a <see cref="System.Collections.Concurrent.ConcurrentQueue{T}">ConcurrentQueue&lt;T&gt;</see>.
171         /// </remarks>
172         public BlockingCollection(int boundedCapacity)
173             : this(new ConcurrentQueue<T>(), boundedCapacity)
174         {
175         }
176
177         /// <summary>Initializes a new instance of the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>
178         /// class with the specified upper-bound and using the provided 
179         /// <see cref="T:System.Collections.Concurrent.IProducerConsumerCollection{T}"/> as its underlying data store.</summary>
180         /// <param name="collection">The collection to use as the underlying data store.</param>
181         /// <param name="boundedCapacity">The bounded size of the collection.</param>
182         /// <exception cref="T:System.ArgumentNullException">The <paramref name="collection"/> argument is
183         /// null.</exception>
184         /// <exception cref="T:System.ArgumentOutOfRangeException">The <paramref name="boundedCapacity"/> is not a positive value.</exception>
185         /// <exception cref="System.ArgumentException">The supplied <paramref name="collection"/> contains more values 
186         /// than is permitted by <paramref name="boundedCapacity"/>.</exception>
187         public BlockingCollection(IProducerConsumerCollection<T> collection, int boundedCapacity)
188         {
189             if (boundedCapacity < 1)
190             {
191                 throw new ArgumentOutOfRangeException(
192                     "boundedCapacity", boundedCapacity,
193                     SR.GetString(SR.BlockingCollection_ctor_BoundedCapacityRange));
194             }
195             if (collection == null)
196             {
197                 throw new ArgumentNullException("collection");
198             }
199             int count = collection.Count;
200             if (count > boundedCapacity)
201             {
202                 throw new ArgumentException(SR.GetString(SR.BlockingCollection_ctor_CountMoreThanCapacity));
203             }
204             Initialize(collection, boundedCapacity, count);
205         }
206
207         /// <summary>Initializes a new instance of the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>
208         /// class without an upper-bound and using the provided 
209         /// <see cref="T:System.Collections.Concurrent.IProducerConsumerCollection{T}"/> as its underlying data store.</summary>
210         /// <param name="collection">The collection to use as the underlying data store.</param>
211         /// <exception cref="T:System.ArgumentNullException">The <paramref name="collection"/> argument is
212         /// null.</exception>
213         public BlockingCollection(IProducerConsumerCollection<T> collection)
214         {
215             if (collection == null)
216             {
217                 throw new ArgumentNullException("collection");
218             }
219             Initialize(collection, NON_BOUNDED, collection.Count);
220         }
221
222         /// <summary>Initializes the BlockingCollection instance.</summary>
223         /// <param name="collection">The collection to use as the underlying data store.</param>
224         /// <param name="boundedCapacity">The bounded size of the collection.</param>
225         /// <param name="collectionCount">The number of items currently in the underlying collection.</param>
226         private void Initialize(IProducerConsumerCollection<T> collection, int boundedCapacity, int collectionCount)
227         {
228             Debug.Assert(boundedCapacity > 0 || boundedCapacity == NON_BOUNDED);
229
230             m_collection = collection;
231             m_boundedCapacity = boundedCapacity; ;
232             m_isDisposed = false;
233             m_ConsumersCancellationTokenSource = new CancellationTokenSource();
234             m_ProducersCancellationTokenSource = new CancellationTokenSource();
235
236             if (boundedCapacity == NON_BOUNDED)
237             {
238                 m_freeNodes = null;
239             }
240             else
241             {
242                 Debug.Assert(boundedCapacity > 0);
243                 m_freeNodes = new SemaphoreSlim(boundedCapacity - collectionCount);
244             }
245
246
247             m_occupiedNodes = new SemaphoreSlim(collectionCount);
248         }
249
250
251         /// <summary>
252         /// Adds the item to the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.
253         /// </summary>
254         /// <param name="item">The item to be added to the collection. The value can be a null reference.</param>
255         /// <exception cref="T:System.InvalidOperationException">The <see
256         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been marked
257         /// as complete with regards to additions.</exception>
258         /// <exception cref="T:System.ObjectDisposedException">The <see
259         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
260         /// <exception cref="T:System.InvalidOperationException">The underlying collection didn't accept the item.</exception>
261         /// <remarks>
262         /// If a bounded capacity was specified when this instance of 
263         /// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> was initialized, 
264         /// a call to Add may block until space is available to store the provided item.
265         /// </remarks>
266         public void Add(T item)
267         {
268 #if DEBUG
269             bool tryAddReturnValue =
270 #endif
271             TryAddWithNoTimeValidation(item, Timeout.Infinite, new CancellationToken());
272 #if DEBUG
273             Debug.Assert(tryAddReturnValue, "TryAdd() was expected to return true.");
274 #endif
275         }
276
277         /// <summary>
278         /// Adds the item to the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.
279         /// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
280         /// canceled.
281         /// </summary>
282         /// <param name="item">The item to be added to the collection. The value can be a null reference.</param>
283         /// <param name="cancellationToken">A cancellation token to observe.</param>
284         /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
285         /// <exception cref="T:System.InvalidOperationException">The <see
286         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been marked
287         /// as complete with regards to additions.</exception>
288         /// <exception cref="T:System.ObjectDisposedException">The <see
289         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
290         /// <exception cref="T:System.InvalidOperationException">The underlying collection didn't accept the item.</exception>
291         /// <remarks>
292         /// If a bounded capacity was specified when this instance of 
293         /// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> was initialized, 
294         /// a call to <see cref="Add(T,System.Threading.CancellationToken)"/> may block until space is available to store the provided item.
295         /// </remarks>
296         public void Add(T item, CancellationToken cancellationToken)
297         {
298 #if DEBUG
299             bool tryAddReturnValue =
300 #endif
301             TryAddWithNoTimeValidation(item, Timeout.Infinite, cancellationToken);
302 #if DEBUG
303             Debug.Assert(tryAddReturnValue, "TryAdd() was expected to return true.");
304 #endif
305         }
306
307         /// <summary>
308         /// Attempts to add the specified item to the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.
309         /// </summary>
310         /// <param name="item">The item to be added to the collection.</param>
311         /// <returns>true if the <paramref name="item"/> could be added; otherwise, false.</returns>
312         /// <exception cref="T:System.InvalidOperationException">The <see
313         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been marked
314         /// as complete with regards to additions.</exception>
315         /// <exception cref="T:System.ObjectDisposedException">The <see
316         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
317         /// <exception cref="T:System.InvalidOperationException">The underlying collection didn't accept the item.</exception>
318         public bool TryAdd(T item)
319         {
320             return TryAddWithNoTimeValidation(item, 0, new CancellationToken());
321         }
322
323         /// <summary>
324         /// Attempts to add the specified item to the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.
325         /// </summary>
326         /// <param name="item">The item to be added to the collection.</param>
327         /// <param name="timeout">A <see cref="System.TimeSpan"/> that represents the number of milliseconds
328         /// to wait, or a <see cref="System.TimeSpan"/> that represents -1 milliseconds to wait indefinitely.
329         /// </param>
330         /// <returns>true if the <paramref name="item"/> could be added to the collection within 
331         /// the alloted time; otherwise, false.</returns>
332         /// <exception cref="T:System.InvalidOperationException">The <see
333         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been marked
334         /// as complete with regards to additions.</exception>
335         /// <exception cref="T:System.ObjectDisposedException">The <see
336         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
337         /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout"/> is a negative number
338         /// other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than
339         /// <see cref="System.Int32.MaxValue"/>.</exception>
340         /// <exception cref="T:System.InvalidOperationException">The underlying collection didn't accept the item.</exception>
341         public bool TryAdd(T item, TimeSpan timeout)
342         {
343             ValidateTimeout(timeout);
344             return TryAddWithNoTimeValidation(item, (int)timeout.TotalMilliseconds, new CancellationToken());
345         }
346
347         /// <summary>
348         /// Attempts to add the specified item to the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.
349         /// </summary>
350         /// <param name="item">The item to be added to the collection.</param>
351         /// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see
352         /// cref="System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
353         /// <returns>true if the <paramref name="item"/> could be added to the collection within 
354         /// the alloted time; otherwise, false.</returns>
355         /// <exception cref="T:System.InvalidOperationException">The <see
356         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been marked
357         /// as complete with regards to additions.</exception>
358         /// <exception cref="T:System.ObjectDisposedException">The <see
359         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
360         /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is a
361         /// negative number other than -1, which represents an infinite time-out.</exception>
362         /// <exception cref="T:System.InvalidOperationException">The underlying collection didn't accept the item.</exception>
363         public bool TryAdd(T item, int millisecondsTimeout)
364         {
365             ValidateMillisecondsTimeout(millisecondsTimeout);
366             return TryAddWithNoTimeValidation(item, millisecondsTimeout, new CancellationToken());
367         }
368
369         /// <summary>
370         /// Attempts to add the specified item to the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.
371         /// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
372         /// canceled.
373         /// </summary>
374         /// <param name="item">The item to be added to the collection.</param>
375         /// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see
376         /// cref="System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
377         /// <param name="cancellationToken">A cancellation token to observe.</param>
378         /// <returns>true if the <paramref name="item"/> could be added to the collection within 
379         /// the alloted time; otherwise, false.</returns>
380         /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
381         /// <exception cref="T:System.InvalidOperationException">The <see
382         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been marked
383         /// as complete with regards to additions.</exception>
384         /// <exception cref="T:System.ObjectDisposedException">The <see
385         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
386         /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is a
387         /// negative number other than -1, which represents an infinite time-out.</exception>
388         /// <exception cref="T:System.InvalidOperationException">The underlying collection didn't accept the item.</exception>
389         public bool TryAdd(T item, int millisecondsTimeout, CancellationToken cancellationToken)
390         {
391             ValidateMillisecondsTimeout(millisecondsTimeout);
392             return TryAddWithNoTimeValidation(item, millisecondsTimeout, cancellationToken);
393         }
394
395         /// <summary>Adds an item into the underlying data store using its IProducerConsumerCollection&lt;T&gt;.Add 
396         /// method. If a bounded capacity was specified and the collection was full, 
397         /// this method will wait for, at most, the timeout period trying to add the item. 
398         /// If the timeout period was exhaused before successfully adding the item this method will 
399         /// return false.</summary>
400         /// <param name="item">The item to be added to the collection.</param>
401         /// <param name="millisecondsTimeout">The number of milliseconds to wait for the collection to accept the item,
402         /// or Timeout.Infinite to wait indefinitely.</param>
403         /// <param name="cancellationToken">A cancellation token to observe.</param>
404         /// <returns>False if the collection remained full till the timeout period was exhausted.True otherwise.</returns>
405         /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
406         /// <exception cref="System.InvalidOperationException">the collection has already been marked
407         /// as complete with regards to additions.</exception>
408         /// <exception cref="System.ObjectDisposedException">If the collection has been disposed.</exception>
409         /// <exception cref="T:System.InvalidOperationException">The underlying collection didn't accept the item.</exception>
410         private bool TryAddWithNoTimeValidation(T item, int millisecondsTimeout, CancellationToken cancellationToken)
411         {
412             CheckDisposed();
413
414             if (cancellationToken.IsCancellationRequested)
415                 throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), cancellationToken);
416
417             if (IsAddingCompleted)
418             {
419                 throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_Completed));
420             }
421
422             bool waitForSemaphoreWasSuccessful = true;
423
424             if (m_freeNodes != null)
425             {
426                 //If the m_freeNodes semaphore threw OperationCanceledException then this means that CompleteAdding()
427                 //was called concurrently with Adding which is not supported by BlockingCollection.
428                 CancellationTokenSource linkedTokenSource = null;
429                 try
430                 {
431                     waitForSemaphoreWasSuccessful = m_freeNodes.Wait(0);
432                     if (waitForSemaphoreWasSuccessful == false && millisecondsTimeout != 0)
433                     {
434                         linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(
435                             cancellationToken, m_ProducersCancellationTokenSource.Token);
436                         waitForSemaphoreWasSuccessful = m_freeNodes.Wait(millisecondsTimeout, linkedTokenSource.Token);
437                     }
438                 }
439                 catch (OperationCanceledException)
440                 {
441                     //if cancellation was via external token, throw an OCE
442                     if (cancellationToken.IsCancellationRequested)
443                         throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), cancellationToken);
444
445                     //if cancellation was via internal token, this indicates invalid use, hence InvalidOpEx.
446                     //Contract.Assert(m_ProducersCancellationTokenSource.Token.IsCancellationRequested);
447
448                     throw new InvalidOperationException
449                         (SR.GetString(SR.BlockingCollection_Add_ConcurrentCompleteAdd));
450                 }
451                 finally
452                 {
453                     if (linkedTokenSource != null)
454                     {
455                         linkedTokenSource.Dispose();
456                     }
457                 }
458             }
459             if (waitForSemaphoreWasSuccessful)
460             {
461                 // Update the adders count if the complete adding was not requested, otherwise
462                 // spins until all adders finish then throw IOE
463                 // The idea behind to spin untill all adders finish, is to avoid to return to the caller with IOE while there are still some adders have
464                 // not been finished yet
465                 SpinWait spinner = new SpinWait();
466                 while (true)
467                 {
468                     int observedAdders = m_currentAdders;
469                     if ((observedAdders & COMPLETE_ADDING_ON_MASK) != 0)
470                     {
471                         spinner.Reset();
472                         // CompleteAdding is requested, spin then throw
473                         while (m_currentAdders != COMPLETE_ADDING_ON_MASK) spinner.SpinOnce();
474                         throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_Completed));
475                     }
476                     if (Interlocked.CompareExchange(ref m_currentAdders, observedAdders + 1, observedAdders) == observedAdders)
477                     {
478                         Debug.Assert((observedAdders + 1) <= (~COMPLETE_ADDING_ON_MASK), "The number of concurrent adders thread excceeded the maximum limit.");
479                         break;
480                     }
481                     spinner.SpinOnce();
482                 }
483
484                 // This outer try/finally to workaround of repeating the decrement adders code 3 times, because we should decrement the adders if:
485                 // 1- m_collection.TryAdd threw an exception
486                 // 2- m_collection.TryAdd succeeded
487                 // 3- m_collection.TryAdd returned false
488                 // so we put the decrement code in the finally block
489                 try
490                 {
491
492                     //TryAdd is guaranteed to find a place to add the element. Its return value depends
493                     //on the semantics of the underlying store. Some underlying stores will not add an already
494                     //existing item and thus TryAdd returns false indicating that the size of the underlying
495                     //store did not increase.
496
497
498                     bool addingSucceeded = false;
499                     try
500                     {
501                         //The token may have been canceled before the collection had space available, so we need a check after the wait has completed.
502                         //This fixes bug #702328, case 2 of 2.
503                         cancellationToken.ThrowIfCancellationRequested();
504                         addingSucceeded = m_collection.TryAdd(item);
505                     }
506                     catch
507                     {
508                         //TryAdd did not result in increasing the size of the underlying store and hence we need
509                         //to increment back the count of the m_freeNodes semaphore.
510                         if (m_freeNodes != null)
511                         {
512                             m_freeNodes.Release();
513                         }
514                         throw;
515                     }
516                     if (addingSucceeded)
517                     {
518                         //After adding an element to the underlying storage, signal to the consumers 
519                         //waiting on m_occupiedNodes that there is a new item added ready to be consumed.
520                         m_occupiedNodes.Release();
521                     }
522                     else
523                     {
524                         throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_Add_Failed));
525                     }
526                 }
527                 finally
528                 {
529                     // decrement the adders count
530                     Debug.Assert((m_currentAdders & ~COMPLETE_ADDING_ON_MASK) > 0);
531                     Interlocked.Decrement(ref m_currentAdders);
532                 }
533
534
535             }
536             return waitForSemaphoreWasSuccessful;
537         }
538
539         /// <summary>Takes an item from the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.</summary>
540         /// <returns>The item removed from the collection.</returns>
541         /// <exception cref="T:System.OperationCanceledException">The <see
542         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> is empty and has been marked
543         /// as complete with regards to additions.</exception>
544         /// <exception cref="T:System.ObjectDisposedException">The <see
545         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
546         /// <exception cref="T:System.InvalidOperationException">The underlying collection was modified
547         /// outside of this <see
548         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
549         /// <remarks>A call to <see cref="Take()"/> may block until an item is available to be removed.</remarks>
550         public T Take()
551         {
552             T item;
553
554             if (!TryTake(out item, Timeout.Infinite, CancellationToken.None))
555             {
556                 throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_CantTakeWhenDone));
557             }
558
559             return item;
560         }
561
562         /// <summary>Takes an item from the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.</summary>
563         /// <returns>The item removed from the collection.</returns>
564         /// <exception cref="T:System.OperationCanceledException">If the <see cref="CancellationToken"/> is
565         /// canceled or the <see
566         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> is empty and has been marked
567         /// as complete with regards to additions.</exception>
568         /// <exception cref="T:System.ObjectDisposedException">The <see
569         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
570         /// <exception cref="T:System.InvalidOperationException">The underlying collection was modified
571         /// outside of this <see
572         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
573         /// <remarks>A call to <see cref="Take(CancellationToken)"/> may block until an item is available to be removed.</remarks>
574         public T Take(CancellationToken cancellationToken)
575         {
576             T item;
577
578             if (!TryTake(out item, Timeout.Infinite, cancellationToken))
579             {
580                 throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_CantTakeWhenDone));
581             }
582
583             return item;
584         }
585
586         /// <summary>
587         /// Attempts to remove an item from the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.
588         /// </summary>
589         /// <param name="item">The item removed from the collection.</param>
590         /// <returns>true if an item could be removed; otherwise, false.</returns>
591         /// <exception cref="T:System.ObjectDisposedException">The <see
592         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
593         /// <exception cref="T:System.InvalidOperationException">The underlying collection was modified
594         /// outside of this <see
595         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
596         public bool TryTake(out T item)
597         {
598             return TryTake(out item, 0, CancellationToken.None);
599         }
600
601         /// <summary>
602         /// Attempts to remove an item from the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.
603         /// </summary>
604         /// <param name="item">The item removed from the collection.</param>
605         /// <param name="timeout">A <see cref="System.TimeSpan"/> that represents the number of milliseconds
606         /// to wait, or a <see cref="System.TimeSpan"/> that represents -1 milliseconds to wait indefinitely.
607         /// </param>
608         /// <returns>true if an item could be removed from the collection within 
609         /// the alloted time; otherwise, false.</returns>
610         /// <exception cref="T:System.ObjectDisposedException">The <see
611         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
612         /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout"/> is a negative number
613         /// other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than
614         /// <see cref="System.Int32.MaxValue"/>.</exception>
615         /// <exception cref="T:System.InvalidOperationException">The underlying collection was modified
616         /// outside of this <see
617         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
618         public bool TryTake(out T item, TimeSpan timeout)
619         {
620             ValidateTimeout(timeout);
621             return TryTakeWithNoTimeValidation(out item, (int)timeout.TotalMilliseconds, CancellationToken.None, null);
622         }
623
624         /// <summary>
625         /// Attempts to remove an item from the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.
626         /// </summary>
627         /// <param name="item">The item removed from the collection.</param>
628         /// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see
629         /// cref="System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
630         /// <returns>true if an item could be removed from the collection within 
631         /// the alloted time; otherwise, false.</returns>
632         /// <exception cref="T:System.ObjectDisposedException">The <see
633         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
634         /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is a
635         /// negative number other than -1, which represents an infinite time-out.</exception>
636         /// <exception cref="T:System.InvalidOperationException">The underlying collection was modified
637         /// outside of this <see
638         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
639         public bool TryTake(out T item, int millisecondsTimeout)
640         {
641             ValidateMillisecondsTimeout(millisecondsTimeout);
642             return TryTakeWithNoTimeValidation(out item, millisecondsTimeout, CancellationToken.None, null);
643         }
644
645         /// <summary>
646         /// Attempts to remove an item from the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.
647         /// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
648         /// canceled.
649         /// </summary>
650         /// <param name="item">The item removed from the collection.</param>
651         /// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see
652         /// cref="System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
653         /// <param name="cancellationToken">A cancellation token to observe.</param>
654         /// <returns>true if an item could be removed from the collection within 
655         /// the alloted time; otherwise, false.</returns>
656         /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
657         /// <exception cref="T:System.ObjectDisposedException">The <see
658         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
659         /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is a
660         /// negative number other than -1, which represents an infinite time-out.</exception>
661         /// <exception cref="T:System.InvalidOperationException">The underlying collection was modified
662         /// outside of this <see
663         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
664         public bool TryTake(out T item, int millisecondsTimeout, CancellationToken cancellationToken)
665         {
666             ValidateMillisecondsTimeout(millisecondsTimeout);
667             return TryTakeWithNoTimeValidation(out item, millisecondsTimeout, cancellationToken, null);
668         }
669
670         /// <summary>Takes an item from the underlying data store using its IProducerConsumerCollection&lt;T&gt;.Take 
671         /// method. If the collection was empty, this method will wait for, at most, the timeout period (if AddingIsCompleted is false)
672         /// trying to remove an item. If the timeout period was exhaused before successfully removing an item 
673         /// this method will return false.
674         /// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
675         /// canceled.
676         /// </summary>
677         /// <param name="item">The item removed from the collection.</param>
678         /// <param name="millisecondsTimeout">The number of milliseconds to wait for the collection to have an item available 
679         /// for removal, or Timeout.Infinite to wait indefinitely.</param>
680         /// <param name="cancellationToken">A cancellation token to observe.</param>
681         /// <param name="combinedTokenSource">A combined cancellation token if created, it is only created by GetConsumingEnumerable to avoid creating the linked token 
682         /// multiple times.</param>
683         /// <returns>False if the collection remained empty till the timeout period was exhausted. True otherwise.</returns>
684         /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
685         /// <exception cref="System.ObjectDisposedException">If the collection has been disposed.</exception>
686         private bool TryTakeWithNoTimeValidation(out T item, int millisecondsTimeout, CancellationToken cancellationToken, CancellationTokenSource combinedTokenSource)
687         {
688             CheckDisposed();
689             item = default(T);
690
691             if (cancellationToken.IsCancellationRequested)
692                 throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), cancellationToken);
693
694             //If the collection is completed then there is no need to wait.
695             if (IsCompleted)
696             {
697                 return false;
698             }
699             bool waitForSemaphoreWasSuccessful = false;
700
701             // set the combined token source to the combinedToken paramater if it is not null (came from GetConsumingEnumerable)
702             CancellationTokenSource linkedTokenSource = combinedTokenSource;
703             try
704             {
705                 waitForSemaphoreWasSuccessful = m_occupiedNodes.Wait(0);
706                 if (waitForSemaphoreWasSuccessful == false && millisecondsTimeout != 0)
707                 {
708                     // create the linked token if it is not created yet
709                     if (combinedTokenSource == null)
710                         linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken,
711                                                                                           m_ConsumersCancellationTokenSource.Token);
712                     waitForSemaphoreWasSuccessful = m_occupiedNodes.Wait(millisecondsTimeout, linkedTokenSource.Token);
713                 }
714             }
715             //The collection became completed while waiting on the semaphore.
716             catch (OperationCanceledException)
717             {
718                 if (cancellationToken.IsCancellationRequested)
719                     throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), cancellationToken);
720
721                 return false;
722             }
723             finally
724             {
725                 // only dispose the combined token source if we created it here, otherwise the caller (GetConsumingEnumerable) is responsible for disposing it
726                 if (linkedTokenSource != null && combinedTokenSource == null)
727                 {
728                     linkedTokenSource.Dispose();
729                 }
730             }
731
732             if (waitForSemaphoreWasSuccessful)
733             {
734                 bool removeSucceeded = false;
735                 bool removeFaulted = true;
736                 try
737                 {
738                     //The token may have been canceled before an item arrived, so we need a check after the wait has completed.
739                     //This fixes bug #702328, case 1 of 2.
740                     cancellationToken.ThrowIfCancellationRequested();
741
742                     //If an item was successfully removed from the underlying collection.
743                     removeSucceeded = m_collection.TryTake(out item);
744                     removeFaulted = false;
745                     if (!removeSucceeded)
746                     {
747                         // Check if the collection is empty which means that the collection was modified outside BlockingCollection
748                         throw new InvalidOperationException
749                             (SR.GetString(SR.BlockingCollection_Take_CollectionModified));
750                     }
751                 }
752                 finally
753                 {
754                     // removeFaulted implies !removeSucceeded, but the reverse is not true.
755                     if (removeSucceeded)
756                     {
757                         if (m_freeNodes != null)
758                         {
759                             Debug.Assert(m_boundedCapacity != NON_BOUNDED);
760                             m_freeNodes.Release();
761                         }
762                     }
763                     else if (removeFaulted)
764                     {
765                         m_occupiedNodes.Release();
766                     }
767                     //Last remover will detect that it has actually removed the last item from the 
768                     //collection and that CompleteAdding() was called previously. Thus, it will cancel the semaphores
769                     //so that any thread waiting on them wakes up. Note several threads may call CancelWaitingConsumers
770                     //but this is not a problem.
771                     if (IsCompleted)
772                     {
773                         CancelWaitingConsumers();
774                     }
775                 }
776             }
777             return waitForSemaphoreWasSuccessful;
778         }
779
780
781
782         /// <summary>
783         /// Adds the specified item to any one of the specified
784         /// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
785         /// </summary>
786         /// <param name="collections">The array of collections.</param>
787         /// <param name="item">The item to be added to one of the collections.</param>
788         /// <returns>The index of the collection in the <paramref name="collections"/> array to which the item was added.</returns>
789         /// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
790         /// null.</exception>
791         /// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
792         /// a 0-length array or contains a null element, or at least one of collections has been
793         /// marked as complete for adding.</exception>
794         /// <exception cref="T:System.ObjectDisposedException">At least one of the <see
795         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
796         /// <exception cref="T:System.InvalidOperationException">At least one underlying collection didn't accept the item.</exception>
797         /// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
798         /// 62 for STA and 63 for MTA.</exception>
799         /// <remarks>
800         /// If a bounded capacity was specified when all of the
801         /// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances were initialized, 
802         /// a call to AddToAny may block until space is available in one of the collections
803         /// to store the provided item.
804         /// </remarks>
805         public static int AddToAny(BlockingCollection<T>[] collections, T item)
806         {
807 #if DEBUG
808             int tryAddAnyReturnValue =
809 #else
810             return
811 #endif
812  TryAddToAny(collections, item, Timeout.Infinite, CancellationToken.None);
813 #if DEBUG
814             Debug.Assert((tryAddAnyReturnValue >= 0 && tryAddAnyReturnValue < collections.Length)
815                                 , "TryAddToAny() was expected to return an index within the bounds of the collections array.");
816             return tryAddAnyReturnValue;
817 #endif
818         }
819
820         /// <summary>
821         /// Adds the specified item to any one of the specified
822         /// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
823         /// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
824         /// canceled. 
825         /// </summary>
826         /// <param name="collections">The array of collections.</param>
827         /// <param name="item">The item to be added to one of the collections.</param>
828         /// <param name="cancellationToken">A cancellation token to observe.</param>
829         /// <returns>The index of the collection in the <paramref name="collections"/> array to which the item was added.</returns>
830         /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
831         /// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
832         /// null.</exception>
833         /// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
834         /// a 0-length array or contains a null element, or at least one of collections has been
835         /// marked as complete for adding.</exception>
836         /// <exception cref="T:System.ObjectDisposedException">At least one of the <see
837         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
838         /// <exception cref="T:System.InvalidOperationException">At least one underlying collection didn't accept the item.</exception>
839         /// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
840         /// 62 for STA and 63 for MTA.</exception>
841         /// <remarks>
842         /// If a bounded capacity was specified when all of the
843         /// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances were initialized, 
844         /// a call to AddToAny may block until space is available in one of the collections
845         /// to store the provided item.
846         /// </remarks>
847         public static int AddToAny(BlockingCollection<T>[] collections, T item, CancellationToken cancellationToken)
848         {
849 #if DEBUG
850             int tryAddAnyReturnValue =
851 #else
852             return
853 #endif
854  TryAddToAny(collections, item, Timeout.Infinite, cancellationToken);
855 #if DEBUG
856             Debug.Assert((tryAddAnyReturnValue >= 0 && tryAddAnyReturnValue < collections.Length)
857                                 , "TryAddToAny() was expected to return an index within the bounds of the collections array.");
858             return tryAddAnyReturnValue;
859 #endif
860         }
861
862         /// <summary>
863         /// Attempts to add the specified item to any one of the specified
864         /// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
865         /// </summary>
866         /// <param name="collections">The array of collections.</param>
867         /// <param name="item">The item to be added to one of the collections.</param>
868         /// <returns>The index of the collection in the <paramref name="collections"/> 
869         /// array to which the item was added, or -1 if the item could not be added.</returns>
870         /// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
871         /// null.</exception>
872         /// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
873         /// a 0-length array or contains a null element, or at least one of collections has been
874         /// marked as complete for adding.</exception>
875         /// <exception cref="T:System.ObjectDisposedException">At least one of the <see
876         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
877         /// <exception cref="T:System.InvalidOperationException">At least one underlying collection didn't accept the item.</exception>
878         /// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
879         /// 62 for STA and 63 for MTA.</exception>
880         public static int TryAddToAny(BlockingCollection<T>[] collections, T item)
881         {
882             return TryAddToAny(collections, item, 0, CancellationToken.None);
883         }
884
885         /// <summary>
886         /// Attempts to add the specified item to any one of the specified
887         /// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
888         /// </summary>
889         /// <param name="collections">The array of collections.</param>
890         /// <param name="item">The item to be added to one of the collections.</param>
891         /// <param name="timeout">A <see cref="System.TimeSpan"/> that represents the number of milliseconds
892         /// to wait, or a <see cref="System.TimeSpan"/> that represents -1 milliseconds to wait indefinitely.
893         /// </param>
894         /// <returns>The index of the collection in the <paramref name="collections"/> 
895         /// array to which the item was added, or -1 if the item could not be added.</returns>
896         /// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
897         /// null.</exception>
898         /// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
899         /// a 0-length array or contains a null element, or at least one of collections has been
900         /// marked as complete for adding.</exception>
901         /// <exception cref="T:System.ObjectDisposedException">At least one of the <see
902         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
903         /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout"/> is a negative number
904         /// other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than
905         /// <see cref="System.Int32.MaxValue"/>.</exception>
906         /// <exception cref="T:System.InvalidOperationException">At least one underlying collection didn't accept the item.</exception>
907         /// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
908         /// 62 for STA and 63 for MTA.</exception>
909         public static int TryAddToAny(BlockingCollection<T>[] collections, T item, TimeSpan timeout)
910         {
911             ValidateTimeout(timeout);
912             return TryAddToAnyCore(collections, item, (int)timeout.TotalMilliseconds, CancellationToken.None);
913         }
914
915         /// <summary>
916         /// Attempts to add the specified item to any one of the specified
917         /// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
918         /// </summary>
919         /// <param name="collections">The array of collections.</param>
920         /// <param name="item">The item to be added to one of the collections.</param>
921         /// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see
922         /// cref="System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>        /// <returns>The index of the collection in the <paramref name="collections"/> 
923         /// array to which the item was added, or -1 if the item could not be added.</returns>
924         /// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
925         /// null.</exception>
926         /// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
927         /// a 0-length array or contains a null element, or at least one of collections has been
928         /// marked as complete for adding.</exception>
929         /// <exception cref="T:System.ObjectDisposedException">At least one of the <see
930         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
931         /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is a
932         /// negative number other than -1, which represents an infinite time-out.</exception>
933         /// <exception cref="T:System.InvalidOperationException">At least one underlying collection didn't accept the item.</exception>
934         /// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
935         /// 62 for STA and 63 for MTA.</exception>
936         public static int TryAddToAny(BlockingCollection<T>[] collections, T item, int millisecondsTimeout)
937         {
938             ValidateMillisecondsTimeout(millisecondsTimeout);
939             return TryAddToAnyCore(collections, item, millisecondsTimeout, CancellationToken.None);
940         }
941
942         /// <summary>
943         /// Attempts to add the specified item to any one of the specified
944         /// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
945         /// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
946         /// canceled.
947         /// </summary>
948         /// <param name="collections">The array of collections.</param>
949         /// <param name="item">The item to be added to one of the collections.</param>
950         /// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see
951         /// cref="System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>        
952         /// <returns>The index of the collection in the <paramref name="collections"/> 
953         /// array to which the item was added, or -1 if the item could not be added.</returns>
954         /// <param name="cancellationToken">A cancellation token to observe.</param>
955         /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
956         /// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
957         /// null.</exception>
958         /// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
959         /// a 0-length array or contains a null element, or at least one of collections has been
960         /// marked as complete for adding.</exception>
961         /// <exception cref="T:System.ObjectDisposedException">At least one of the <see
962         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
963         /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is a
964         /// negative number other than -1, which represents an infinite time-out.</exception>
965         /// <exception cref="T:System.InvalidOperationException">At least one underlying collection didn't accept the item.</exception>
966         /// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
967         /// 62 for STA and 63 for MTA.</exception>
968         public static int TryAddToAny(BlockingCollection<T>[] collections, T item, int millisecondsTimeout, CancellationToken cancellationToken)
969         {
970             ValidateMillisecondsTimeout(millisecondsTimeout);
971             return TryAddToAnyCore(collections, item, millisecondsTimeout, cancellationToken);
972         }
973
974         /// <summary>Adds an item to anyone of the specified collections.
975         /// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
976         /// canceled. 
977         /// </summary>
978         /// <param name="collections">The collections into which the item can be added.</param>
979         /// <param name="item">The item to be added .</param>
980         /// <param name="millisecondsTimeout">The number of milliseconds to wait for a collection to accept the 
981         /// operation, or -1 to wait indefinitely.</param>
982         /// <param name="externalCancellationToken">A cancellation token to observe.</param>
983         /// <returns>The index into collections for the collection which accepted the 
984         /// adding of the item; -1 if the item could not be added.</returns>
985         /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
986         /// <exception cref="System.ArgumentNullException">If the collections argument is null.</exception>
987         /// <exception cref="System.ArgumentException">If the collections argument is a 0-length array or contains a 
988         /// null element. Also, if atleast one of the collections has been marked complete for adds.</exception>
989         /// <exception cref="System.ObjectDisposedException">If atleast one of the collections has been disposed.</exception>
990         private static int TryAddToAnyCore(BlockingCollection<T>[] collections, T item, int millisecondsTimeout, CancellationToken externalCancellationToken)
991         {
992             ValidateCollectionsArray(collections, true);
993             const int OPERATION_FAILED = -1;
994
995             // Copy the wait time to another local variable to update it
996             int timeout = millisecondsTimeout;
997
998             uint startTime = 0;
999             if (millisecondsTimeout != Timeout.Infinite)
1000             {
1001                 startTime = (uint)Environment.TickCount;
1002             }
1003
1004             // Fast path for adding if there is at least one unbounded collection
1005             int index = TryAddToAnyFast(collections, item);
1006             if (index > -1)
1007                 return index;
1008
1009
1010             // Get wait handles and the tokens for all collections,
1011             // and construct a single combined token from all the tokens,
1012             // add the combined token handle to the handles list
1013             // call WaitAny for all handles
1014             // After WaitAny returns check if the token is cancelled and that caused the WaitAny to return or not
1015             // If the combined token is cancelled, this mean either the external token is cancelled then throw OCE
1016             // or one if the collection is AddingCompleted then throw AE
1017             CancellationToken[] collatedCancellationTokens;
1018             List<WaitHandle> handles = GetHandles(collections, externalCancellationToken, true, out collatedCancellationTokens);
1019
1020             //Loop until one of these conditions is met:
1021             // 1- The operation is succeeded
1022             // 2- The timeout expired for try* versions
1023             // 3- The external token is cancelled, throw
1024             // 4- There is at least one collection marked as adding completed then throw
1025             while (millisecondsTimeout == Timeout.Infinite || timeout >= 0)
1026             {
1027                 index = -1;
1028               
1029                 using (CancellationTokenSource linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(collatedCancellationTokens))
1030                 {
1031                     handles.Add(linkedTokenSource.Token.WaitHandle); // add the combined token to the handles list
1032
1033                     //Wait for any collection to become available.
1034                     index = WaitHandle.WaitAny(handles.ToArray(), timeout, false);
1035
1036                     handles.RemoveAt(handles.Count - 1); //remove the linked token
1037
1038                     if (linkedTokenSource.IsCancellationRequested)
1039                     {
1040                         if (externalCancellationToken.IsCancellationRequested) //case#3
1041                             throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), externalCancellationToken);
1042                         else //case#4
1043                             throw new ArgumentException(SR.GetString(SR.BlockingCollection_CantAddAnyWhenCompleted), "collections");
1044                     }
1045                 }
1046
1047                 Debug.Assert((index == WaitHandle.WaitTimeout) || (index >= 0 && index < handles.Count));
1048
1049                 if (index == WaitHandle.WaitTimeout) //case#2
1050                     return OPERATION_FAILED;
1051
1052                 //If the timeout period was not exhausted and the appropriate operation succeeded.
1053                 if (collections[index].TryAdd(item)) //case#1
1054                     return index;
1055
1056                 // Update the timeout
1057                 if (millisecondsTimeout != Timeout.Infinite)
1058                     timeout = UpdateTimeOut(startTime, millisecondsTimeout);
1059             }
1060
1061             // case #2
1062             return OPERATION_FAILED;
1063         }
1064
1065         /// <summary>
1066         /// Fast path for TryAddToAny to find a non bounded collection and add the items in it
1067         /// </summary>
1068         /// <param name="collections">The collections list</param>
1069         /// <param name="item">The item to be added</param>
1070         /// <returns>The index which the item has been added, -1 if failed</returns>
1071         private static int TryAddToAnyFast(BlockingCollection<T>[] collections, T item)
1072         {
1073             for (int i = 0; i < collections.Length; i++)
1074             {
1075                 if (collections[i].m_freeNodes == null)
1076                 {
1077 #if DEBUG
1078                         bool result = 
1079 #endif
1080                     collections[i].TryAdd(item);
1081 #if DEBUG
1082                         Debug.Assert(result);
1083 #endif
1084                     return i;
1085                 }
1086             }
1087             return -1;
1088         }
1089         /// <summary>
1090         /// Local static method, used by TryAddTakeAny to get the wait handles for the collection, with exclude option to exclude the Compeleted collections
1091         /// </summary>
1092         /// <param name="collections">The blocking collections</param>
1093         /// <param name="externalCancellationToken">The original CancellationToken</param>
1094         /// <param name="isAddOperation">True if Add or TryAdd, false if Take or TryTake</param>
1095         /// <param name="cancellationTokens">Complete list of cancellationTokens to observe</param>
1096         /// <returns>The collections wait handles</returns>
1097         private static List<WaitHandle> GetHandles(BlockingCollection<T>[] collections, CancellationToken externalCancellationToken, bool isAddOperation, out CancellationToken[] cancellationTokens)
1098         {
1099
1100             Debug.Assert(collections != null);
1101             List<WaitHandle> handlesList = new List<WaitHandle>(collections.Length + 1); // + 1 for the external token handle to be added
1102             List<CancellationToken> tokensList = new List<CancellationToken>(collections.Length + 1); // + 1 for the external token
1103             tokensList.Add(externalCancellationToken);
1104
1105             //Read the appropriate WaitHandle based on the operation mode.
1106             if (isAddOperation)
1107             {
1108
1109                 for (int i = 0; i < collections.Length; i++)
1110                 {
1111                     if (collections[i].m_freeNodes != null)
1112                     {
1113                         handlesList.Add(collections[i].m_freeNodes.AvailableWaitHandle);
1114                         tokensList.Add(collections[i].m_ProducersCancellationTokenSource.Token);
1115                     }
1116                 }
1117             }
1118             else
1119             {
1120                 for (int i = 0; i < collections.Length; i++)
1121                 {
1122                     if (collections[i].IsCompleted) //exclude Completed collections if it is take operation
1123                         continue;
1124                    
1125                     handlesList.Add(collections[i].m_occupiedNodes.AvailableWaitHandle);
1126                     tokensList.Add(collections[i].m_ConsumersCancellationTokenSource.Token);
1127                 }
1128             }
1129
1130             cancellationTokens = tokensList.ToArray();
1131             return handlesList;
1132         }
1133
1134         /// <summary>
1135         /// Helper function to measure and update the wait time
1136         /// </summary>
1137         /// <param name="startTime"> The first time (in milliseconds) observed when the wait started</param>
1138         /// <param name="originalWaitMillisecondsTimeout">The orginal wait timeoutout in milliseconds</param>
1139         /// <returns>The new wait time in milliseconds, -1 if the time expired</returns>
1140         private static int UpdateTimeOut(uint startTime, int originalWaitMillisecondsTimeout)
1141         {
1142             if (originalWaitMillisecondsTimeout == 0)
1143             {
1144                 return 0;
1145             }
1146             // The function must be called in case the time out is not infinite
1147             Debug.Assert(originalWaitMillisecondsTimeout != Timeout.Infinite);
1148
1149             uint elapsedMilliseconds = (uint)Environment.TickCount - startTime;
1150
1151             // Check the elapsed milliseconds is greater than max int because this property is uint
1152             if (elapsedMilliseconds > int.MaxValue)
1153             {
1154                 return 0;
1155             }
1156
1157             // Subtract the elapsed time from the current wait time
1158             int currentWaitTimeout = originalWaitMillisecondsTimeout - (int)elapsedMilliseconds; ;
1159             if (currentWaitTimeout <= 0)
1160             {
1161                 return 0;
1162             }
1163
1164             return currentWaitTimeout;
1165         }
1166         /// <summary>
1167         /// Takes an item from any one of the specified
1168         /// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
1169         /// </summary>
1170         /// <param name="collections">The array of collections.</param>
1171         /// <param name="item">The item removed from one of the collections.</param>
1172         /// <returns>The index of the collection in the <paramref name="collections"/> array from which 
1173         /// the item was removed, or -1 if an item could not be removed.</returns>
1174         /// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
1175         /// null.</exception>
1176         /// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
1177         /// a 0-length array or contains a null element.</exception>
1178         /// <exception cref="T:System.ObjectDisposedException">At least one of the <see
1179         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
1180         /// <exception cref="T:System.InvalidOperationException">At least one of the underlying collections was modified
1181         /// outside of its <see
1182         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
1183         /// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
1184         /// 62 for STA and 63 for MTA.</exception>
1185         /// <remarks>A call to TakeFromAny may block until an item is available to be removed.</remarks>
1186         public static int TakeFromAny(BlockingCollection<T>[] collections, out T item)
1187         {
1188             return TakeFromAny(collections, out item, CancellationToken.None);
1189         }
1190
1191         /// <summary>
1192         /// Takes an item from any one of the specified
1193         /// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
1194         /// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
1195         /// canceled.
1196         /// </summary>
1197         /// <param name="collections">The array of collections.</param>
1198         /// <param name="item">The item removed from one of the collections.</param>
1199         /// <param name="cancellationToken">A cancellation token to observe.</param>
1200         /// <returns>The index of the collection in the <paramref name="collections"/> array from which 
1201         /// the item was removed, or -1 if an item could not be removed.</returns>
1202         /// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
1203         /// null.</exception>
1204         /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
1205         /// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
1206         /// a 0-length array or contains a null element.</exception>
1207         /// <exception cref="T:System.ObjectDisposedException">At least one of the <see
1208         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
1209         /// <exception cref="T:System.InvalidOperationException">At least one of the underlying collections was modified
1210         /// outside of its <see
1211         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
1212         /// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of 
1213         /// 62 for STA and 63 for MTA.</exception>
1214         /// <remarks>A call to TakeFromAny may block until an item is available to be removed.</remarks>
1215         public static int TakeFromAny(BlockingCollection<T>[] collections, out T item, CancellationToken cancellationToken)
1216         {
1217             int returnValue = TryTakeFromAnyCore(collections, out item, Timeout.Infinite, true, cancellationToken);
1218             Debug.Assert((returnValue >= 0 && returnValue < collections.Length)
1219                                           , "TryTakeFromAny() was expected to return an index within the bounds of the collections array.");
1220             return returnValue;
1221
1222         }
1223
1224         /// <summary>
1225         /// Attempts to remove an item from any one of the specified
1226         /// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
1227         /// </summary>
1228         /// <param name="collections">The array of collections.</param>
1229         /// <param name="item">The item removed from one of the collections.</param>
1230         /// <returns>The index of the collection in the <paramref name="collections"/> array from which 
1231         /// the item was removed, or -1 if an item could not be removed.</returns>
1232         /// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
1233         /// null.</exception>
1234         /// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
1235         /// a 0-length array or contains a null element.</exception>
1236         /// <exception cref="T:System.ObjectDisposedException">At least one of the <see
1237         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
1238         /// <exception cref="T:System.InvalidOperationException">At least one of the underlying collections was modified
1239         /// outside of its <see
1240         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
1241         /// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
1242         /// 62 for STA and 63 for MTA.</exception>
1243         /// <remarks>A call to TryTakeFromAny may block until an item is available to be removed.</remarks>
1244         public static int TryTakeFromAny(BlockingCollection<T>[] collections, out T item)
1245         {
1246             return TryTakeFromAny(collections, out item, 0);
1247         }
1248
1249         /// <summary>
1250         /// Attempts to remove an item from any one of the specified
1251         /// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
1252         /// </summary>
1253         /// <param name="collections">The array of collections.</param>
1254         /// <param name="item">The item removed from one of the collections.</param>
1255         /// <param name="timeout">A <see cref="System.TimeSpan"/> that represents the number of milliseconds
1256         /// to wait, or a <see cref="System.TimeSpan"/> that represents -1 milliseconds to wait indefinitely.
1257         /// </param>
1258         /// <returns>The index of the collection in the <paramref name="collections"/> array from which 
1259         /// the item was removed, or -1 if an item could not be removed.</returns>
1260         /// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
1261         /// null.</exception>
1262         /// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
1263         /// a 0-length array or contains a null element.</exception>
1264         /// <exception cref="T:System.ObjectDisposedException">At least one of the <see
1265         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
1266         /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout"/> is a negative number
1267         /// other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than
1268         /// <see cref="System.Int32.MaxValue"/>.</exception>
1269         /// <exception cref="T:System.InvalidOperationException">At least one of the underlying collections was modified
1270         /// outside of its <see
1271         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
1272         /// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
1273         /// 62 for STA and 63 for MTA.</exception>
1274         /// <remarks>A call to TryTakeFromAny may block until an item is available to be removed.</remarks>
1275         public static int TryTakeFromAny(BlockingCollection<T>[] collections, out T item, TimeSpan timeout)
1276         {
1277             ValidateTimeout(timeout);
1278             return TryTakeFromAnyCore(collections, out item, (int)timeout.TotalMilliseconds, false, CancellationToken.None);
1279         }
1280
1281         /// <summary>
1282         /// Attempts to remove an item from any one of the specified
1283         /// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
1284         /// </summary>
1285         /// <param name="collections">The array of collections.</param>
1286         /// <param name="item">The item removed from one of the collections.</param>
1287         /// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see
1288         /// cref="System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
1289         /// <returns>The index of the collection in the <paramref name="collections"/> array from which 
1290         /// the item was removed, or -1 if an item could not be removed.</returns>
1291         /// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
1292         /// null.</exception>
1293         /// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
1294         /// a 0-length array or contains a null element.</exception>
1295         /// <exception cref="T:System.ObjectDisposedException">At least one of the <see
1296         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
1297         /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is a
1298         /// negative number other than -1, which represents an infinite time-out.</exception>
1299         /// <exception cref="T:System.InvalidOperationException">At least one of the underlying collections was modified
1300         /// outside of its <see
1301         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
1302         /// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
1303         /// 62 for STA and 63 for MTA.</exception>
1304         /// <remarks>A call to TryTakeFromAny may block until an item is available to be removed.</remarks>
1305         public static int TryTakeFromAny(BlockingCollection<T>[] collections, out T item, int millisecondsTimeout)
1306         {
1307             ValidateMillisecondsTimeout(millisecondsTimeout);
1308             return TryTakeFromAnyCore(collections, out item, millisecondsTimeout, false, CancellationToken.None);
1309         }
1310
1311         /// <summary>
1312         /// Attempts to remove an item from any one of the specified
1313         /// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
1314         /// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
1315         /// canceled. 
1316         /// </summary>
1317         /// <param name="collections">The array of collections.</param>
1318         /// <param name="item">The item removed from one of the collections.</param>
1319         /// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see
1320         /// cref="System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
1321         /// <param name="cancellationToken">A cancellation token to observe.</param>
1322         /// <returns>The index of the collection in the <paramref name="collections"/> array from which 
1323         /// the item was removed, or -1 if an item could not be removed.</returns>
1324         /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
1325         /// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
1326         /// null.</exception>
1327         /// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
1328         /// a 0-length array or contains a null element.</exception>
1329         /// <exception cref="T:System.ObjectDisposedException">At least one of the <see
1330         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
1331         /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is a
1332         /// negative number other than -1, which represents an infinite time-out.</exception>
1333         /// <exception cref="T:System.InvalidOperationException">At least one of the underlying collections was modified
1334         /// outside of its <see
1335         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
1336         /// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
1337         /// 62 for STA and 63 for MTA.</exception>
1338         /// <remarks>A call to TryTakeFromAny may block until an item is available to be removed.</remarks>
1339         public static int TryTakeFromAny(BlockingCollection<T>[] collections, out T item, int millisecondsTimeout, CancellationToken cancellationToken)
1340         {
1341             ValidateMillisecondsTimeout(millisecondsTimeout);
1342             return TryTakeFromAnyCore(collections, out item, millisecondsTimeout, false, cancellationToken);
1343         }
1344
1345         /// <summary>Takes an item from anyone of the specified collections.
1346         /// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
1347         /// canceled. 
1348         /// </summary>
1349         /// <param name="collections">The collections from which the item can be removed.</param>
1350         /// <param name="item">The item removed and returned to the caller.</param>
1351         /// <param name="millisecondsTimeout">The number of milliseconds to wait for a collection to accept the 
1352         /// operation, or -1 to wait indefinitely.</param>
1353         /// <param name="isTakeOperation">True if Take, false if TryTake.</param>
1354         /// <param name="externalCancellationToken">A cancellation token to observe.</param>
1355         /// <returns>The index into collections for the collection which accepted the 
1356         /// removal of the item; -1 if the item could not be removed.</returns>
1357         /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
1358         /// <exception cref="System.ArgumentNullException">If the collections argument is null.</exception>
1359         /// <exception cref="System.ArgumentException">If the collections argument is a 0-length array or contains a 
1360         /// null element. Also, if atleast one of the collections has been marked complete for adds.</exception>
1361         /// <exception cref="System.ObjectDisposedException">If atleast one of the collections has been disposed.</exception>
1362         private static int TryTakeFromAnyCore(BlockingCollection<T>[] collections, out T item, int millisecondsTimeout, bool isTakeOperation, CancellationToken externalCancellationToken)
1363         {
1364
1365             ValidateCollectionsArray(collections, false);
1366
1367              //try the fast path first
1368             for (int i = 0; i < collections.Length; i++)
1369             {
1370                 // Check if the collection is not completed, and potentially has at least one element by checking the semaphore count
1371                 if (!collections[i].IsCompleted && collections[i].m_occupiedNodes.CurrentCount > 0 && collections[i].TryTake(out item))
1372                     return i;
1373             }
1374
1375             //Fast path failed, try the slow path
1376             return TryTakeFromAnyCoreSlow(collections, out item, millisecondsTimeout, isTakeOperation, externalCancellationToken);  
1377         }
1378
1379
1380         /// <summary>Takes an item from anyone of the specified collections.
1381         /// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
1382         /// canceled. 
1383         /// </summary>
1384         /// <param name="collections">The collections copy from which the item can be removed.</param>
1385         /// <param name="item">The item removed and returned to the caller.</param>
1386         /// <param name="millisecondsTimeout">The number of milliseconds to wait for a collection to accept the 
1387         /// operation, or -1 to wait indefinitely.</param>
1388         /// <param name="isTakeOperation">True if Take, false if TryTake.</param>
1389         /// <param name="externalCancellationToken">A cancellation token to observe.</param>
1390         /// <returns>The index into collections for the collection which accepted the 
1391         /// removal of the item; -1 if the item could not be removed.</returns>
1392         /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
1393         /// <exception cref="System.ArgumentNullException">If the collections argument is null.</exception>
1394         /// <exception cref="System.ArgumentException">If the collections argument is a 0-length array or contains a 
1395         /// null element. Also, if atleast one of the collections has been marked complete for adds.</exception>
1396         /// <exception cref="System.ObjectDisposedException">If atleast one of the collections has been disposed.</exception>
1397         private static int TryTakeFromAnyCoreSlow(BlockingCollection<T>[] collections, out T item, int millisecondsTimeout, bool isTakeOperation, CancellationToken externalCancellationToken)
1398         {
1399
1400             const int OPERATION_FAILED = -1;
1401
1402             // Copy the wait time to another local variable to update it
1403             int timeout = millisecondsTimeout;
1404
1405             uint startTime = 0;
1406             if (millisecondsTimeout != Timeout.Infinite)
1407             {
1408                 startTime = (uint)Environment.TickCount;
1409             }
1410
1411
1412             //Loop until one of these conditions is met:
1413             // 1- The operation is succeeded
1414             // 2- The timeout expired for try* versions
1415             // 3- The external token is cancelled, throw
1416             // 4- The operation is TryTake and all collections are marked as completed, return false
1417             // 5- The operation is Take and all collection are marked as completed, throw
1418             while (millisecondsTimeout == Timeout.Infinite || timeout >= 0)
1419             {
1420                 // Get wait handles and the tokens for all collections,
1421                 // and construct a single combined token from all the tokens,
1422                 // add the combined token handle to the handles list
1423                 // call WaitAny for all handles
1424                 // After WaitAny returns check if the token is cancelled and that caused the WaitAny to return or not
1425                 // If the combined token is cancelled, this mean either the external token is cancelled then throw OCE
1426                 // or one if the collection is Completed then exclude it and retry
1427                 CancellationToken[] collatedCancellationTokens;
1428                 List<WaitHandle> handles = GetHandles(collections, externalCancellationToken, false, out collatedCancellationTokens);
1429
1430                 if (handles.Count == 0 && isTakeOperation) //case#5
1431                     throw new ArgumentException(SR.GetString(SR.BlockingCollection_CantTakeAnyWhenAllDone), "collections");
1432
1433                 else if (handles.Count == 0) //case#4
1434                     break;
1435
1436
1437                 //Wait for any collection to become available.
1438                 using (CancellationTokenSource linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(collatedCancellationTokens))
1439                 {
1440                     handles.Add(linkedTokenSource.Token.WaitHandle); // add the combined token to the handles list
1441                     int index = WaitHandle.WaitAny(handles.ToArray(), timeout, false);
1442
1443                     if (linkedTokenSource.IsCancellationRequested && externalCancellationToken.IsCancellationRequested)//case#3
1444                         throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), externalCancellationToken);
1445
1446
1447                     else if (!linkedTokenSource.IsCancellationRequested)// if no eiter internal or external cancellation trquested
1448                     {
1449                         Debug.Assert((index == WaitHandle.WaitTimeout) || (index >= 0 && index < handles.Count));
1450                         if (index == WaitHandle.WaitTimeout) //case#2
1451                             break;
1452
1453                         // adjust the index in case one or more handles removed because they are completed
1454                         if (collections.Length != handles.Count - 1) // -1 because of the combined token handle
1455                         {
1456                             for (int i = 0; i < collections.Length; i++)
1457                             {
1458                                 if (collections[i].m_occupiedNodes.AvailableWaitHandle == handles[index])
1459                                 {
1460                                     index = i;
1461                                     break;
1462                                 }
1463                             }
1464                         }
1465
1466                         if (collections[index].TryTake(out item)) //case#1
1467                             return index;
1468                     }
1469                 }
1470
1471                 // Update the timeout
1472                 if (millisecondsTimeout != Timeout.Infinite)
1473                     timeout = UpdateTimeOut(startTime, millisecondsTimeout);
1474             }
1475
1476             item = default(T); //case#2
1477             return OPERATION_FAILED;
1478         }
1479
1480         /// <summary>
1481         /// Marks the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances
1482         /// as not accepting any more additions.  
1483         /// </summary>
1484         /// <remarks>
1485         /// After a collection has been marked as complete for adding, adding to the collection is not permitted 
1486         /// and attempts to remove from the collection will not wait when the collection is empty.
1487         /// </remarks>
1488         /// <exception cref="T:System.ObjectDisposedException">The <see
1489         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
1490         public void CompleteAdding()
1491         {
1492             CheckDisposed();
1493
1494             if (IsAddingCompleted)
1495                 return;
1496
1497             SpinWait spinner = new SpinWait();
1498             while (true)
1499             {
1500                 int observedAdders = m_currentAdders;
1501                 if ((observedAdders & COMPLETE_ADDING_ON_MASK) != 0)
1502                 {
1503                     spinner.Reset();
1504                     // If there is another COmpleteAdding in progress waiting the current adders, then spin until it finishes
1505                     while (m_currentAdders != COMPLETE_ADDING_ON_MASK) spinner.SpinOnce();
1506                     return;
1507                 }
1508
1509                 if (Interlocked.CompareExchange(ref m_currentAdders, observedAdders | COMPLETE_ADDING_ON_MASK, observedAdders) == observedAdders)
1510                 {
1511                     spinner.Reset();
1512                     while (m_currentAdders != COMPLETE_ADDING_ON_MASK) spinner.SpinOnce();
1513
1514                     if (Count == 0)
1515                     {
1516                         CancelWaitingConsumers();
1517                     }
1518
1519                     // We should always wake waiting producers, and have them throw exceptions as
1520                     // Add&CompleteAdding should not be used concurrently.
1521                     CancelWaitingProducers();
1522                     return;
1523
1524                 }
1525                 spinner.SpinOnce();
1526             }
1527         }
1528
1529         /// <summary>Cancels the semaphores.</summary>
1530         private void CancelWaitingConsumers()
1531         {
1532             m_ConsumersCancellationTokenSource.Cancel();
1533         }
1534
1535         private void CancelWaitingProducers()
1536         {
1537             m_ProducersCancellationTokenSource.Cancel();
1538         }
1539
1540
1541         /// <summary>
1542         /// Releases resources used by the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.
1543         /// </summary>
1544         public void Dispose()
1545         {
1546             Dispose(true);
1547             GC.SuppressFinalize(this);
1548         }
1549
1550         /// <summary>
1551         /// Releases resources used by the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.
1552         /// </summary>
1553         /// <param name="disposing">Whether being disposed explicitly (true) or due to a finalizer (false).</param>
1554         protected virtual void Dispose(bool disposing)
1555         {
1556             if (!m_isDisposed)
1557             {
1558                 if (m_freeNodes != null)
1559                 {
1560                     m_freeNodes.Dispose();
1561                 }
1562
1563                 m_occupiedNodes.Dispose();
1564
1565                 m_isDisposed = true;
1566             }
1567         }
1568
1569         /// <summary>Copies the items from the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance into a new array.</summary>
1570         /// <returns>An array containing copies of the elements of the collection.</returns>
1571         /// <exception cref="T:System.ObjectDisposedException">The <see
1572         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
1573         /// <remarks>
1574         /// The copied elements are not removed from the collection.
1575         /// </remarks>
1576         public T[] ToArray()
1577         {
1578             CheckDisposed();
1579             return m_collection.ToArray();
1580         }
1581
1582         /// <summary>Copies all of the items in the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance 
1583         /// to a compatible one-dimensional array, starting at the specified index of the target array.
1584         /// </summary>
1585         /// <param name="array">The one-dimensional array that is the destination of the elements copied from 
1586         /// the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance. The array must have zero-based indexing.</param>
1587         /// <param name="index">The zero-based index in <paramref name="array"/> at which copying begins.</param>
1588         /// <exception cref="T:System.ArgumentNullException">The <paramref name="array"/> argument is
1589         /// null.</exception>
1590         /// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="index"/> argument is less than zero.</exception>
1591         /// <exception cref="System.ArgumentException">The <paramref name="index"/> argument is equal to or greater 
1592         /// than the length of the <paramref name="array"/>.</exception>
1593         /// <exception cref="T:System.ObjectDisposedException">The <see
1594         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
1595         public void CopyTo(T[] array, int index)
1596         {
1597             ((ICollection)this).CopyTo(array, index);
1598         }
1599
1600         /// <summary>Copies all of the items in the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance 
1601         /// to a compatible one-dimensional array, starting at the specified index of the target array.
1602         /// </summary>
1603         /// <param name="array">The one-dimensional array that is the destination of the elements copied from 
1604         /// the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance. The array must have zero-based indexing.</param>
1605         /// <param name="index">The zero-based index in <paramref name="array"/> at which copying begins.</param>
1606         /// <exception cref="T:System.ArgumentNullException">The <paramref name="array"/> argument is
1607         /// null.</exception>
1608         /// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="index"/> argument is less than zero.</exception>
1609         /// <exception cref="System.ArgumentException">The <paramref name="index"/> argument is equal to or greater 
1610         /// than the length of the <paramref name="array"/>, the array is multidimensional, or the type parameter for the collection 
1611         /// cannot be cast automatically to the type of the destination array.</exception>
1612         /// <exception cref="T:System.ObjectDisposedException">The <see
1613         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
1614         void ICollection.CopyTo(Array array, int index)
1615         {
1616             CheckDisposed();
1617
1618             //We don't call m_collection.CopyTo() directly because we rely on Array.Copy method to customize 
1619             //all array exceptions.  
1620             T[] collectionSnapShot = m_collection.ToArray();
1621
1622             try
1623             {
1624                 Array.Copy(collectionSnapShot, 0, array, index, collectionSnapShot.Length);
1625             }
1626             catch (ArgumentNullException)
1627             {
1628                 throw new ArgumentNullException("array");
1629             }
1630             catch (ArgumentOutOfRangeException)
1631             {
1632                 throw new ArgumentOutOfRangeException("index", index, SR.GetString(SR.BlockingCollection_CopyTo_NonNegative));
1633             }
1634             catch (ArgumentException)
1635             {
1636                 throw new ArgumentException(SR.GetString(SR.BlockingCollection_CopyTo_TooManyElems), "index");
1637             }
1638             catch (RankException)
1639             {
1640                 throw new ArgumentException(SR.GetString(SR.BlockingCollection_CopyTo_MultiDim), "array");
1641             }
1642             catch (InvalidCastException)
1643             {
1644                 throw new ArgumentException(SR.GetString(SR.BlockingCollection_CopyTo_IncorrectType), "array");
1645             }
1646             catch (ArrayTypeMismatchException)
1647             {
1648                 throw new ArgumentException(SR.GetString(SR.BlockingCollection_CopyTo_IncorrectType), "array");
1649             }
1650         }
1651
1652         /// <summary>Provides a consuming <see cref="T:System.Collections.Generics.IEnumerable{T}"/> for items in the collection.</summary>
1653         /// <returns>An <see cref="T:System.Collections.Generics.IEnumerable{T}"/> that removes and returns items from the collection.</returns>
1654         /// <exception cref="T:System.ObjectDisposedException">The <see
1655         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
1656         public IEnumerable<T> GetConsumingEnumerable()
1657         {
1658             return GetConsumingEnumerable(CancellationToken.None);
1659         }
1660
1661         /// <summary>Provides a consuming <see cref="T:System.Collections.Generics.IEnumerable{T}"/> for items in the collection.
1662         /// Calling MoveNext on the returned enumerable will block if there is no data available, or will
1663         /// throw an <see cref="System.OperationCanceledException"/> if the <see cref="CancellationToken"/> is canceled.
1664         /// </summary>
1665         /// <param name="cancellationToken">A cancellation token to observe.</param>
1666         /// <returns>An <see cref="T:System.Collections.Generics.IEnumerable{T}"/> that removes and returns items from the collection.</returns>
1667         /// <exception cref="T:System.ObjectDisposedException">The <see
1668         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
1669         /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
1670         public IEnumerable<T> GetConsumingEnumerable(CancellationToken cancellationToken)
1671         {
1672             CancellationTokenSource linkedTokenSource = null;
1673             try
1674             {
1675                 linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, m_ConsumersCancellationTokenSource.Token);
1676                 while (!IsCompleted)
1677                 {
1678                     T item;
1679                     if (TryTakeWithNoTimeValidation(out item, Timeout.Infinite, cancellationToken, linkedTokenSource))
1680                     {
1681                         yield return item;
1682                     }
1683                 }
1684             }
1685             finally
1686             {
1687                 if (linkedTokenSource != null)
1688                 {
1689                     linkedTokenSource.Dispose();
1690                 }
1691             }
1692         }
1693
1694         /// <summary>Provides an <see cref="T:System.Collections.Generics.IEnumerator{T}"/> for items in the collection.</summary>
1695         /// <returns>An <see cref="T:System.Collections.Generics.IEnumerator{T}"/> for the items in the collection.</returns>
1696         /// <exception cref="T:System.ObjectDisposedException">The <see
1697         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
1698         IEnumerator<T> IEnumerable<T>.GetEnumerator()
1699         {
1700             CheckDisposed();
1701             return m_collection.GetEnumerator();
1702
1703         }
1704
1705         /// <summary>Provides an <see cref="T:System.Collections.IEnumerator"/> for items in the collection.</summary>
1706         /// <returns>An <see cref="T:System.Collections.IEnumerator"/> for the items in the collection.</returns>
1707         /// <exception cref="T:System.ObjectDisposedException">The <see
1708         /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
1709         IEnumerator IEnumerable.GetEnumerator()
1710         {
1711             return ((IEnumerable<T>)this).GetEnumerator();
1712         }
1713
1714         /// <summary>Centralizes the logic for validating the BlockingCollections array passed to TryAddToAny()
1715         /// and TryTakeFromAny().</summary>
1716         /// <param name="collections">The collections to/from which an item should be added/removed.</param>
1717         /// <param name="operationMode">Indicates whether this method is called to Add or Take.</param>
1718         /// <exception cref="System.ArgumentNullException">If the collections argument is null.</exception>
1719         /// <exception cref="System.ArgumentException">If the collections argument is a 0-length array or contains a 
1720         /// null element. Also, if at least one of the collections has been marked complete for adds.</exception>
1721         /// <exception cref="System.ObjectDisposedException">If at least one of the collections has been disposed.</exception>
1722         private static void ValidateCollectionsArray(BlockingCollection<T>[] collections, bool isAddOperation)
1723         {
1724             if (collections == null)
1725             {
1726                 throw new ArgumentNullException("collections");
1727             }
1728             else if (collections.Length < 1)
1729             {
1730                 throw new ArgumentException(
1731                     SR.GetString(SR.BlockingCollection_ValidateCollectionsArray_ZeroSize), "collections");
1732             }
1733             else if ((!IsSTAThread && collections.Length > 63) || (IsSTAThread && collections.Length > 62))
1734             //The number of WaitHandles must be <= 64 for MTA, and <=63 for STA, and we reserve one for CancellationToken                
1735             {
1736                 throw new ArgumentOutOfRangeException(
1737                     "collections", SR.GetString(SR.BlockingCollection_ValidateCollectionsArray_LargeSize));
1738             }
1739
1740             for (int i = 0; i < collections.Length; ++i)
1741             {
1742                 if (collections[i] == null)
1743                 {
1744                     throw new ArgumentException(
1745                         SR.GetString(SR.BlockingCollection_ValidateCollectionsArray_NullElems), "collections");
1746                 }
1747
1748                 if (collections[i].m_isDisposed)
1749                     throw new ObjectDisposedException(
1750                         "collections", SR.GetString(SR.BlockingCollection_ValidateCollectionsArray_DispElems));
1751
1752                 if (isAddOperation && collections[i].IsAddingCompleted)
1753                 {
1754                     throw new ArgumentException(
1755                         SR.GetString(SR.BlockingCollection_CantAddAnyWhenCompleted), "collections");
1756                 }
1757             }
1758         }
1759
1760         private static bool IsSTAThread
1761         {
1762             get
1763             {
1764 #if !SILVERLIGHT
1765                 return Thread.CurrentThread.GetApartmentState() == ApartmentState.STA;
1766 #else
1767                 return false;
1768 #endif
1769             }
1770         }
1771
1772         // ---------
1773         // Private Helpers.
1774         /// <summary>Centeralizes the logic of validating the timeout input argument.</summary>
1775         /// <param name="timeout">The TimeSpan to wait for to successfully complete an operation on the collection.</param>
1776         /// <exception cref="System.ArgumentOutOfRangeException">If the number of millseconds represented by the timeout 
1777         /// TimeSpan is less than 0 or is larger than Int32.MaxValue and not Timeout.Infinite</exception>
1778         private static void ValidateTimeout(TimeSpan timeout)
1779         {
1780             long totalMilliseconds = (long)timeout.TotalMilliseconds;
1781             if ((totalMilliseconds < 0 || totalMilliseconds > Int32.MaxValue) && (totalMilliseconds != Timeout.Infinite))
1782             {
1783                 throw new ArgumentOutOfRangeException("timeout", timeout,
1784                     String.Format(CultureInfo.InvariantCulture, SR.GetString(SR.BlockingCollection_TimeoutInvalid), Int32.MaxValue));
1785             }
1786         }
1787
1788         /// <summary>Centralizes the logic of validating the millisecondsTimeout input argument.</summary>
1789         /// <param name="millisecondsTimeout">The number of milliseconds to wait for to successfully complete an 
1790         /// operation on the collection.</param>
1791         /// <exception cref="System.ArgumentOutOfRangeException">If the number of millseconds is less than 0 and not 
1792         /// equal to Timeout.Infinite.</exception>
1793         private static void ValidateMillisecondsTimeout(int millisecondsTimeout)
1794         {
1795             if ((millisecondsTimeout < 0) && (millisecondsTimeout != Timeout.Infinite))
1796             {
1797                 throw new ArgumentOutOfRangeException("millisecondsTimeout", millisecondsTimeout,
1798                     String.Format(CultureInfo.InvariantCulture, SR.GetString(SR.BlockingCollection_TimeoutInvalid), Int32.MaxValue));
1799             }
1800         }
1801
1802         /// <summary>Throws a System.ObjectDisposedException if the collection was disposed</summary>
1803         /// <exception cref="System.ObjectDisposedException">If the collection has been disposed.</exception>
1804         private void CheckDisposed()
1805         {
1806             if (m_isDisposed)
1807             {
1808                 throw new ObjectDisposedException("BlockingCollection", SR.GetString(SR.BlockingCollection_Disposed));
1809             }
1810         }
1811
1812     }
1813
1814
1815
1816     /// <summary>A debugger view of the blocking collection that makes it simple to browse the
1817     /// collection's contents at a point in time.</summary>
1818     /// <typeparam name="T">The type of element that the BlockingCollection will hold.</typeparam>
1819     internal sealed class SystemThreadingCollections_BlockingCollectionDebugView<T>
1820     {
1821         private BlockingCollection<T> m_blockingCollection; // The collection being viewed.
1822
1823         /// <summary>Constructs a new debugger view object for the provided blocking collection object.</summary>
1824         /// <param name="collection">A blocking collection to browse in the debugger.</param>
1825         public SystemThreadingCollections_BlockingCollectionDebugView(BlockingCollection<T> collection)
1826         {
1827             if (collection == null)
1828             {
1829                 throw new ArgumentNullException("collection");
1830             }
1831
1832             m_blockingCollection = collection;
1833         }
1834
1835         /// <summary>Returns a snapshot of the underlying collection's elements.</summary>
1836         [DebuggerBrowsable(DebuggerBrowsableState.RootHidden)]
1837         public T[] Items
1838         {
1839             get
1840             {
1841                 return m_blockingCollection.ToArray();
1842             }
1843         }
1844
1845     }
1846 }
1847 #pragma warning restore 0420