do not check order sequence if option /order was not used
[mono.git] / mcs / class / System / System.Collections.Concurrent / BlockingCollection.cs
1 //
2 // BlockingCollection.cs
3 //
4 // Copyright (c) 2008 Jérémie "Garuma" Laval
5 //
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
7 // of this software and associated documentation files (the "Software"), to deal
8 // in the Software without restriction, including without limitation the rights
9 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 // copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
12 //
13 // The above copyright notice and this permission notice shall be included in
14 // all copies or substantial portions of the Software.
15 //
16 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22 // THE SOFTWARE.
23 //
24 //
25
26 #if NET_4_0
27
28 using System;
29 using System.Threading;
30 using System.Collections;
31 using System.Collections.Generic;
32 using System.Diagnostics;
33 using System.Runtime.InteropServices;
34
35 namespace System.Collections.Concurrent
36 {
37         [ComVisible (false)]
38         [DebuggerDisplay ("Count={Count}")]
39         [DebuggerTypeProxy (typeof (CollectionDebuggerView<>))]
40         public class BlockingCollection<T> : IEnumerable<T>, ICollection, IEnumerable, IDisposable
41         {
                // Number of SpinWait iterations attempted before a blocked operation
                // falls back to waiting on one of the events below.
                const int spinCount = 5;

                // Collection that actually stores the items.
                readonly IProducerConsumerCollection<T> underlyingColl;
                // Maximum number of items allowed, or -1 when no bound is enforced.
                readonly int upperBound;

                // Becomes true once CompleteAdding has been called.
                AtomicBoolean isComplete;
                // Value of addId captured by CompleteAdding; TryAdd refuses any
                // transaction whose id is at or beyond this value once isComplete is set.
                long completeId;

                /* The whole idea of the collection is to use these two long values in a transactional
                 * way to track and manage the actual data inside the underlying lock-free collection,
                 * instead of working with it directly or using external locking.
                 *
                 * They are manipulated with CAS and only ever increase over the lifetime of the
                 * instance, which prevents ABA problems. addId - removeId is the number of slots
                 * currently reserved or occupied.
                 */
                long addId = long.MinValue;
                long removeId = long.MinValue;

                /* These events are used solely to provide an optimized sleep cycle when
                 * the BlockingCollection has to wait on an external event: mreAdd is signalled
                 * after an item is added, mreRemove after an item is removed, and both are
                 * signalled by CompleteAdding.
                 */
                ManualResetEventSlim mreAdd = new ManualResetEventSlim (true);
                ManualResetEventSlim mreRemove = new ManualResetEventSlim (true);

                /* For time-based operations we share this single Stopwatch instance and measure
                   elapsed time as an offset from the value read at the start of each method call */
                static Stopwatch watch = Stopwatch.StartNew ();
69
70                 #region ctors
71                 public BlockingCollection ()
72                         : this (new ConcurrentQueue<T> (), -1)
73                 {
74                 }
75
76                 public BlockingCollection (int boundedCapacity)
77                         : this (new ConcurrentQueue<T> (), boundedCapacity)
78                 {
79                 }
80
81                 public BlockingCollection (IProducerConsumerCollection<T> collection)
82                         : this (collection, -1)
83                 {
84                 }
85
86                 public BlockingCollection (IProducerConsumerCollection<T> collection, int boundedCapacity)
87                 {
88                         this.underlyingColl = collection;
89                         this.upperBound     = boundedCapacity;
90                         this.isComplete     = new AtomicBoolean ();
91                 }
92                 #endregion
93
94                 #region Add & Remove (+ Try)
95                 public void Add (T item)
96                 {
97                         Add (item, CancellationToken.None);
98                 }
99
100                 public void Add (T item, CancellationToken cancellationToken)
101                 {
102                         TryAdd (item, -1, cancellationToken);
103                 }
104
105                 public bool TryAdd (T item)
106                 {
107                         return TryAdd (item, 0, CancellationToken.None);
108                 }
109
110                 public bool TryAdd (T item, int millisecondsTimeout, CancellationToken cancellationToken)
111                 {
112                         if (millisecondsTimeout < -1)
113                                 throw new ArgumentOutOfRangeException ("millisecondsTimeout");
114
115                         long start = millisecondsTimeout == -1 ? 0 : watch.ElapsedMilliseconds;
116                         SpinWait sw = new SpinWait ();
117
118                         do {
119                                 cancellationToken.ThrowIfCancellationRequested ();
120
121                                 long cachedAddId = addId;
122                                 long cachedRemoveId = removeId;
123
124                                 // If needed, we check and wait that the collection isn't full
125                                 if (upperBound != -1 && cachedAddId - cachedRemoveId > upperBound) {
126                                         if (millisecondsTimeout == 0)
127                                                 return false;
128
129                                         if (sw.Count <= spinCount) {
130                                                 sw.SpinOnce ();
131                                         } else {
132                                                 mreRemove.Reset ();
133                                                 if (cachedRemoveId != removeId || cachedAddId != addId) {
134                                                         mreRemove.Set ();
135                                                         continue;
136                                                 }
137
138                                                 mreRemove.Wait (ComputeTimeout (millisecondsTimeout, start), cancellationToken);
139                                         }
140
141                                         continue;
142                                 }
143
144                                 // Check our transaction id against completed stored one
145                                 if (isComplete.Value && cachedAddId >= completeId)
146                                         ThrowCompleteException ();
147
148                                 // Validate the steps we have been doing until now
149                                 if (Interlocked.CompareExchange (ref addId, cachedAddId + 1, cachedAddId) != cachedAddId)
150                                         continue;
151
152                                 // We have a slot reserved in the underlying collection, try to take it
153                                 if (!underlyingColl.TryAdd (item))
154                                         throw new InvalidOperationException ("The underlying collection didn't accept the item.");
155
156                                 // Wake up process that may have been sleeping
157                                 mreAdd.Set ();
158
159                                 return true;
160                         } while (millisecondsTimeout == -1 || (watch.ElapsedMilliseconds - start) < millisecondsTimeout);
161
162                         return false;
163                 }
164
165                 public bool TryAdd (T item, TimeSpan timeout)
166                 {
167                         return TryAdd (item, (int)timeout.TotalMilliseconds);
168                 }
169
170                 public bool TryAdd (T item, int millisecondsTimeout)
171                 {
172                         return TryAdd (item, millisecondsTimeout, CancellationToken.None);
173                 }
174
175                 public T Take ()
176                 {
177                         return Take (CancellationToken.None);
178                 }
179
180                 public T Take (CancellationToken cancellationToken)
181                 {
182                         T item;
183                         TryTake (out item, -1, cancellationToken, true);
184
185                         return item;
186                 }
187
188                 public bool TryTake (out T item)
189                 {
190                         return TryTake (out item, 0, CancellationToken.None);
191                 }
192
193                 public bool TryTake (out T item, int millisecondsTimeout, CancellationToken cancellationToken)
194                 {
195                         return TryTake (out item, millisecondsTimeout, cancellationToken, false);
196                 }
197
198                 bool TryTake (out T item, int milliseconds, CancellationToken cancellationToken, bool throwComplete)
199                 {
200                         if (milliseconds < -1)
201                                 throw new ArgumentOutOfRangeException ("milliseconds");
202
203                         item = default (T);
204                         SpinWait sw = new SpinWait ();
205                         long start = milliseconds == -1 ? 0 : watch.ElapsedMilliseconds;
206
207                         do {
208                                 cancellationToken.ThrowIfCancellationRequested ();
209
210                                 long cachedRemoveId = removeId;
211                                 long cachedAddId = addId;
212
213                                 // Empty case
214                                 if (cachedRemoveId == cachedAddId) {
215                                         if (milliseconds == 0)
216                                                 return false;
217
218                                         if (IsCompleted) {
219                                                 if (throwComplete)
220                                                         ThrowCompleteException ();
221                                                 else
222                                                         return false;
223                                         }
224
225                                         if (sw.Count <= spinCount) {
226                                                 sw.SpinOnce ();
227                                         } else {
228                                                 mreAdd.Reset ();
229                                                 if (cachedRemoveId != removeId || cachedAddId != addId) {
230                                                         mreAdd.Set ();
231                                                         continue;
232                                                 }
233
234                                                 mreAdd.Wait (ComputeTimeout (milliseconds, start), cancellationToken);
235                                         }
236
237                                         continue;
238                                 }
239
240                                 if (Interlocked.CompareExchange (ref removeId, cachedRemoveId + 1, cachedRemoveId) != cachedRemoveId)
241                                         continue;
242
243                                 while (!underlyingColl.TryTake (out item));
244
245                                 mreRemove.Set ();
246
247                                 return true;
248
249                         } while (milliseconds == -1 || (watch.ElapsedMilliseconds - start) < milliseconds);
250
251                         return false;
252                 }
253
254                 public bool TryTake (out T item, TimeSpan timeout)
255                 {
256                         return TryTake (out item, (int)timeout.TotalMilliseconds);
257                 }
258
259                 public bool TryTake (out T item, int millisecondsTimeout)
260                 {
261                         item = default (T);
262
263                         return TryTake (out item, millisecondsTimeout, CancellationToken.None, false);
264                 }
265
266                 static int ComputeTimeout (int millisecondsTimeout, long start)
267                 {
268                         return millisecondsTimeout == -1 ? 500 : (int)Math.Max (watch.ElapsedMilliseconds - start - millisecondsTimeout, 1);
269                 }
270                 #endregion
271
272                 #region static methods
273                 static void CheckArray (BlockingCollection<T>[] collections)
274                 {
275                         if (collections == null)
276                                 throw new ArgumentNullException ("collections");
277                         if (collections.Length == 0 || IsThereANullElement (collections))
278                                 throw new ArgumentException ("The collections argument is a 0-length array or contains a null element.", "collections");
279                 }
280
281                 static bool IsThereANullElement (BlockingCollection<T>[] collections)
282                 {
283                         foreach (BlockingCollection<T> e in collections)
284                                 if (e == null)
285                                         return true;
286                         return false;
287                 }
288
289                 public static int AddToAny (BlockingCollection<T>[] collections, T item)
290                 {
291                         CheckArray (collections);
292                         int index = 0;
293                         foreach (var coll in collections) {
294                                 try {
295                                         coll.Add (item);
296                                         return index;
297                                 } catch {}
298                                 index++;
299                         }
300                         return -1;
301                 }
302
303                 public static int AddToAny (BlockingCollection<T>[] collections, T item, CancellationToken cancellationToken)
304                 {
305                         CheckArray (collections);
306                         int index = 0;
307                         foreach (var coll in collections) {
308                                 try {
309                                         coll.Add (item, cancellationToken);
310                                         return index;
311                                 } catch {}
312                                 index++;
313                         }
314                         return -1;
315                 }
316
317                 public static int TryAddToAny (BlockingCollection<T>[] collections, T item)
318                 {
319                         CheckArray (collections);
320                         int index = 0;
321                         foreach (var coll in collections) {
322                                 if (coll.TryAdd (item))
323                                         return index;
324                                 index++;
325                         }
326                         return -1;
327                 }
328
329                 public static int TryAddToAny (BlockingCollection<T>[] collections, T item, TimeSpan timeout)
330                 {
331                         CheckArray (collections);
332                         int index = 0;
333                         foreach (var coll in collections) {
334                                 if (coll.TryAdd (item, timeout))
335                                         return index;
336                                 index++;
337                         }
338                         return -1;
339                 }
340
341                 public static int TryAddToAny (BlockingCollection<T>[] collections, T item, int millisecondsTimeout)
342                 {
343                         CheckArray (collections);
344                         int index = 0;
345                         foreach (var coll in collections) {
346                                 if (coll.TryAdd (item, millisecondsTimeout))
347                                         return index;
348                                 index++;
349                         }
350                         return -1;
351                 }
352
353                 public static int TryAddToAny (BlockingCollection<T>[] collections, T item, int millisecondsTimeout,
354                                                CancellationToken cancellationToken)
355                 {
356                         CheckArray (collections);
357                         int index = 0;
358                         foreach (var coll in collections) {
359                                 if (coll.TryAdd (item, millisecondsTimeout, cancellationToken))
360                                         return index;
361                                 index++;
362                         }
363                         return -1;
364                 }
365
366                 public static int TakeFromAny (BlockingCollection<T>[] collections, out T item)
367                 {
368                         item = default (T);
369                         CheckArray (collections);
370                         int index = 0;
371                         foreach (var coll in collections) {
372                                 try {
373                                         item = coll.Take ();
374                                         return index;
375                                 } catch {}
376                                 index++;
377                         }
378                         return -1;
379                 }
380
381                 public static int TakeFromAny (BlockingCollection<T>[] collections, out T item, CancellationToken cancellationToken)
382                 {
383                         item = default (T);
384                         CheckArray (collections);
385                         int index = 0;
386                         foreach (var coll in collections) {
387                                 try {
388                                         item = coll.Take (cancellationToken);
389                                         return index;
390                                 } catch {}
391                                 index++;
392                         }
393                         return -1;
394                 }
395
396                 public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item)
397                 {
398                         item = default (T);
399
400                         CheckArray (collections);
401                         int index = 0;
402                         foreach (var coll in collections) {
403                                 if (coll.TryTake (out item))
404                                         return index;
405                                 index++;
406                         }
407                         return -1;
408                 }
409
410                 public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, TimeSpan timeout)
411                 {
412                         item = default (T);
413
414                         CheckArray (collections);
415                         int index = 0;
416                         foreach (var coll in collections) {
417                                 if (coll.TryTake (out item, timeout))
418                                         return index;
419                                 index++;
420                         }
421                         return -1;
422                 }
423
424                 public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, int millisecondsTimeout)
425                 {
426                         item = default (T);
427
428                         CheckArray (collections);
429                         int index = 0;
430                         foreach (var coll in collections) {
431                                 if (coll.TryTake (out item, millisecondsTimeout))
432                                         return index;
433                                 index++;
434                         }
435                         return -1;
436                 }
437
438                 public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, int millisecondsTimeout,
439                                                   CancellationToken cancellationToken)
440                 {
441                         item = default (T);
442
443                         CheckArray (collections);
444                         int index = 0;
445                         foreach (var coll in collections) {
446                                 if (coll.TryTake (out item, millisecondsTimeout, cancellationToken))
447                                         return index;
448                                 index++;
449                         }
450                         return -1;
451                 }
452                 #endregion
453
                // Marks the collection as not accepting any more additions and wakes up
                // operations that may be waiting on either event.
                public void CompleteAdding ()
                {
                        // No further add beyond this point: record the current add id before
                        // raising the flag, since TryAdd compares its id against completeId
                        // only once it sees the flag set
                        completeId = addId;
                        isComplete.Value = true;
                        // Wake up waiting operations so they can re-evaluate the new state
                        mreAdd.Set ();
                        mreRemove.Set ();
                }
