2010-06-29 Marek Safar <marek.safar@gmail.com>
[mono.git] / mcs / class / System / System.Collections.Concurrent / ConcurrentBag.cs
1 // 
2 // ConcurrentBag.cs
3 //  
4 // Author:
5 //       Jérémie "Garuma" Laval <jeremie.laval@gmail.com>
6 // 
7 // Copyright (c) 2009 Jérémie "Garuma" Laval
8 // 
9 // Permission is hereby granted, free of charge, to any person obtaining a copy
10 // of this software and associated documentation files (the "Software"), to deal
11 // in the Software without restriction, including without limitation the rights
12 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13 // copies of the Software, and to permit persons to whom the Software is
14 // furnished to do so, subject to the following conditions:
15 // 
16 // The above copyright notice and this permission notice shall be included in
17 // all copies or substantial portions of the Software.
18 // 
19 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
25 // THE SOFTWARE.
26
27 #if NET_4_0
28 using System;
29 using System.Collections;
30 using System.Collections.Generic;
31 using System.Diagnostics;
32 using System.Runtime.InteropServices;
33
34 using System.Threading;
35 using System.Threading.Tasks;
36
37 namespace System.Collections.Concurrent
38 {
39         [ComVisible (false)]
40         [DebuggerDisplay ("Count={Count}")]
41         [DebuggerTypeProxy (typeof (CollectionDebuggerView<>))]
42         public class ConcurrentBag<T> : IProducerConsumerCollection<T>, IEnumerable<T>, IEnumerable
43         {
44                 const int multiplier = 2;
45                 const int hintThreshold = 20;
46                 
47                 int size = Environment.ProcessorCount + 1;
48                 int count;
49                 
50                 // We only use the add hints when number of slot is above hintThreshold
51                 // so to not waste memory space and the CAS overhead
52                 ConcurrentQueue<int> addHints = new ConcurrentQueue<int> ();
53                 
54                 CyclicDeque<T>[] container;
55                 
56                 object syncLock = new object ();
57                 
58                 public ConcurrentBag ()
59                 {
60                         container = new CyclicDeque<T>[size];
61                         for (int i = 0; i < container.Length; i++)
62                                 container[i] = new CyclicDeque<T> ();
63                 }
64                 
65                 public ConcurrentBag (IEnumerable<T> enumerable) : this ()
66                 {
67                         foreach (T item in enumerable)
68                                 Add (item);
69                 }
70                 
71                 public bool TryAdd (T item)
72                 {
73                         Add (item);
74                         
75                         return true;
76                 }
77                 
78                 public void Add (T item)
79                 {
80                         Interlocked.Increment (ref count);
81                         GrowIfNecessary ();
82                         
83                         int index;
84                         CyclicDeque<T> bag = GetBag (out index);
85                         bag.PushBottom (item);
86                         
87                         // Cache operation ?
88                         if (size > hintThreshold)
89                                 addHints.Enqueue (index);
90                 }
91                 
92                 public bool TryTake (out T item)
93                 {
94                         item = default (T);
95
96                         if (count == 0)
97                                 return false;
98
99                         int hintIndex;
100                         CyclicDeque<T> bag = GetBag (out hintIndex);
101                         bool hintEnabled = size > hintThreshold;
102                         
103                         if (bag == null || bag.PopBottom (out item) != PopResult.Succeed) {
104                                 for (int i = 0; i < container.Length; i++) {
105                                         // Try to retrieve something based on a hint
106                                         bool result = hintEnabled && addHints.TryDequeue (out hintIndex) && container[hintIndex].PopTop (out item) == PopResult.Succeed;
107
108                                         // We fall back to testing our slot
109                                         if (!result && container[i] != null)
110                                                 result = container[i].PopTop (out item) == PopResult.Succeed;
111                                         
112                                         // If we found something, stop
113                                         if (result) {
114                                                 Interlocked.Decrement (ref count);
115                                                 return true;
116                                         }
117                                 }
118                         } else {
119                                 Interlocked.Decrement (ref count);
120                                 return true;
121                         }
122                         
123                         return false;
124                 }
125                 
126                 public int Count {
127                         get {
128                                 return count;
129                         }
130                 }
131                 
132                 public bool IsEmpty {
133                         get {
134                                 return count == 0;
135                         }
136                 }
137                 
138                 object System.Collections.ICollection.SyncRoot  {
139                         get {
140                                 return this;
141                         }
142                 }
143                 
144                 bool System.Collections.ICollection.IsSynchronized  {
145                         get {
146                                 return true;
147                         }
148                 }
149                 
150                 IEnumerator IEnumerable.GetEnumerator ()
151                 {
152                         return GetEnumeratorInternal ();
153                 }
154                 
155                 IEnumerator<T> IEnumerable<T>.GetEnumerator ()
156                 {
157                         return GetEnumeratorInternal ();
158                 }
159                 
160                 IEnumerator<T> GetEnumeratorInternal ()
161                 {
162                         for (int i = 0; i < size; i++) {
163                                 CyclicDeque<T> bag = container[i];
164                                 foreach (T item in bag.GetEnumerable ()) {
165                                         yield return item;
166                                 }
167                         }
168                 }
169                 
170                 void System.Collections.ICollection.CopyTo (Array array, int index)
171                 {
172                         T[] a = array as T[];
173                         if (a == null)
174                                 return;
175                         
176                         CopyTo (a, index);
177                 }
178                 
179                 public void CopyTo (T[] array, int index)
180                 {
181                         int c = count;
182                         if (array.Length < c + index)
183                                 throw new InvalidOperationException ("Array is not big enough");
184                         
185                         CopyTo (array, index, c);
186                 }
187                 
188                 void CopyTo (T[] array, int index, int num)
189                 {
190                         int i = index;
191                         
192                         foreach (T item in this) {
193                                 if (i >= num)
194                                         break;
195                                 
196                                 array[i++] = item;
197                         }
198                 }
199                 
200                 public T[] ToArray ()
201                 {
202                         int c = count;
203                         T[] temp = new T[c];
204                         
205                         CopyTo (temp, 0, c);
206                         
207                         return temp;
208                 }
209                         
210                 int GetIndex ()
211                 {
212                         return Thread.CurrentThread.ManagedThreadId - 1;
213                 }
214                 
215                 void GrowIfNecessary ()
216                 {
217                         int index = GetIndex ();
218                         int currentSize = size;
219                         
220                         while (index > currentSize - 1) {
221                                 currentSize = size;
222                                 Grow (currentSize);
223                         }
224                 }
225                 
226                 CyclicDeque<T> GetBag (out int i)
227                 {                       
228                         i = GetIndex ();
229                         
230                         return i < container.Length ? (container[i] == null) ? (container[i] = new CyclicDeque<T> ()) : container[i] : null;
231                 }
232                 
233                 void Grow (int referenceSize)
234                 {
235                         lock (syncLock) {
236                                 if (referenceSize != size)
237                                         return;
238                                 
239                                 CyclicDeque<T>[] slice = new CyclicDeque<T>[size * multiplier];
240                                 int i = 0;
241                                 Array.Copy (container, slice, container.Length);
242                                 
243                                 container = slice;
244                                 size = slice.Length;
245                         }
246                 }
247         }
248 }
249 #endif