2010-04-15 Jérémie Laval <jeremie.laval@gmail.com>
[mono.git] / mcs / class / System / System.Collections.Concurrent / BlockingCollection.cs
1 //
2 // BlockingCollection.cs
3 //
4 // Copyright (c) 2008 Jérémie "Garuma" Laval
5 //
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
7 // of this software and associated documentation files (the "Software"), to deal
8 // in the Software without restriction, including without limitation the rights
9 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 // copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
12 //
13 // The above copyright notice and this permission notice shall be included in
14 // all copies or substantial portions of the Software.
15 //
16 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22 // THE SOFTWARE.
23 //
24 //
25
26 #if NET_4_0 || BOOTSTRAP_NET_4_0
27
28 using System;
29 using System.Threading;
30 using System.Collections;
31 using System.Collections.Generic;
32 using System.Diagnostics;
33 using System.Runtime.InteropServices;
34
35 namespace System.Collections.Concurrent
36 {
37         [ComVisible (false)]
38         [DebuggerDisplay ("Count={Count}")]
39         [DebuggerTypeProxy (typeof (CollectionDebuggerView<>))]
40         public class BlockingCollection<T> : IEnumerable<T>, ICollection, IEnumerable, IDisposable
41         {
42                 readonly IProducerConsumerCollection<T> underlyingColl;
43                 readonly int upperBound;
44
45                 AtomicBoolean isComplete;
46                 long completeId;
47
48                 long addId = long.MinValue;
49                 long removeId = long.MinValue;
50
51                 #region ctors
52                 public BlockingCollection ()
53                         : this (new ConcurrentQueue<T> (), -1)
54                 {
55                 }
56
57                 public BlockingCollection (int upperBound)
58                         : this (new ConcurrentQueue<T> (), upperBound)
59                 {
60                 }
61
62                 public BlockingCollection (IProducerConsumerCollection<T> underlyingColl)
63                         : this (underlyingColl, -1)
64                 {
65                 }
66
67                 public BlockingCollection (IProducerConsumerCollection<T> underlyingColl, int upperBound)
68                 {
69                         this.underlyingColl = underlyingColl;
70                         this.upperBound     = upperBound;
71                         this.isComplete     = new AtomicBoolean ();
72                 }
73                 #endregion
74
75                 #region Add & Remove (+ Try)
76                 public void Add (T item)
77                 {
78                         Add (item, null);
79                 }
80
81                 public void Add (T item, CancellationToken token)
82                 {
83                         Add (item, () => token.IsCancellationRequested);
84                 }
85
86                 void Add (T item, Func<bool> cancellationFunc)
87                 {
88                         while (true) {
89                                 long cachedAddId = addId;
90                                 long cachedRemoveId = removeId;
91
92                                 if (upperBound != -1) {
93                                         if (cachedAddId - cachedRemoveId > upperBound) {
94                                                 Block ();
95                                                 continue;
96                                         }
97                                 }
98
99                                 // Check our transaction id against completed stored one
100                                 if (isComplete.Value && cachedAddId >= completeId)
101                                         throw new InvalidOperationException ("The BlockingCollection<T> has"
102                                                                              + " been marked as complete with regards to additions.");
103
104                                 if (Interlocked.CompareExchange (ref addId, cachedAddId + 1, cachedAddId) == cachedAddId)
105                                         break;
106
107                                 if (cancellationFunc != null && cancellationFunc ())
108                                         throw new OperationCanceledException ("CancellationToken triggered");
109                         }
110
111
112                         if (!underlyingColl.TryAdd (item))
113                                 throw new InvalidOperationException ("The underlying collection didn't accept the item.");
114                 }
115
116                 public T Take ()
117                 {
118                         return Take (null);
119                 }
120
121                 public T Take (CancellationToken token)
122                 {
123                         return Take (() => token.IsCancellationRequested);
124                 }
125
126                 T Take (Func<bool> cancellationFunc)
127                 {
128                         while (true) {
129                                 long cachedRemoveId = removeId;
130                                 long cachedAddId = addId;
131
132                                 // Empty case
133                                 if (cachedRemoveId == cachedAddId) {
134                                         if (IsCompleted)
135                                                 throw new OperationCanceledException ("The BlockingCollection<T> has"
136                                                                                       + " been marked as complete with regards to additions.");
137
138                                         Block ();
139                                         continue;
140                                 }
141
142                                 if (Interlocked.CompareExchange (ref removeId, cachedRemoveId + 1, cachedRemoveId) == cachedRemoveId)
143                                         break;
144
145                                 if (cancellationFunc != null && cancellationFunc ())
146                                         throw new OperationCanceledException ("The CancellationToken has had cancellation requested.");
147                         }
148
149                         T item;
150                         while (!underlyingColl.TryTake (out item));
151
152                         return item;
153                 }
154
155                 public bool TryAdd (T item)
156                 {
157                         return TryAdd (item, null, null);
158                 }
159
160                 bool TryAdd (T item, Func<bool> contFunc, CancellationToken? token)
161                 {
162                         do {
163                                 if (token.HasValue && token.Value.IsCancellationRequested)
164                                         throw new OperationCanceledException ("The CancellationToken has had cancellation requested.");
165
166                                 long cachedAddId = addId;
167                                 long cachedRemoveId = removeId;
168
169                                 if (upperBound != -1) {
170                                         if (cachedAddId - cachedRemoveId > upperBound) {
171                                                 continue;
172                                         }
173                                 }
174
175                                 // Check our transaction id against completed stored one
176                                 if (isComplete.Value && cachedAddId >= completeId)
177                                         throw new InvalidOperationException ("The BlockingCollection<T> has"
178                                                                              + " been marked as complete with regards to additions.");
179
180                                 if (Interlocked.CompareExchange (ref addId, cachedAddId + 1, cachedAddId) != cachedAddId)
181                                         continue;
182
183                                 if (!underlyingColl.TryAdd (item))
184                                         throw new InvalidOperationException ("The underlying collection didn't accept the item.");
185
186                                 return true;
187                         } while (contFunc != null && contFunc ());
188
189                         return false;
190                 }
191
192                 public bool TryAdd (T item, TimeSpan ts)
193                 {
194                         return TryAdd (item, (int)ts.TotalMilliseconds);
195                 }
196
197                 public bool TryAdd (T item, int millisecondsTimeout)
198                 {
199                         Stopwatch sw = Stopwatch.StartNew ();
200                         return TryAdd (item, () => sw.ElapsedMilliseconds < millisecondsTimeout, null);
201                 }
202
203                 public bool TryAdd (T item, int millisecondsTimeout, CancellationToken token)
204                 {
205                         Stopwatch sw = Stopwatch.StartNew ();
206                         return TryAdd (item, () => sw.ElapsedMilliseconds < millisecondsTimeout, token);
207                 }
208
209                 public bool TryTake (out T item)
210                 {
211                         return TryTake (out item, null, null);
212                 }
213
214                 bool TryTake (out T item, Func<bool> contFunc, CancellationToken? token)
215                 {
216                         item = default (T);
217
218                         do {
219                                 if (token.HasValue && token.Value.IsCancellationRequested)
220                                         throw new OperationCanceledException ("The CancellationToken has had cancellation requested.");
221
222                                 long cachedRemoveId = removeId;
223                                 long cachedAddId = addId;
224
225                                 // Empty case
226                                 if (cachedRemoveId == cachedAddId) {
227                                         if (IsCompleted)
228                                                 return false;
229
230                                         continue;
231                                 }
232
233                                 if (Interlocked.CompareExchange (ref removeId, cachedRemoveId + 1, cachedRemoveId) != cachedRemoveId)
234                                         continue;
235
236                                 return underlyingColl.TryTake (out item);
237                         } while (contFunc != null && contFunc ());
238
239                         return false;
240                 }
241
242                 public bool TryTake (out T item, TimeSpan ts)
243                 {
244                         return TryTake (out item, (int)ts.TotalMilliseconds);
245                 }
246
247                 public bool TryTake (out T item, int millisecondsTimeout)
248                 {
249                         item = default (T);
250                         Stopwatch sw = Stopwatch.StartNew ();
251
252                         return TryTake (out item, () => sw.ElapsedMilliseconds < millisecondsTimeout, null);
253                 }
254
255                 public bool TryTake (out T item, int millisecondsTimeout, CancellationToken token)
256                 {
257                         item = default (T);
258                         Stopwatch sw = Stopwatch.StartNew ();
259
260                         return TryTake (out item, () => sw.ElapsedMilliseconds < millisecondsTimeout, token);
261                 }
262                 #endregion
263
264                 #region static methods
265                 static void CheckArray (BlockingCollection<T>[] collections)
266                 {
267                         if (collections == null)
268                                 throw new ArgumentNullException ("collections");
269                         if (collections.Length == 0 || IsThereANullElement (collections))
270                                 throw new ArgumentException ("The collections argument is a 0-length array or contains a null element.", "collections");
271                 }
272
273                 static bool IsThereANullElement (BlockingCollection<T>[] collections)
274                 {
275                         foreach (BlockingCollection<T> e in collections)
276                                 if (e == null)
277                                         return true;
278                         return false;
279                 }
280
281                 public static int AddToAny (BlockingCollection<T>[] collections, T item)
282                 {
283                         CheckArray (collections);
284                         int index = 0;
285                         foreach (var coll in collections) {
286                                 try {
287                                         coll.Add (item);
288                                         return index;
289                                 } catch {}
290                                 index++;
291                         }
292                         return -1;
293                 }
294
295                 public static int AddToAny (BlockingCollection<T>[] collections, T item, CancellationToken token)
296                 {
297                         CheckArray (collections);
298                         int index = 0;
299                         foreach (var coll in collections) {
300                                 try {
301                                         coll.Add (item, token);
302                                         return index;
303                                 } catch {}
304                                 index++;
305                         }
306                         return -1;
307                 }
308
309                 public static int TryAddToAny (BlockingCollection<T>[] collections, T item)
310                 {
311                         CheckArray (collections);
312                         int index = 0;
313                         foreach (var coll in collections) {
314                                 if (coll.TryAdd (item))
315                                         return index;
316                                 index++;
317                         }
318                         return -1;
319                 }
320
321                 public static int TryAddToAny (BlockingCollection<T>[] collections, T item, TimeSpan ts)
322                 {
323                         CheckArray (collections);
324                         int index = 0;
325                         foreach (var coll in collections) {
326                                 if (coll.TryAdd (item, ts))
327                                         return index;
328                                 index++;
329                         }
330                         return -1;
331                 }
332
333                 public static int TryAddToAny (BlockingCollection<T>[] collections, T item, int millisecondsTimeout)
334                 {
335                         CheckArray (collections);
336                         int index = 0;
337                         foreach (var coll in collections) {
338                                 if (coll.TryAdd (item, millisecondsTimeout))
339                                         return index;
340                                 index++;
341                         }
342                         return -1;
343                 }
344
345                 public static int TryAddToAny (BlockingCollection<T>[] collections, T item, int millisecondsTimeout,
346                                                CancellationToken token)
347                 {
348                         CheckArray (collections);
349                         int index = 0;
350                         foreach (var coll in collections) {
351                                 if (coll.TryAdd (item, millisecondsTimeout, token))
352                                         return index;
353                                 index++;
354                         }
355                         return -1;
356                 }
357
358                 public static int TakeFromAny (BlockingCollection<T>[] collections, out T item)
359                 {
360                         item = default (T);
361                         CheckArray (collections);
362                         int index = 0;
363                         foreach (var coll in collections) {
364                                 try {
365                                         item = coll.Take ();
366                                         return index;
367                                 } catch {}
368                                 index++;
369                         }
370                         return -1;
371                 }
372
373                 public static int TakeFromAny (BlockingCollection<T>[] collections, out T item, CancellationToken token)
374                 {
375                         item = default (T);
376                         CheckArray (collections);
377                         int index = 0;
378                         foreach (var coll in collections) {
379                                 try {
380                                         item = coll.Take (token);
381                                         return index;
382                                 } catch {}
383                                 index++;
384                         }
385                         return -1;
386                 }
387
388                 public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item)
389                 {
390                         item = default (T);
391
392                         CheckArray (collections);
393                         int index = 0;
394                         foreach (var coll in collections) {
395                                 if (coll.TryTake (out item))
396                                         return index;
397                                 index++;
398                         }
399                         return -1;
400                 }
401
402                 public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, TimeSpan ts)
403                 {
404                         item = default (T);
405
406                         CheckArray (collections);
407                         int index = 0;
408                         foreach (var coll in collections) {
409                                 if (coll.TryTake (out item, ts))
410                                         return index;
411                                 index++;
412                         }
413                         return -1;
414                 }
415
416                 public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, int millisecondsTimeout)
417                 {
418                         item = default (T);
419
420                         CheckArray (collections);
421                         int index = 0;
422                         foreach (var coll in collections) {
423                                 if (coll.TryTake (out item, millisecondsTimeout))
424                                         return index;
425                                 index++;
426                         }
427                         return -1;
428                 }
429
430                 public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, int millisecondsTimeout,
431                                                   CancellationToken token)
432                 {
433                         item = default (T);
434
435                         CheckArray (collections);
436                         int index = 0;
437                         foreach (var coll in collections) {
438                                 if (coll.TryTake (out item, millisecondsTimeout, token))
439                                         return index;
440                                 index++;
441                         }
442                         return -1;
443                 }
444                 #endregion
445
446                 public void CompleteAdding ()
447                 {
448                   // No further add beside that point
449                   completeId = addId;
450                   isComplete.Value = true;
451                 }
452
453                 void ICollection.CopyTo (Array array, int index)
454                 {
455                         underlyingColl.CopyTo (array, index);
456                 }
457
458                 public void CopyTo (T[] array, int index)
459                 {
460                         underlyingColl.CopyTo (array, index);
461                 }
462
463                 public IEnumerable<T> GetConsumingEnumerable ()
464                 {
465                         return GetConsumingEnumerable (Take);
466                 }
467
468                 public IEnumerable<T> GetConsumingEnumerable (CancellationToken token)
469                 {
470                         return GetConsumingEnumerable (() => Take (token));
471                 }
472
473                 IEnumerable<T> GetConsumingEnumerable (Func<T> getFunc)
474                 {
475                         while (true) {
476                                 T item = default (T);
477
478                                 try {
479                                         item = getFunc ();
480                                 } catch {
481                                         break;
482                                 }
483
484                                 yield return item;
485                         }
486                 }
487
488                 IEnumerator IEnumerable.GetEnumerator ()
489                 {
490                         return ((IEnumerable)underlyingColl).GetEnumerator ();
491                 }
492
493                 IEnumerator<T> IEnumerable<T>.GetEnumerator ()
494                 {
495                         return ((IEnumerable<T>)underlyingColl).GetEnumerator ();
496                 }
497
498                 public void Dispose ()
499                 {
500
501                 }
502
503                 protected virtual void Dispose (bool managedRes)
504                 {
505
506                 }
507
508                 public T[] ToArray ()
509                 {
510                         return underlyingColl.ToArray ();
511                 }
512                 
513                 [ThreadStatic]
514                 SpinWait sw;
515
516                 // Method used to stall the thread for a limited period of time before retrying an operation
517                 void Block ()
518                 {
519                         sw.SpinOnce ();
520                 }
521
522                 public int BoundedCapacity {
523                         get {
524                                 return upperBound;
525                         }
526                 }
527
528                 public int Count {
529                         get {
530                                 return underlyingColl.Count;
531                         }
532                 }
533
534                 public bool IsAddingCompleted {
535                         get {
536                                 return isComplete.Value;
537                         }
538                 }
539
540                 public bool IsCompleted {
541                         get {
542                                 return isComplete.Value && addId == removeId;
543                         }
544                 }
545
546                 object ICollection.SyncRoot {
547                         get {
548                                 return underlyingColl.SyncRoot;
549                         }
550                 }
551
552                 bool ICollection.IsSynchronized {
553                         get {
554                                 return underlyingColl.IsSynchronized;
555                         }
556                 }
557         }
558 }
559 #endif