Merge branch 'master' of github.com:mono/mono
[mono.git] / mono / metadata / sgen-workers.c
1 /*
2  * Copyright 2001-2003 Ximian, Inc
3  * Copyright 2003-2010 Novell, Inc.
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining
6  * a copy of this software and associated documentation files (the
7  * "Software"), to deal in the Software without restriction, including
8  * without limitation the rights to use, copy, modify, merge, publish,
9  * distribute, sublicense, and/or sell copies of the Software, and to
10  * permit persons to whom the Software is furnished to do so, subject to
11  * the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be
14  * included in all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
17  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
18  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
19  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
20  * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
21  * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
22  * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23  */
24
25 typedef struct _WorkerData WorkerData;
26 struct _WorkerData {
27         pthread_t thread;
28         MonoSemType start_worker_sem;
29         gboolean is_working;
30         GrayQueue private_gray_queue; /* only read/written by worker thread */
31         int shared_buffer_increment;
32         int shared_buffer_index;
33 };
34
35 static int workers_num;
36 static WorkerData *workers_data;
37 static WorkerData workers_gc_thread_data;
38
39 static int workers_num_working;
40
41 static GrayQueue workers_distribute_gray_queue;
42
43 #define WORKERS_DISTRIBUTE_GRAY_QUEUE (major.is_parallel ? &workers_distribute_gray_queue : &gray_queue)
44
45 /*
46  * Must be a power of 2.  It seems that larger values don't help much.
47  * The main reason to make this larger would be to sustain a bigger
48  * number of worker threads.
49  */
50 #define WORKERS_SHARED_BUFFER_SIZE      16
51 static GrayQueueSection *workers_shared_buffer [WORKERS_SHARED_BUFFER_SIZE];
52 static int workers_shared_buffer_used;
53
54 static const int workers_primes [] = { 3, 5, 7, 11, 13, 17, 23, 29 };
55
56 static MonoSemType workers_done_sem;
57
58 static long long stat_shared_buffer_insert_tries;
59 static long long stat_shared_buffer_insert_full;
60 static long long stat_shared_buffer_insert_iterations;
61 static long long stat_shared_buffer_insert_failures;
62 static long long stat_shared_buffer_remove_tries;
63 static long long stat_shared_buffer_remove_iterations;
64 static long long stat_shared_buffer_remove_empty;
65
66 static void
67 workers_gray_queue_share_redirect (GrayQueue *queue)
68 {
69         GrayQueueSection *section;
70         WorkerData *data = queue->alloc_prepare_data;
71         int increment = data->shared_buffer_increment;
72
73         while ((section = gray_object_dequeue_section (queue))) {
74                 int i, index;
75
76                 HEAVY_STAT (++stat_shared_buffer_insert_tries);
77
78                 if (workers_shared_buffer_used == WORKERS_SHARED_BUFFER_SIZE) {
79                         HEAVY_STAT (++stat_shared_buffer_insert_full);
80                         gray_object_enqueue_section (queue, section);
81                         return;
82                 }
83
84                 index = data->shared_buffer_index;
85                 for (i = 0; i < WORKERS_SHARED_BUFFER_SIZE; ++i) {
86                         GrayQueueSection *old = workers_shared_buffer [index];
87                         HEAVY_STAT (++stat_shared_buffer_insert_iterations);
88                         if (!old) {
89                                 if (SGEN_CAS_PTR ((void**)&workers_shared_buffer [index], section, NULL) == NULL) {
90                                         SGEN_ATOMIC_ADD (workers_shared_buffer_used, 1);
91                                         //g_print ("thread %d put section %d\n", data - workers_data, index);
92                                         break;
93                                 }
94                         }
95                         index = (index + increment) & (WORKERS_SHARED_BUFFER_SIZE - 1);
96                 }
97                 data->shared_buffer_index = index;
98
99                 if (i == WORKERS_SHARED_BUFFER_SIZE) {
100                         /* unsuccessful */
101                         HEAVY_STAT (++stat_shared_buffer_insert_failures);
102                         gray_object_enqueue_section (queue, section);
103                         return;
104                 }
105         }
106 }
107
108 static gboolean
109 workers_get_work (WorkerData *data)
110 {
111         int i, index;
112         int increment = data->shared_buffer_increment;
113
114         HEAVY_STAT (++stat_shared_buffer_remove_tries);
115
116         index = data->shared_buffer_index;
117         for (i = 0; i < WORKERS_SHARED_BUFFER_SIZE; ++i) {
118                 GrayQueueSection *section;
119
120                 HEAVY_STAT (++stat_shared_buffer_remove_iterations);
121
122                 do {
123                         section = workers_shared_buffer [index];
124                         if (!section)
125                                 break;
126                 } while (SGEN_CAS_PTR ((void**)&workers_shared_buffer [index], NULL, section) != section);
127
128                 if (section) {
129                         SGEN_ATOMIC_ADD (workers_shared_buffer_used, -1);
130                         gray_object_enqueue_section (&data->private_gray_queue, section);
131                         data->shared_buffer_index = index;
132                         //g_print ("thread %d popped section %d\n", data - workers_data, index);
133                         return TRUE;
134                 }
135
136                 index = (index + increment) & (WORKERS_SHARED_BUFFER_SIZE - 1);
137         }
138
139         HEAVY_STAT (++stat_shared_buffer_remove_empty);
140
141         data->shared_buffer_index = index;
142         return FALSE;
143 }
144
145 /* returns the new value */
146 static int
147 workers_change_num_working (int delta)
148 {
149         int old, new;
150
151         if (!major.is_parallel)
152                 return -1;
153
154         do {
155                 old = workers_num_working;
156                 new = old + delta;
157         } while (InterlockedCompareExchange (&workers_num_working, new, old) != old);
158         return new;
159 }
160
161 static void*
162 workers_thread_func (void *data_untyped)
163 {
164         WorkerData *data = data_untyped;
165         SgenInternalAllocator allocator;
166
167         memset (&allocator, 0, sizeof (allocator));
168
169         gray_object_queue_init_with_alloc_prepare (&data->private_gray_queue, &allocator,
170                         workers_gray_queue_share_redirect, data);
171
172         for (;;) {
173                 //g_print ("worker waiting for start %d\n", data->start_worker_sem);
174
175                 MONO_SEM_WAIT (&data->start_worker_sem);
176
177                 //g_print ("worker starting\n");
178
179                 for (;;) {
180                         do {
181                                 drain_gray_stack (&data->private_gray_queue);
182                         } while (workers_get_work (data));
183
184                         /*
185                          * FIXME: This might never terminate with
186                          * multiple threads!
187                          */
188
189                         if (workers_change_num_working (-1) == 0)
190                                 break;
191
192                         /* we weren't the last one working */
193                         //g_print ("sleeping\n");
194                         usleep (5000);
195                         workers_change_num_working (1);
196                 }
197
198                 gray_object_queue_init (&data->private_gray_queue, &allocator);
199
200                 MONO_SEM_POST (&workers_done_sem);
201
202                 //g_print ("worker done\n");
203         }
204
205         /* dummy return to make compilers happy */
206         return NULL;
207 }
208
209 static void
210 workers_distribute_gray_queue_sections (void)
211 {
212         if (!major.is_parallel)
213                 return;
214
215         workers_gray_queue_share_redirect (&workers_distribute_gray_queue);
216 }
217
218 static void
219 workers_init (int num_workers)
220 {
221         int i;
222
223         if (!major.is_parallel)
224                 return;
225
226         //g_print ("initing %d workers\n", num_workers);
227
228         workers_num = num_workers;
229         workers_data = mono_sgen_alloc_internal_dynamic (sizeof (WorkerData) * num_workers, INTERNAL_MEM_WORKER_DATA);
230         MONO_SEM_INIT (&workers_done_sem, 0);
231         workers_gc_thread_data.shared_buffer_increment = 1;
232         workers_gc_thread_data.shared_buffer_index = 0;
233         gray_object_queue_init_with_alloc_prepare (&workers_distribute_gray_queue, mono_sgen_get_unmanaged_allocator (),
234                         workers_gray_queue_share_redirect, &workers_gc_thread_data);
235
236         g_assert (num_workers <= sizeof (workers_primes) / sizeof (workers_primes [0]));
237         for (i = 0; i < workers_num; ++i) {
238                 workers_data [i].shared_buffer_increment = workers_primes [i];
239                 workers_data [i].shared_buffer_index = 0;
240         }
241
242         mono_counters_register ("Shared buffer insert tries", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_shared_buffer_insert_tries);
243         mono_counters_register ("Shared buffer insert full", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_shared_buffer_insert_full);
244         mono_counters_register ("Shared buffer insert iterations", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_shared_buffer_insert_iterations);
245         mono_counters_register ("Shared buffer insert failures", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_shared_buffer_insert_failures);
246         mono_counters_register ("Shared buffer remove tries", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_shared_buffer_remove_tries);
247         mono_counters_register ("Shared buffer remove iterations", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_shared_buffer_remove_iterations);
248         mono_counters_register ("Shared buffer remove empty", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_shared_buffer_remove_empty);
249 }
250
251 /* only the GC thread is allowed to start and join workers */
252
253 static void
254 workers_start_worker (int index)
255 {
256         g_assert (index >= 0 && index < workers_num);
257
258         if (workers_data [index].is_working)
259                 return;
260
261         if (!workers_data [index].thread) {
262                 //g_print ("initing thread %d\n", index);
263                 MONO_SEM_INIT (&workers_data [index].start_worker_sem, 0);
264                 pthread_create (&workers_data [index].thread, NULL, workers_thread_func, &workers_data [index]);
265         }
266
267         workers_data [index].is_working = TRUE;
268         MONO_SEM_POST (&workers_data [index].start_worker_sem);
269         //g_print ("posted thread start %d %d\n", index, workers_data [index].start_worker_sem);
270 }
271
272 static void
273 workers_start_all_workers (int num_additional_workers)
274 {
275         int i;
276
277         if (!major.is_parallel)
278                 return;
279
280         g_assert (workers_num_working == 0);
281         workers_num_working = workers_num + num_additional_workers;
282
283         for (i = 0; i < workers_num; ++i)
284                 workers_start_worker (i);
285 }
286
287 static void
288 workers_join (void)
289 {
290         int i;
291
292         if (!major.is_parallel)
293                 return;
294
295         //g_print ("joining\n");
296         for (i = 0; i < workers_num; ++i) {
297                 if (workers_data [i].is_working)
298                         MONO_SEM_WAIT (&workers_done_sem);
299         }
300         for (i = 0; i < workers_num; ++i)
301                 workers_data [i].is_working = FALSE;
302         //g_print ("joined\n");
303
304         g_assert (workers_num_working == 0);
305         g_assert (workers_shared_buffer_used == 0);
306
307         for (i = 0; i < WORKERS_SHARED_BUFFER_SIZE; ++i)
308                 g_assert (!workers_shared_buffer [i]);
309 }
310
311 gboolean
312 mono_sgen_is_worker_thread (pthread_t thread)
313 {
314         int i;
315
316         if (!major.is_parallel)
317                 return FALSE;
318
319         for (i = 0; i < workers_num; ++i) {
320                 if (workers_data [i].thread == thread)
321                         return TRUE;
322         }
323         return FALSE;
324 }