463
464                 void ThrowCompleteException ()
465                 {
466                         throw new InvalidOperationException ("The BlockingCollection<T> has"
467                                                              + " been marked as complete with regards to additions.");
468                 }
469
470                 void ICollection.CopyTo (Array array, int index)
471                 {
472                         underlyingColl.CopyTo (array, index);
473                 }
474
475                 public void CopyTo (T[] array, int index)
476                 {
477                         underlyingColl.CopyTo (array, index);
478                 }
479
480                 public IEnumerable<T> GetConsumingEnumerable ()
481                 {
482                         return GetConsumingEnumerable (CancellationToken.None);
483                 }
484
485                 public IEnumerable<T> GetConsumingEnumerable (CancellationToken cancellationToken)
486                 {
487                         while (true) {
488                                 T item = default (T);
489
490                                 try {
491                                         item = Take (cancellationToken);
492                                 } catch {
493                                         // Then the exception is perfectly normal
494                                         if (IsCompleted)
495                                                 break;
496                                         // otherwise rethrow
497                                         throw;
498                                 }
499
500                                 yield return item;
501                         }
502                 }
503
504                 IEnumerator IEnumerable.GetEnumerator ()
505                 {
506                         return ((IEnumerable)underlyingColl).GetEnumerator ();
507                 }
508
509                 IEnumerator<T> IEnumerable<T>.GetEnumerator ()
510                 {
511                         return ((IEnumerable<T>)underlyingColl).GetEnumerator ();
512                 }
513
514                 public void Dispose ()
515                 {
516
517                 }
518
519                 protected virtual void Dispose (bool disposing)
520                 {
521
522                 }
523
524                 public T[] ToArray ()
525                 {
526                         return underlyingColl.ToArray ();
527                 }
528
529                 public int BoundedCapacity {
530                         get {
531                                 return upperBound;
532                         }
533                 }
534
535                 public int Count {
536                         get {
537                                 return underlyingColl.Count;
538                         }
539                 }
540
541                 public bool IsAddingCompleted {
542                         get {
543                                 return isComplete.Value;
544                         }
545                 }
546
547                 public bool IsCompleted {
548                         get {
549                                 return isComplete.Value && addId == removeId;
550                         }
551                 }
552
553                 object ICollection.SyncRoot {
554                         get {
555                                 return underlyingColl.SyncRoot;
556                         }
557                 }
558
559                 bool ICollection.IsSynchronized {
560                         get {
561                                 return underlyingColl.IsSynchronized;
562                         }
563                 }
564         }
565 }
566 #endif