2010-03-12 Jb Evain <jbevain@novell.com>
[mono.git] / mcs / class / System / System.Collections.Concurrent / BlockingCollection.cs
1 //
2 // BlockingCollection.cs
3 //
4 // Copyright (c) 2008 Jérémie "Garuma" Laval
5 //
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
7 // of this software and associated documentation files (the "Software"), to deal
8 // in the Software without restriction, including without limitation the rights
9 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 // copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
12 //
13 // The above copyright notice and this permission notice shall be included in
14 // all copies or substantial portions of the Software.
15 //
16 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22 // THE SOFTWARE.
23 //
24 //
25
26 #if NET_4_0
27
28 using System;
29 using System.Threading;
30 using System.Collections;
31 using System.Collections.Generic;
32 using System.Diagnostics;
33 using System.Runtime.InteropServices;
34
35 namespace System.Collections.Concurrent
36 {
37         [ComVisible (false)]
38         [DebuggerDisplay ("Count={Count}")]
39         [DebuggerTypeProxy (typeof (CollectionDebuggerView<>))]
40         public class BlockingCollection<T> : IEnumerable<T>, ICollection, IEnumerable, IDisposable
41         {
42                 readonly IProducerConsumerCollection<T> underlyingColl;
43                 readonly int upperBound;
44                 
45                 readonly SpinWait sw = new SpinWait ();
46                 
47                 AtomicBoolean isComplete;
48                 long completeId;
49
50                 long addId = long.MinValue;
51                 long removeId = long.MinValue;
52                 
53                 #region ctors
54                 public BlockingCollection ()
55                         : this (new ConcurrentQueue<T> (), -1)
56                 {
57                 }
58                 
59                 public BlockingCollection (int upperBound)
60                         : this (new ConcurrentQueue<T> (), upperBound)
61                 {
62                 }
63                 
64                 public BlockingCollection (IProducerConsumerCollection<T> underlyingColl)
65                         : this (underlyingColl, -1)
66                 {
67                 }
68                 
69                 public BlockingCollection (IProducerConsumerCollection<T> underlyingColl, int upperBound)
70                 {
71                         this.underlyingColl = underlyingColl;
72                         this.upperBound     = upperBound;
73                         this.isComplete     = new AtomicBoolean ();
74                 }
75                 #endregion
76                 
77                 #region Add & Remove (+ Try)
78                 public void Add (T item)
79                 {
80                         Add (item, null);
81                 }
82                 
83                 public void Add (T item, CancellationToken token)
84                 {
85                         Add (item, () => token.IsCancellationRequested);
86                 }
87                 
88                 void Add (T item, Func<bool> cancellationFunc)
89                 {
90                         while (true) {
91                                 long cachedAddId = addId;
92                                 long cachedRemoveId = removeId;
93                                 
94                                 if (upperBound != -1) {
95                                         if (cachedAddId - cachedRemoveId > upperBound) {
96                                                 Block ();
97                                                 continue;
98                                         }
99                                 }
100                                 
101                                 // Check our transaction id against completed stored one
102                                 if (isComplete.Value && cachedAddId >= completeId)
103                                         throw new InvalidOperationException ("The BlockingCollection<T> has"
104                                                                              + " been marked as complete with regards to additions.");
105                                 
106                                 if (Interlocked.CompareExchange (ref addId, cachedAddId + 1, cachedAddId) == cachedAddId)
107                                         break;
108                                 
109                                 if (cancellationFunc != null && cancellationFunc ())
110                                         throw new OperationCanceledException ("CancellationToken triggered");
111                         }
112                         
113                         
114                         if (!underlyingColl.TryAdd (item))
115                                 throw new InvalidOperationException ("The underlying collection didn't accept the item.");
116                 }
117                 
118                 public T Take ()
119                 {
120                         return Take (null);
121                 }
122                 
123                 public T Take (CancellationToken token)
124                 {
125                         return Take (() => token.IsCancellationRequested);
126                 }
127                 
128                 T Take (Func<bool> cancellationFunc)
129                 {
130                         while (true) {
131                                 long cachedRemoveId = removeId;
132                                 long cachedAddId = addId;
133                                 
134                                 // Empty case
135                                 if (cachedRemoveId == cachedAddId) {
136                                         if (isComplete.Value && cachedRemoveId >= completeId)
137                                                 throw new OperationCanceledException ("The BlockingCollection<T> has"
138                                                                                       + " been marked as complete with regards to additions.");
139                                         
140                                         Block ();
141                                         continue;
142                                 }
143                                 
144                                 if (Interlocked.CompareExchange (ref removeId, cachedRemoveId + 1, cachedRemoveId) == cachedRemoveId)
145                                         break;
146                                 
147                                 if (cancellationFunc != null && cancellationFunc ())
148                                         throw new OperationCanceledException ("The CancellationToken has had cancellation requested.");
149                         }
150                         
151                         T item;
152                         while (!underlyingColl.TryTake (out item));
153                         
154                         return item;
155                 }
156                 
157                 public bool TryAdd (T item)
158                 {
159                         return TryAdd (item, null, null);
160                 }
161                 
162                 bool TryAdd (T item, Func<bool> contFunc, CancellationToken? token)
163                 {
164                         do {
165                                 if (token.HasValue && token.Value.IsCancellationRequested)
166                                         throw new OperationCanceledException ("The CancellationToken has had cancellation requested.");
167                                 
168                                 long cachedAddId = addId;
169                                 long cachedRemoveId = removeId;
170                                 
171                                 if (upperBound != -1) {
172                                         if (cachedAddId - cachedRemoveId > upperBound) {
173                                                 continue;
174                                         }
175                                 }
176                                 
177                                 // Check our transaction id against completed stored one
178                                 if (isComplete.Value && cachedAddId >= completeId)
179                                         throw new InvalidOperationException ("The BlockingCollection<T> has"
180                                                                              + " been marked as complete with regards to additions.");
181                                 
182                                 if (Interlocked.CompareExchange (ref addId, cachedAddId + 1, cachedAddId) != cachedAddId)
183                                         continue;
184                         
185                                 if (!underlyingColl.TryAdd (item))
186                                         throw new InvalidOperationException ("The underlying collection didn't accept the item.");
187                                 
188                                 return true;
189                         } while (contFunc != null && contFunc ());
190                         
191                         return false;
192                 }
193                 
194                 public bool TryAdd (T item, TimeSpan ts)
195                 {
196                         return TryAdd (item, (int)ts.TotalMilliseconds);
197                 }
198                 
199                 public bool TryAdd (T item, int millisecondsTimeout)
200                 {
201                         Stopwatch sw = Stopwatch.StartNew ();
202                         return TryAdd (item, () => sw.ElapsedMilliseconds < millisecondsTimeout, null);
203                 }
204                 
205                 public bool TryAdd (T item, int millisecondsTimeout, CancellationToken token)
206                 {
207                         Stopwatch sw = Stopwatch.StartNew ();
208                         return TryAdd (item, () => sw.ElapsedMilliseconds < millisecondsTimeout, token);
209                 }
210                 
211                 public bool TryTake (out T item)
212                 {
213                         return TryTake (out item, null, null);
214                 }
215                 
216                 bool TryTake (out T item, Func<bool> contFunc, CancellationToken? token)
217                 {
218                         item = default (T);
219                         
220                         do {
221                                 if (token.HasValue && token.Value.IsCancellationRequested)
222                                         throw new OperationCanceledException ("The CancellationToken has had cancellation requested.");
223                                 
224                                 long cachedRemoveId = removeId;
225                                 long cachedAddId = addId;
226                                 
227                                 // Empty case
228                                 if (cachedRemoveId == cachedAddId) {
229                                         if (isComplete.Value && cachedRemoveId >= completeId)
230                                                 continue;
231                                         
232                                         continue;
233                                 }
234                                 
235                                 if (Interlocked.CompareExchange (ref removeId, cachedRemoveId + 1, cachedRemoveId) != cachedRemoveId)
236                                         continue;
237                                 
238                                 return underlyingColl.TryTake (out item);
239                         } while (contFunc != null && contFunc ());
240                         
241                         return false;
242                 }
243                 
244                 public bool TryTake (out T item, TimeSpan ts)
245                 {
246                         return TryTake (out item, (int)ts.TotalMilliseconds);
247                 }
248                 
249                 public bool TryTake (out T item, int millisecondsTimeout)
250                 {
251                         item = default (T);
252                         Stopwatch sw = Stopwatch.StartNew ();
253                         
254                         return TryTake (out item, () => sw.ElapsedMilliseconds < millisecondsTimeout, null);
255                 }
256                 
257                 public bool TryTake (out T item, int millisecondsTimeout, CancellationToken token)
258                 {
259                         item = default (T);
260                         Stopwatch sw = Stopwatch.StartNew ();
261                         
262                         return TryTake (out item, () => sw.ElapsedMilliseconds < millisecondsTimeout, token);
263                 }
264                 #endregion
265                 
266                 #region static methods
267                 static void CheckArray (BlockingCollection<T>[] collections)
268                 {
269                         if (collections == null)
270                                 throw new ArgumentNullException ("collections");
271                         if (collections.Length == 0 || IsThereANullElement (collections))
272                                 throw new ArgumentException ("The collections argument is a 0-length array or contains a null element.", "collections");
273                 }
274                 
275                 static bool IsThereANullElement (BlockingCollection<T>[] collections)
276                 {
277                         foreach (BlockingCollection<T> e in collections)
278                                 if (e == null)
279                                         return true;
280                         return false;
281                 }
282                 
283                 public static int AddToAny (BlockingCollection<T>[] collections, T item)
284                 {
285                         CheckArray (collections);
286                         int index = 0;
287                         foreach (var coll in collections) {
288                                 try {
289                                         coll.Add (item);
290                                         return index;
291                                 } catch {}
292                                 index++;
293                         }
294                         return -1;
295                 }
296                 
297                 public static int AddToAny (BlockingCollection<T>[] collections, T item, CancellationToken token)
298                 {
299                         CheckArray (collections);
300                         int index = 0;
301                         foreach (var coll in collections) {
302                                 try {
303                                         coll.Add (item, token);
304                                         return index;
305                                 } catch {}
306                                 index++;
307                         }
308                         return -1;
309                 }
310                 
311                 public static int TryAddToAny (BlockingCollection<T>[] collections, T item)
312                 {
313                         CheckArray (collections);
314                         int index = 0;
315                         foreach (var coll in collections) {
316                                 if (coll.TryAdd (item))
317                                         return index;
318                                 index++;
319                         }
320                         return -1;
321                 }
322                 
323                 public static int TryAddToAny (BlockingCollection<T>[] collections, T item, TimeSpan ts)
324                 {
325                         CheckArray (collections);
326                         int index = 0;
327                         foreach (var coll in collections) {
328                                 if (coll.TryAdd (item, ts))
329                                         return index;
330                                 index++;
331                         }
332                         return -1;
333                 }
334                 
335                 public static int TryAddToAny (BlockingCollection<T>[] collections, T item, int millisecondsTimeout)
336                 {
337                         CheckArray (collections);
338                         int index = 0;
339                         foreach (var coll in collections) {
340                                 if (coll.TryAdd (item, millisecondsTimeout))
341                                         return index;
342                                 index++;
343                         }
344                         return -1;
345                 }
346                 
347                 public static int TryAddToAny (BlockingCollection<T>[] collections, T item, int millisecondsTimeout,
348                                                CancellationToken token)
349                 {
350                         CheckArray (collections);
351                         int index = 0;
352                         foreach (var coll in collections) {
353                                 if (coll.TryAdd (item, millisecondsTimeout, token))
354                                         return index;
355                                 index++;
356                         }
357                         return -1;
358                 }
359                 
360                 public static int TakeFromAny (BlockingCollection<T>[] collections, out T item)
361                 {
362                         item = default (T);
363                         CheckArray (collections);
364                         int index = 0;
365                         foreach (var coll in collections) {
366                                 try {
367                                         item = coll.Take ();
368                                         return index;
369                                 } catch {}
370                                 index++;
371                         }
372                         return -1;
373                 }
374                 
375                 public static int TakeFromAny (BlockingCollection<T>[] collections, out T item, CancellationToken token)
376                 {
377                         item = default (T);
378                         CheckArray (collections);
379                         int index = 0;
380                         foreach (var coll in collections) {
381                                 try {
382                                         item = coll.Take (token);
383                                         return index;
384                                 } catch {}
385                                 index++;
386                         }
387                         return -1;
388                 }
389                 
390                 public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item)
391                 {
392                         item = default (T);
393                         
394                         CheckArray (collections);
395                         int index = 0;
396                         foreach (var coll in collections) {
397                                 if (coll.TryTake (out item))
398                                         return index;
399                                 index++;
400                         }
401                         return -1;
402                 }
403                 
404                 public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, TimeSpan ts)
405                 {
406                         item = default (T);
407                         
408                         CheckArray (collections);
409                         int index = 0;
410                         foreach (var coll in collections) {
411                                 if (coll.TryTake (out item, ts))
412                                         return index;
413                                 index++;
414                         }
415                         return -1;
416                 }
417                 
418                 public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, int millisecondsTimeout)
419                 {
420                         item = default (T);
421                         
422                         CheckArray (collections);
423                         int index = 0;
424                         foreach (var coll in collections) {
425                                 if (coll.TryTake (out item, millisecondsTimeout))
426                                         return index;
427                                 index++;
428                         }
429                         return -1;
430                 }
431                 
432                 public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, int millisecondsTimeout,
433                                                   CancellationToken token)
434                 {
435                         item = default (T);
436                         
437                         CheckArray (collections);
438                         int index = 0;
439                         foreach (var coll in collections) {
440                                 if (coll.TryTake (out item, millisecondsTimeout, token))
441                                         return index;
442                                 index++;
443                         }
444                         return -1;
445                 }
446                 #endregion
447                 
448                 public void CompleteAdding ()
449                 {
450                   // No further add beside that point
451                   completeId = addId;
452                   isComplete.Value = true;
453                 }
454                 
455                 void ICollection.CopyTo (Array array, int index)
456                 {
457                         underlyingColl.CopyTo (array, index);
458                 }
459                 
460                 public void CopyTo (T[] array, int index)
461                 {
462                         underlyingColl.CopyTo (array, index);
463                 }
464                 
465                 public IEnumerable<T> GetConsumingEnumerable ()
466                 {
467                         return GetConsumingEnumerable (Take);
468                 }
469                 
470                 public IEnumerable<T> GetConsumingEnumerable (CancellationToken token)
471                 {
472                         return GetConsumingEnumerable (() => Take (token));
473                 }
474                 
475                 IEnumerable<T> GetConsumingEnumerable (Func<T> getFunc)
476                 {
477                         while (true) {
478                                 T item = default (T);
479                                 
480                                 try {
481                                         item = getFunc ();
482                                 } catch {
483                                         break;
484                                 }
485                                 
486                                 yield return item;
487                         }
488                 }
489                 
490                 IEnumerator IEnumerable.GetEnumerator ()
491                 {
492                         return ((IEnumerable)underlyingColl).GetEnumerator ();
493                 }
494                 
495                 IEnumerator<T> IEnumerable<T>.GetEnumerator ()
496                 {
497                         return ((IEnumerable<T>)underlyingColl).GetEnumerator ();
498                 }
499                 
500                 public void Dispose ()
501                 {
502                         
503                 }
504                 
505                 protected virtual void Dispose (bool managedRes)
506                 {
507                         
508                 }
509                 
510                 public T[] ToArray ()
511                 {
512                         return underlyingColl.ToArray ();
513                 }
514                 
515                 // Method used to stall the thread for a limited period of time before retrying an operation
516                 void Block ()
517                 {
518                         sw.SpinOnce ();
519                 }
520                 
521                 public int BoundedCapacity {
522                         get {
523                                 return upperBound;
524                         }
525                 }
526                 
527                 public int Count {
528                         get {
529                                 return underlyingColl.Count;
530                         }
531                 }
532                 
533                 public bool IsAddingCompleted {
534                         get {
535                                 return isComplete.Value;
536                         }
537                 }
538                 
539                 public bool IsCompleted {
540                         get {
541                                 return isComplete.Value && addId == removeId;
542                         }
543                 }
544                 
545                 object ICollection.SyncRoot {
546                         get {
547                                 return underlyingColl.SyncRoot;
548                         }
549                 }
550                 
551                 bool ICollection.IsSynchronized {
552                         get {
553                                 return underlyingColl.IsSynchronized;
554                         }
555                 }
556         }
557 }
558 #endif