Merge branch 'master' of github.com:mono/mono
[mono.git] / mcs / class / System / System.Collections.Concurrent / BlockingCollection.cs
1 //
2 // BlockingCollection.cs
3 //
4 // Copyright (c) 2008 Jérémie "Garuma" Laval
5 //
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
7 // of this software and associated documentation files (the "Software"), to deal
8 // in the Software without restriction, including without limitation the rights
9 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 // copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
12 //
13 // The above copyright notice and this permission notice shall be included in
14 // all copies or substantial portions of the Software.
15 //
16 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22 // THE SOFTWARE.
23 //
24 //
25
26 #if NET_4_0 || BOOTSTRAP_NET_4_0
27
28 using System;
29 using System.Threading;
30 using System.Collections;
31 using System.Collections.Generic;
32 using System.Diagnostics;
33 using System.Runtime.InteropServices;
34
35 namespace System.Collections.Concurrent
36 {
37         [ComVisible (false)]
38         [DebuggerDisplay ("Count={Count}")]
39         [DebuggerTypeProxy (typeof (CollectionDebuggerView<>))]
40         public class BlockingCollection<T> : IEnumerable<T>, ICollection, IEnumerable, IDisposable
41         {
42                 const int spinCount = 5;
43
44                 readonly IProducerConsumerCollection<T> underlyingColl;
45                 readonly int upperBound;
46
47                 AtomicBoolean isComplete;
48                 long completeId;
49
50                 /* The whole idea of the collection is to use these two long values in a transactional
51                  * to track and manage the actual data inside the underlying lock-free collection
52                  * instead of directly working with it or using external locking.
53                  *
54                  * They are manipulated with CAS and are guaranteed to increase over time and use
55                  * of the instance thus preventing ABA problems.
56                  */
57                 long addId = long.MinValue;
58                 long removeId = long.MinValue;
59
60                 /* These events are used solely for the purpose of having an optimized sleep cycle when
61                  * the BlockingCollection have to wait on an external event (Add or Remove for instance)
62                  */
63                 ManualResetEventSlim mreAdd = new ManualResetEventSlim (true);
64                 ManualResetEventSlim mreRemove = new ManualResetEventSlim (true);
65
66                 #region ctors
67                 public BlockingCollection ()
68                         : this (new ConcurrentQueue<T> (), -1)
69                 {
70                 }
71
72                 public BlockingCollection (int upperBound)
73                         : this (new ConcurrentQueue<T> (), upperBound)
74                 {
75                 }
76
77                 public BlockingCollection (IProducerConsumerCollection<T> underlyingColl)
78                         : this (underlyingColl, -1)
79                 {
80                 }
81
82                 public BlockingCollection (IProducerConsumerCollection<T> underlyingColl, int upperBound)
83                 {
84                         this.underlyingColl = underlyingColl;
85                         this.upperBound     = upperBound;
86                         this.isComplete     = new AtomicBoolean ();
87                 }
88                 #endregion
89
90                 #region Add & Remove (+ Try)
91                 public void Add (T item)
92                 {
93                         Add (item, CancellationToken.None);
94                 }
95
96                 public void Add (T item, CancellationToken token)
97                 {
98                         SpinWait sw = new SpinWait ();
99                         long cachedAddId;
100
101                         while (true) {
102                                 token.ThrowIfCancellationRequested ();
103
104                                 cachedAddId = addId;
105                                 long cachedRemoveId = removeId;
106
107                                 if (upperBound != -1) {
108                                         if (cachedAddId - cachedRemoveId > upperBound) {
109                                                 if (sw.Count <= spinCount) {
110                                                         sw.SpinOnce ();
111                                                 } else {
112                                                         if (mreRemove.IsSet)
113                                                                 continue;
114                                                         if (cachedRemoveId != removeId)
115                                                                 continue;
116
117                                                         mreRemove.Wait (token);
118                                                         mreRemove.Reset ();
119                                                 }
120
121                                                 continue;
122                                         }
123                                 }
124
125                                 // Check our transaction id against completed stored one
126                                 if (isComplete.Value && cachedAddId >= completeId)
127                                         ThrowCompleteException ();
128                                 if (Interlocked.CompareExchange (ref addId, cachedAddId + 1, cachedAddId) == cachedAddId)
129                                         break;
130                         }
131
132                         if (isComplete.Value && cachedAddId >= completeId)
133                                 ThrowCompleteException ();
134
135                         while (!underlyingColl.TryAdd (item));
136
137                         if (!mreAdd.IsSet)
138                                 mreAdd.Set ();
139                 }
140
141                 public T Take ()
142                 {
143                         return Take (CancellationToken.None);
144                 }
145
146                 public T Take (CancellationToken token)
147                 {
148                         SpinWait sw = new SpinWait ();
149
150                         while (true) {
151                                 token.ThrowIfCancellationRequested ();
152
153                                 long cachedRemoveId = removeId;
154                                 long cachedAddId = addId;
155
156                                 // Empty case
157                                 if (cachedRemoveId == cachedAddId) {
158                                         if (IsCompleted)
159                                                 ThrowCompleteException ();
160
161                                         if (sw.Count <= spinCount) {
162                                                 sw.SpinOnce ();
163                                         } else {
164                                                 if (cachedAddId != addId)
165                                                         continue;
166                                                 if (IsCompleted)
167                                                         ThrowCompleteException ();
168
169                                                 mreAdd.Wait (token);
170                                                 mreAdd.Reset ();
171                                         }
172
173                                         continue;
174                                 }
175
176                                 if (Interlocked.CompareExchange (ref removeId, cachedRemoveId + 1, cachedRemoveId) == cachedRemoveId)
177                                         break;
178                         }
179
180                         T item;
181                         while (!underlyingColl.TryTake (out item));
182
183                         if (!mreRemove.IsSet)
184                                 mreRemove.Set ();
185
186                         return item;
187                 }
188
189                 public bool TryAdd (T item)
190                 {
191                         return TryAdd (item, () => false, CancellationToken.None);
192                 }
193
194                 bool TryAdd (T item, Func<bool> contFunc, CancellationToken token)
195                 {
196                         do {
197                                 token.ThrowIfCancellationRequested ();
198
199                                 long cachedAddId = addId;
200                                 long cachedRemoveId = removeId;
201
202                                 if (upperBound != -1)
203                                         if (cachedAddId - cachedRemoveId > upperBound)
204                                                 continue;
205
206                                 // Check our transaction id against completed stored one
207                                 if (isComplete.Value && cachedAddId >= completeId)
208                                         throw new InvalidOperationException ("The BlockingCollection<T> has"
209                                                                              + " been marked as complete with regards to additions.");
210
211                                 if (Interlocked.CompareExchange (ref addId, cachedAddId + 1, cachedAddId) != cachedAddId)
212                                         continue;
213
214                                 while (!underlyingColl.TryAdd (item));
215
216                                 if (!mreAdd.IsSet)
217                                         mreAdd.Set ();
218
219                                 return true;
220                         } while (contFunc ());
221
222                         return false;
223                 }
224
225                 public bool TryAdd (T item, TimeSpan ts)
226                 {
227                         return TryAdd (item, (int)ts.TotalMilliseconds);
228                 }
229
230                 public bool TryAdd (T item, int millisecondsTimeout)
231                 {
232                         Stopwatch sw = Stopwatch.StartNew ();
233                         return TryAdd (item, () => sw.ElapsedMilliseconds < millisecondsTimeout, CancellationToken.None);
234                 }
235
236                 public bool TryAdd (T item, int millisecondsTimeout, CancellationToken token)
237                 {
238                         Stopwatch sw = Stopwatch.StartNew ();
239                         return TryAdd (item, () => sw.ElapsedMilliseconds < millisecondsTimeout, token);
240                 }
241
242                 public bool TryTake (out T item)
243                 {
244                         return TryTake (out item, () => false, CancellationToken.None);
245                 }
246
247                 bool TryTake (out T item, Func<bool> contFunc, CancellationToken token)
248                 {
249                         item = default (T);
250
251                         do {
252                                 token.ThrowIfCancellationRequested ();
253
254                                 long cachedRemoveId = removeId;
255                                 long cachedAddId = addId;
256
257                                 // Empty case
258                                 if (cachedRemoveId == cachedAddId) {
259                                         if (IsCompleted)
260                                                 return false;
261
262                                         continue;
263                                 }
264
265                                 if (Interlocked.CompareExchange (ref removeId, cachedRemoveId + 1, cachedRemoveId) != cachedRemoveId)
266                                         continue;
267
268                                 while (!underlyingColl.TryTake (out item));
269
270                                 if (!mreRemove.IsSet)
271                                         mreRemove.Set ();
272                         } while (contFunc ());
273
274                         return false;
275                 }
276
277                 public bool TryTake (out T item, TimeSpan ts)
278                 {
279                         return TryTake (out item, (int)ts.TotalMilliseconds);
280                 }
281
282                 public bool TryTake (out T item, int millisecondsTimeout)
283                 {
284                         item = default (T);
285                         Stopwatch sw = Stopwatch.StartNew ();
286
287                         return TryTake (out item, () => sw.ElapsedMilliseconds < millisecondsTimeout, CancellationToken.None);
288                 }
289
290                 public bool TryTake (out T item, int millisecondsTimeout, CancellationToken token)
291                 {
292                         item = default (T);
293                         Stopwatch sw = Stopwatch.StartNew ();
294
295                         return TryTake (out item, () => sw.ElapsedMilliseconds < millisecondsTimeout, token);
296                 }
297                 #endregion
298
299                 #region static methods
300                 static void CheckArray (BlockingCollection<T>[] collections)
301                 {
302                         if (collections == null)
303                                 throw new ArgumentNullException ("collections");
304                         if (collections.Length == 0 || IsThereANullElement (collections))
305                                 throw new ArgumentException ("The collections argument is a 0-length array or contains a null element.", "collections");
306                 }
307
308                 static bool IsThereANullElement (BlockingCollection<T>[] collections)
309                 {
310                         foreach (BlockingCollection<T> e in collections)
311                                 if (e == null)
312                                         return true;
313                         return false;
314                 }
315
316                 public static int AddToAny (BlockingCollection<T>[] collections, T item)
317                 {
318                         CheckArray (collections);
319                         int index = 0;
320                         foreach (var coll in collections) {
321                                 try {
322                                         coll.Add (item);
323                                         return index;
324                                 } catch {}
325                                 index++;
326                         }
327                         return -1;
328                 }
329
330                 public static int AddToAny (BlockingCollection<T>[] collections, T item, CancellationToken token)
331                 {
332                         CheckArray (collections);
333                         int index = 0;
334                         foreach (var coll in collections) {
335                                 try {
336                                         coll.Add (item, token);
337                                         return index;
338                                 } catch {}
339                                 index++;
340                         }
341                         return -1;
342                 }
343
344                 public static int TryAddToAny (BlockingCollection<T>[] collections, T item)
345                 {
346                         CheckArray (collections);
347                         int index = 0;
348                         foreach (var coll in collections) {
349                                 if (coll.TryAdd (item))
350                                         return index;
351                                 index++;
352                         }
353                         return -1;
354                 }
355
356                 public static int TryAddToAny (BlockingCollection<T>[] collections, T item, TimeSpan ts)
357                 {
358                         CheckArray (collections);
359                         int index = 0;
360                         foreach (var coll in collections) {
361                                 if (coll.TryAdd (item, ts))
362                                         return index;
363                                 index++;
364                         }
365                         return -1;
366                 }
367
368                 public static int TryAddToAny (BlockingCollection<T>[] collections, T item, int millisecondsTimeout)
369                 {
370                         CheckArray (collections);
371                         int index = 0;
372                         foreach (var coll in collections) {
373                                 if (coll.TryAdd (item, millisecondsTimeout))
374                                         return index;
375                                 index++;
376                         }
377                         return -1;
378                 }
379
380                 public static int TryAddToAny (BlockingCollection<T>[] collections, T item, int millisecondsTimeout,
381                                                CancellationToken token)
382                 {
383                         CheckArray (collections);
384                         int index = 0;
385                         foreach (var coll in collections) {
386                                 if (coll.TryAdd (item, millisecondsTimeout, token))
387                                         return index;
388                                 index++;
389                         }
390                         return -1;
391                 }
392
393                 public static int TakeFromAny (BlockingCollection<T>[] collections, out T item)
394                 {
395                         item = default (T);
396                         CheckArray (collections);
397                         int index = 0;
398                         foreach (var coll in collections) {
399                                 try {
400                                         item = coll.Take ();
401                                         return index;
402                                 } catch {}
403                                 index++;
404                         }
405                         return -1;
406                 }
407
408                 public static int TakeFromAny (BlockingCollection<T>[] collections, out T item, CancellationToken token)
409                 {
410                         item = default (T);
411                         CheckArray (collections);
412                         int index = 0;
413                         foreach (var coll in collections) {
414                                 try {
415                                         item = coll.Take (token);
416                                         return index;
417                                 } catch {}
418                                 index++;
419                         }
420                         return -1;
421                 }
422
423                 public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item)
424                 {
425                         item = default (T);
426
427                         CheckArray (collections);
428                         int index = 0;
429                         foreach (var coll in collections) {
430                                 if (coll.TryTake (out item))
431                                         return index;
432                                 index++;
433                         }
434                         return -1;
435                 }
436
437                 public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, TimeSpan ts)
438                 {
439                         item = default (T);
440
441                         CheckArray (collections);
442                         int index = 0;
443                         foreach (var coll in collections) {
444                                 if (coll.TryTake (out item, ts))
445                                         return index;
446                                 index++;
447                         }
448                         return -1;
449                 }
450
451                 public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, int millisecondsTimeout)
452                 {
453                         item = default (T);
454
455                         CheckArray (collections);
456                         int index = 0;
457                         foreach (var coll in collections) {
458                                 if (coll.TryTake (out item, millisecondsTimeout))
459                                         return index;
460                                 index++;
461                         }
462                         return -1;
463                 }
464
465                 public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, int millisecondsTimeout,
466                                                   CancellationToken token)
467                 {
468                         item = default (T);
469
470                         CheckArray (collections);
471                         int index = 0;
472                         foreach (var coll in collections) {
473                                 if (coll.TryTake (out item, millisecondsTimeout, token))
474                                         return index;
475                                 index++;
476                         }
477                         return -1;
478                 }
479                 #endregion
480
481                 public void CompleteAdding ()
482                 {
483                         // No further add beside that point
484                         completeId = addId;
485                         isComplete.Value = true;
486                         // Wakeup some operation in case this has an impact
487                         mreAdd.Set ();
488                         mreRemove.Set ();
489                 }
490
491                 void ThrowCompleteException ()
492                 {
493                         throw new InvalidOperationException ("The BlockingCollection<T> has"
494                                                              + " been marked as complete with regards to additions.");
495                 }
496
497                 void ICollection.CopyTo (Array array, int index)
498                 {
499                         underlyingColl.CopyTo (array, index);
500                 }
501
502                 public void CopyTo (T[] array, int index)
503                 {
504                         underlyingColl.CopyTo (array, index);
505                 }
506
507                 public IEnumerable<T> GetConsumingEnumerable ()
508                 {
509                         return GetConsumingEnumerable (Take);
510                 }
511
512                 public IEnumerable<T> GetConsumingEnumerable (CancellationToken token)
513                 {
514                         return GetConsumingEnumerable (() => Take (token));
515                 }
516
517                 IEnumerable<T> GetConsumingEnumerable (Func<T> getFunc)
518                 {
519                         while (true) {
520                                 T item = default (T);
521
522                                 try {
523                                         item = getFunc ();
524                                 } catch {
525                                         break;
526                                 }
527
528                                 yield return item;
529                         }
530                 }
531
532                 IEnumerator IEnumerable.GetEnumerator ()
533                 {
534                         return ((IEnumerable)underlyingColl).GetEnumerator ();
535                 }
536
537                 IEnumerator<T> IEnumerable<T>.GetEnumerator ()
538                 {
539                         return ((IEnumerable<T>)underlyingColl).GetEnumerator ();
540                 }
541
542                 public void Dispose ()
543                 {
544
545                 }
546
547                 protected virtual void Dispose (bool managedRes)
548                 {
549
550                 }
551
552                 public T[] ToArray ()
553                 {
554                         return underlyingColl.ToArray ();
555                 }
556
557                 public int BoundedCapacity {
558                         get {
559                                 return upperBound;
560                         }
561                 }
562
563                 public int Count {
564                         get {
565                                 return underlyingColl.Count;
566                         }
567                 }
568
569                 public bool IsAddingCompleted {
570                         get {
571                                 return isComplete.Value;
572                         }
573                 }
574
575                 public bool IsCompleted {
576                         get {
577                                 return isComplete.Value && addId == removeId;
578                         }
579                 }
580
581                 object ICollection.SyncRoot {
582                         get {
583                                 return underlyingColl.SyncRoot;
584                         }
585                 }
586
587                 bool ICollection.IsSynchronized {
588                         get {
589                                 return underlyingColl.IsSynchronized;
590                         }
591                 }
592         }
593 }
594 #endif