Merge pull request #836 from madewokherd/buildfixes
[mono.git] / mcs / class / System / System.Collections.Concurrent / BlockingCollection.cs
1 //
2 // BlockingCollection.cs
3 //
4 // Copyright (c) 2008 Jérémie "Garuma" Laval
5 //
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
7 // of this software and associated documentation files (the "Software"), to deal
8 // in the Software without restriction, including without limitation the rights
9 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 // copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
12 //
13 // The above copyright notice and this permission notice shall be included in
14 // all copies or substantial portions of the Software.
15 //
16 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22 // THE SOFTWARE.
23 //
24 //
25
26 #if NET_4_0
27
28 using System;
29 using System.Threading;
30 using System.Collections;
31 using System.Collections.Generic;
32 using System.Diagnostics;
33 using System.Runtime.InteropServices;
34
35 namespace System.Collections.Concurrent
36 {
37         [ComVisible (false)]
38         [DebuggerDisplay ("Count={Count}")]
39         [DebuggerTypeProxy (typeof (CollectionDebuggerView<>))]
40         public class BlockingCollection<T> : IEnumerable<T>, ICollection, IEnumerable, IDisposable
41         {
42                 const int spinCount = 5;
43
44                 readonly IProducerConsumerCollection<T> underlyingColl;
45
46                 /* These events are used solely for the purpose of having an optimized sleep cycle when
47                  * the BlockingCollection have to wait on an external event (Add or Remove for instance)
48                  */
49                 ManualResetEventSlim mreAdd = new ManualResetEventSlim (true);
50                 ManualResetEventSlim mreRemove = new ManualResetEventSlim (true);
51                 AtomicBoolean isComplete;
52
53                 readonly int upperBound;
54
55                 int completeId;
56
57                 /* The whole idea of the collection is to use these two long values in a transactional
58                  * way to track and manage the actual data inside the underlying lock-free collection
59                  * instead of directly working with it or using external locking.
60                  *
61                  * They are manipulated with CAS and are guaranteed to increase over time and use
62                  * of the instance thus preventing ABA problems.
63                  */
64                 int addId = int.MinValue;
65                 int removeId = int.MinValue;
66
67
68                 /* For time based operations, we share this instance of Stopwatch and base calculation
69                    on a time offset at each of these method call */
70                 static Stopwatch watch = Stopwatch.StartNew ();
71
72                 #region ctors
73                 public BlockingCollection ()
74                         : this (new ConcurrentQueue<T> (), -1)
75                 {
76                 }
77
78                 public BlockingCollection (int boundedCapacity)
79                         : this (new ConcurrentQueue<T> (), boundedCapacity)
80                 {
81                 }
82
83                 public BlockingCollection (IProducerConsumerCollection<T> collection)
84                         : this (collection, -1)
85                 {
86                 }
87
88                 public BlockingCollection (IProducerConsumerCollection<T> collection, int boundedCapacity)
89                 {
90                         this.underlyingColl = collection;
91                         this.upperBound     = boundedCapacity;
92                         this.isComplete     = new AtomicBoolean ();
93                 }
94                 #endregion
95
96                 #region Add & Remove (+ Try)
97                 public void Add (T item)
98                 {
99                         Add (item, CancellationToken.None);
100                 }
101
102                 public void Add (T item, CancellationToken cancellationToken)
103                 {
104                         TryAdd (item, -1, cancellationToken);
105                 }
106
107                 public bool TryAdd (T item)
108                 {
109                         return TryAdd (item, 0, CancellationToken.None);
110                 }
111
112                 public bool TryAdd (T item, int millisecondsTimeout, CancellationToken cancellationToken)
113                 {
114                         if (millisecondsTimeout < -1)
115                                 throw new ArgumentOutOfRangeException ("millisecondsTimeout");
116
117                         long start = millisecondsTimeout == -1 ? 0 : watch.ElapsedMilliseconds;
118                         SpinWait sw = new SpinWait ();
119
120                         do {
121                                 cancellationToken.ThrowIfCancellationRequested ();
122
123                                 int cachedAddId = addId;
124                                 int cachedRemoveId = removeId;
125                                 int itemsIn = cachedAddId - cachedRemoveId;
126
127                                 // If needed, we check and wait that the collection isn't full
128                                 if (upperBound != -1 && itemsIn > upperBound) {
129                                         if (millisecondsTimeout == 0)
130                                                 return false;
131
132                                         if (sw.Count <= spinCount) {
133                                                 sw.SpinOnce ();
134                                         } else {
135                                                 mreRemove.Reset ();
136                                                 if (cachedRemoveId != removeId || cachedAddId != addId) {
137                                                         mreRemove.Set ();
138                                                         continue;
139                                                 }
140
141                                                 mreRemove.Wait (ComputeTimeout (millisecondsTimeout, start), cancellationToken);
142                                         }
143
144                                         continue;
145                                 }
146
147                                 // Check our transaction id against completed stored one
148                                 if (isComplete.Value && cachedAddId >= completeId)
149                                         ThrowCompleteException ();
150
151                                 // Validate the steps we have been doing until now
152                                 if (Interlocked.CompareExchange (ref addId, cachedAddId + 1, cachedAddId) != cachedAddId)
153                                         continue;
154
155                                 // We have a slot reserved in the underlying collection, try to take it
156                                 if (!underlyingColl.TryAdd (item))
157                                         throw new InvalidOperationException ("The underlying collection didn't accept the item.");
158
159                                 // Wake up process that may have been sleeping
160                                 mreAdd.Set ();
161
162                                 return true;
163                         } while (millisecondsTimeout == -1 || (watch.ElapsedMilliseconds - start) < millisecondsTimeout);
164
165                         return false;
166                 }
167
168                 public bool TryAdd (T item, TimeSpan timeout)
169                 {
170                         return TryAdd (item, (int)timeout.TotalMilliseconds);
171                 }
172
173                 public bool TryAdd (T item, int millisecondsTimeout)
174                 {
175                         return TryAdd (item, millisecondsTimeout, CancellationToken.None);
176                 }
177
178                 public T Take ()
179                 {
180                         return Take (CancellationToken.None);
181                 }
182
183                 public T Take (CancellationToken cancellationToken)
184                 {
185                         T item;
186                         TryTake (out item, -1, cancellationToken, true);
187
188                         return item;
189                 }
190
191                 public bool TryTake (out T item)
192                 {
193                         return TryTake (out item, 0, CancellationToken.None);
194                 }
195
196                 public bool TryTake (out T item, int millisecondsTimeout, CancellationToken cancellationToken)
197                 {
198                         return TryTake (out item, millisecondsTimeout, cancellationToken, false);
199                 }
200
201                 bool TryTake (out T item, int milliseconds, CancellationToken cancellationToken, bool throwComplete)
202                 {
203                         if (milliseconds < -1)
204                                 throw new ArgumentOutOfRangeException ("milliseconds");
205
206                         item = default (T);
207                         SpinWait sw = new SpinWait ();
208                         long start = milliseconds == -1 ? 0 : watch.ElapsedMilliseconds;
209
210                         do {
211                                 cancellationToken.ThrowIfCancellationRequested ();
212
213                                 int cachedRemoveId = removeId;
214                                 int cachedAddId = addId;
215
216                                 // Empty case
217                                 if (cachedRemoveId == cachedAddId) {
218                                         if (milliseconds == 0)
219                                                 return false;
220
221                                         if (IsCompleted) {
222                                                 if (throwComplete)
223                                                         ThrowCompleteException ();
224                                                 else
225                                                         return false;
226                                         }
227
228                                         if (sw.Count <= spinCount) {
229                                                 sw.SpinOnce ();
230                                         } else {
231                                                 mreAdd.Reset ();
232                                                 if (cachedRemoveId != removeId || cachedAddId != addId) {
233                                                         mreAdd.Set ();
234                                                         continue;
235                                                 }
236
237                                                 mreAdd.Wait (ComputeTimeout (milliseconds, start), cancellationToken);
238                                         }
239
240                                         continue;
241                                 }
242
243                                 if (Interlocked.CompareExchange (ref removeId, cachedRemoveId + 1, cachedRemoveId) != cachedRemoveId)
244                                         continue;
245
246                                 while (!underlyingColl.TryTake (out item));
247
248                                 mreRemove.Set ();
249
250                                 return true;
251
252                         } while (milliseconds == -1 || (watch.ElapsedMilliseconds - start) < milliseconds);
253
254                         return false;
255                 }
256
257                 public bool TryTake (out T item, TimeSpan timeout)
258                 {
259                         return TryTake (out item, (int)timeout.TotalMilliseconds);
260                 }
261
262                 public bool TryTake (out T item, int millisecondsTimeout)
263                 {
264                         item = default (T);
265
266                         return TryTake (out item, millisecondsTimeout, CancellationToken.None, false);
267                 }
268
269                 static int ComputeTimeout (int millisecondsTimeout, long start)
270                 {
271                         return millisecondsTimeout == -1 ? 500 : (int)Math.Max (watch.ElapsedMilliseconds - start - millisecondsTimeout, 1);
272                 }
273                 #endregion
274
275                 #region static methods
276                 static void CheckArray (BlockingCollection<T>[] collections)
277                 {
278                         if (collections == null)
279                                 throw new ArgumentNullException ("collections");
280                         if (collections.Length == 0 || IsThereANullElement (collections))
281                                 throw new ArgumentException ("The collections argument is a 0-length array or contains a null element.", "collections");
282                 }
283
284                 static bool IsThereANullElement (BlockingCollection<T>[] collections)
285                 {
286                         foreach (BlockingCollection<T> e in collections)
287                                 if (e == null)
288                                         return true;
289                         return false;
290                 }
291
292                 public static int AddToAny (BlockingCollection<T>[] collections, T item)
293                 {
294                         CheckArray (collections);
295                         int index = 0;
296                         foreach (var coll in collections) {
297                                 try {
298                                         coll.Add (item);
299                                         return index;
300                                 } catch {}
301                                 index++;
302                         }
303                         return -1;
304                 }
305
306                 public static int AddToAny (BlockingCollection<T>[] collections, T item, CancellationToken cancellationToken)
307                 {
308                         CheckArray (collections);
309                         int index = 0;
310                         foreach (var coll in collections) {
311                                 try {
312                                         coll.Add (item, cancellationToken);
313                                         return index;
314                                 } catch {}
315                                 index++;
316                         }
317                         return -1;
318                 }
319
320                 public static int TryAddToAny (BlockingCollection<T>[] collections, T item)
321                 {
322                         CheckArray (collections);
323                         int index = 0;
324                         foreach (var coll in collections) {
325                                 if (coll.TryAdd (item))
326                                         return index;
327                                 index++;
328                         }
329                         return -1;
330                 }
331
332                 public static int TryAddToAny (BlockingCollection<T>[] collections, T item, TimeSpan timeout)
333                 {
334                         CheckArray (collections);
335                         int index = 0;
336                         foreach (var coll in collections) {
337                                 if (coll.TryAdd (item, timeout))
338                                         return index;
339                                 index++;
340                         }
341                         return -1;
342                 }
343
344                 public static int TryAddToAny (BlockingCollection<T>[] collections, T item, int millisecondsTimeout)
345                 {
346                         CheckArray (collections);
347                         int index = 0;
348                         foreach (var coll in collections) {
349                                 if (coll.TryAdd (item, millisecondsTimeout))
350                                         return index;
351                                 index++;
352                         }
353                         return -1;
354                 }
355
356                 public static int TryAddToAny (BlockingCollection<T>[] collections, T item, int millisecondsTimeout,
357                                                CancellationToken cancellationToken)
358                 {
359                         CheckArray (collections);
360                         int index = 0;
361                         foreach (var coll in collections) {
362                                 if (coll.TryAdd (item, millisecondsTimeout, cancellationToken))
363                                         return index;
364                                 index++;
365                         }
366                         return -1;
367                 }
368
369                 public static int TakeFromAny (BlockingCollection<T>[] collections, out T item)
370                 {
371                         item = default (T);
372                         CheckArray (collections);
373                         WaitHandle[] wait_table = null;
374                         while (true) {
375                                 int index = 0;
376                                 for (int i = 0; i < collections.Length; ++i) {
377                                         if (collections [i].TryTake (out item))
378                                                 return i;
379                                 }
380                                 if (wait_table == null) {
381                                         wait_table = new WaitHandle [collections.Length];
382                                         for (int i = 0; i < collections.Length; ++i)
383                                                 wait_table [i] = collections [i].mreRemove.WaitHandle;
384                                 }
385                                 WaitHandle.WaitAny (wait_table);
386                         }
387                         return -1;
388                 }
389
390                 public static int TakeFromAny (BlockingCollection<T>[] collections, out T item, CancellationToken cancellationToken)
391                 {
392                         item = default (T);
393                         CheckArray (collections);
394                         WaitHandle[] wait_table = null;
395                         while (true) {
396                                 int index = 0;
397                                 for (int i = 0; i < collections.Length; ++i) {
398                                         if (collections [i].TryTake (out item))
399                                                 return i;
400                                 }
401                                 cancellationToken.ThrowIfCancellationRequested ();
402                                 if (wait_table == null) {
403                                         wait_table = new WaitHandle [collections.Length + 1];
404                                         for (int i = 0; i < collections.Length; ++i)
405                                                 wait_table [i] = collections [i].mreRemove.WaitHandle;
406                                         wait_table [collections.Length] = cancellationToken.WaitHandle;
407                                 }
408                                 WaitHandle.WaitAny (wait_table);
409                                 cancellationToken.ThrowIfCancellationRequested ();
410                         }
411
412                         return -1;
413                 }
414
415                 public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item)
416                 {
417                         item = default (T);
418
419                         CheckArray (collections);
420                         int index = 0;
421                         foreach (var coll in collections) {
422                                 if (coll.TryTake (out item))
423                                         return index;
424                                 index++;
425                         }
426                         return -1;
427                 }
428
429                 public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, TimeSpan timeout)
430                 {
431                         item = default (T);
432
433                         CheckArray (collections);
434                         int index = 0;
435                         foreach (var coll in collections) {
436                                 if (coll.TryTake (out item, timeout))
437                                         return index;
438                                 index++;
439                         }
440                         return -1;
441                 }
442
443                 public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, int millisecondsTimeout)
444                 {
445                         item = default (T);
446
447                         CheckArray (collections);
448                         int index = 0;
449                         foreach (var coll in collections) {
450                                 if (coll.TryTake (out item, millisecondsTimeout))
451                                         return index;
452                                 index++;
453                         }
454                         return -1;
455                 }
456
457                 public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, int millisecondsTimeout,
458                                                   CancellationToken cancellationToken)
459                 {
460                         item = default (T);
461
462                         CheckArray (collections);
463                         int index = 0;
464                         foreach (var coll in collections) {
465                                 if (coll.TryTake (out item, millisecondsTimeout, cancellationToken))
466                                         return index;
467                                 index++;
468                         }
469                         return -1;
470                 }
471                 #endregion
472
473                 public void CompleteAdding ()
474                 {
475                         // No further add beside that point
476                         completeId = addId;
477                         isComplete.Value = true;
478                         // Wakeup some operation in case this has an impact
479                         mreAdd.Set ();
480                         mreRemove.Set ();
481                 }
482
483                 void ThrowCompleteException ()
484                 {
485                         throw new InvalidOperationException ("The BlockingCollection<T> has"
486                                                              + " been marked as complete with regards to additions.");
487                 }
488
489                 void ICollection.CopyTo (Array array, int index)
490                 {
491                         underlyingColl.CopyTo (array, index);
492                 }
493
494                 public void CopyTo (T[] array, int index)
495                 {
496                         underlyingColl.CopyTo (array, index);
497                 }
498
499                 public IEnumerable<T> GetConsumingEnumerable ()
500                 {
501                         return GetConsumingEnumerable (CancellationToken.None);
502                 }
503
504                 public IEnumerable<T> GetConsumingEnumerable (CancellationToken cancellationToken)
505                 {
506                         while (true) {
507                                 T item = default (T);
508
509                                 try {
510                                         item = Take (cancellationToken);
511                                 } catch {
512                                         // Then the exception is perfectly normal
513                                         if (IsCompleted)
514                                                 break;
515                                         // otherwise rethrow
516                                         throw;
517                                 }
518
519                                 yield return item;
520                         }
521                 }
522
523                 IEnumerator IEnumerable.GetEnumerator ()
524                 {
525                         return ((IEnumerable)underlyingColl).GetEnumerator ();
526                 }
527
528                 IEnumerator<T> IEnumerable<T>.GetEnumerator ()
529                 {
530                         return ((IEnumerable<T>)underlyingColl).GetEnumerator ();
531                 }
532
533                 public void Dispose ()
534                 {
535
536                 }
537
538                 protected virtual void Dispose (bool disposing)
539                 {
540
541                 }
542
543                 public T[] ToArray ()
544                 {
545                         return underlyingColl.ToArray ();
546                 }
547
548                 public int BoundedCapacity {
549                         get {
550                                 return upperBound;
551                         }
552                 }
553
554                 public int Count {
555                         get {
556                                 return underlyingColl.Count;
557                         }
558                 }
559
560                 public bool IsAddingCompleted {
561                         get {
562                                 return isComplete.Value;
563                         }
564                 }
565
566                 public bool IsCompleted {
567                         get {
568                                 return isComplete.Value && addId == removeId;
569                         }
570                 }
571
572                 object ICollection.SyncRoot {
573                         get {
574                                 return underlyingColl.SyncRoot;
575                         }
576                 }
577
578                 bool ICollection.IsSynchronized {
579                         get {
580                                 return underlyingColl.IsSynchronized;
581                         }
582                 }
583         }
584 }
585 #endif