[sgen] Make parallel vs non-parallel mark&sweep selectable.
[mono.git] / mono / metadata / sgen-workers.c
1 /*
2  * Copyright 2001-2003 Ximian, Inc
3  * Copyright 2003-2010 Novell, Inc.
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining
6  * a copy of this software and associated documentation files (the
7  * "Software"), to deal in the Software without restriction, including
8  * without limitation the rights to use, copy, modify, merge, publish,
9  * distribute, sublicense, and/or sell copies of the Software, and to
10  * permit persons to whom the Software is furnished to do so, subject to
11  * the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be
14  * included in all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
17  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
18  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
19  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
20  * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
21  * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
22  * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23  */
24
25 typedef struct _WorkerData WorkerData;
26 struct _WorkerData {
27         pthread_t thread;
28         MonoSemType start_worker_sem;
29         gboolean is_working;
30         GrayQueue private_gray_queue; /* only read/written by worker thread */
31         int shared_buffer_increment;
32         int shared_buffer_index;
33 };
34
35 static int workers_num;
36 static WorkerData *workers_data;
37 static WorkerData workers_gc_thread_data;
38
39 static int workers_num_working;
40
41 static GrayQueue workers_distribute_gray_queue;
42
43 #define WORKERS_DISTRIBUTE_GRAY_QUEUE (major.is_parallel ? &workers_distribute_gray_queue : &gray_queue)
44
45 /*
46  * Must be a power of 2.  It seems that larger values don't help much.
47  * The main reason to make this larger would be to sustain a bigger
48  * number of worker threads.
49  */
50 #define WORKERS_SHARED_BUFFER_SIZE      16
51 static GrayQueueSection *workers_shared_buffer [WORKERS_SHARED_BUFFER_SIZE];
52 static int workers_shared_buffer_used;
53
54 static const int workers_primes [] = { 3, 5, 7, 11, 13, 17, 23, 29 };
55
56 static MonoSemType workers_done_sem;
57
58 static long long stat_shared_buffer_insert_tries;
59 static long long stat_shared_buffer_insert_full;
60 static long long stat_shared_buffer_insert_iterations;
61 static long long stat_shared_buffer_insert_failures;
62 static long long stat_shared_buffer_remove_tries;
63 static long long stat_shared_buffer_remove_iterations;
64 static long long stat_shared_buffer_remove_empty;
65
66 static void
67 workers_gray_queue_share_redirect (GrayQueue *queue)
68 {
69         GrayQueueSection *section;
70         WorkerData *data = queue->alloc_prepare_data;
71         int increment = data->shared_buffer_increment;
72
73         while ((section = gray_object_dequeue_section (queue))) {
74                 int i, index;
75
76                 HEAVY_STAT (++stat_shared_buffer_insert_tries);
77
78                 if (workers_shared_buffer_used == WORKERS_SHARED_BUFFER_SIZE) {
79                         HEAVY_STAT (++stat_shared_buffer_insert_full);
80                         gray_object_enqueue_section (queue, section);
81                         return;
82                 }
83
84                 index = data->shared_buffer_index;
85                 for (i = 0; i < WORKERS_SHARED_BUFFER_SIZE; ++i) {
86                         GrayQueueSection *old = workers_shared_buffer [index];
87                         HEAVY_STAT (++stat_shared_buffer_insert_iterations);
88                         if (!old) {
89                                 if (SGEN_CAS_PTR ((void**)&workers_shared_buffer [index], section, NULL) == NULL) {
90                                         SGEN_ATOMIC_ADD (workers_shared_buffer_used, 1);
91                                         //g_print ("thread %d put section %d\n", data - workers_data, index);
92                                         break;
93                                 }
94                         }
95                         index = (index + increment) & (WORKERS_SHARED_BUFFER_SIZE - 1);
96                 }
97                 data->shared_buffer_index = index;
98
99                 if (i == WORKERS_SHARED_BUFFER_SIZE) {
100                         /* unsuccessful */
101                         HEAVY_STAT (++stat_shared_buffer_insert_failures);
102                         gray_object_enqueue_section (queue, section);
103                         return;
104                 }
105         }
106 }
107
108 static gboolean
109 workers_get_work (WorkerData *data)
110 {
111         int i, index;
112         int increment = data->shared_buffer_increment;
113
114         HEAVY_STAT (++stat_shared_buffer_remove_tries);
115
116         index = data->shared_buffer_index;
117         for (i = 0; i < WORKERS_SHARED_BUFFER_SIZE; ++i) {
118                 GrayQueueSection *section;
119
120                 HEAVY_STAT (++stat_shared_buffer_remove_iterations);
121
122                 do {
123                         section = workers_shared_buffer [index];
124                         if (!section)
125                                 break;
126                 } while (SGEN_CAS_PTR ((void**)&workers_shared_buffer [index], NULL, section) != section);
127
128                 if (section) {
129                         SGEN_ATOMIC_ADD (workers_shared_buffer_used, -1);
130                         gray_object_enqueue_section (&data->private_gray_queue, section);
131                         data->shared_buffer_index = index;
132                         //g_print ("thread %d popped section %d\n", data - workers_data, index);
133                         return TRUE;
134                 }
135
136                 index = (index + increment) & (WORKERS_SHARED_BUFFER_SIZE - 1);
137         }
138
139         HEAVY_STAT (++stat_shared_buffer_remove_empty);
140
141         data->shared_buffer_index = index;
142         return FALSE;
143 }
144
145 /* returns the new value */
146 static int
147 workers_change_num_working (int delta)
148 {
149         int old, new;
150
151         if (!major.is_parallel)
152                 return -1;
153
154         do {
155                 old = workers_num_working;
156                 new = old + delta;
157         } while (InterlockedCompareExchange (&workers_num_working, new, old) != old);
158         return new;
159 }
160
161 static void*
162 workers_thread_func (void *data_untyped)
163 {
164         WorkerData *data = data_untyped;
165         SgenInternalAllocator allocator;
166
167         memset (&allocator, 0, sizeof (allocator));
168
169         gray_object_queue_init_with_alloc_prepare (&data->private_gray_queue, &allocator,
170                         workers_gray_queue_share_redirect, data);
171
172         for (;;) {
173                 //g_print ("worker waiting for start %d\n", data->start_worker_sem);
174
175                 MONO_SEM_WAIT (&data->start_worker_sem);
176
177                 //g_print ("worker starting\n");
178
179                 for (;;) {
180                         do {
181                                 drain_gray_stack (&data->private_gray_queue);
182                         } while (workers_get_work (data));
183
184                         /*
185                          * FIXME: This might never terminate with
186                          * multiple threads!
187                          */
188
189                         if (workers_change_num_working (-1) == 0)
190                                 break;
191
192                         /* we weren't the last one working */
193                         //g_print ("sleeping\n");
194                         usleep (5000);
195                         workers_change_num_working (1);
196                 }
197
198                 gray_object_queue_init (&data->private_gray_queue, &allocator);
199
200                 MONO_SEM_POST (&workers_done_sem);
201
202                 //g_print ("worker done\n");
203         }
204 }
205
206 static void
207 workers_distribute_gray_queue_sections (void)
208 {
209         if (!major.is_parallel)
210                 return;
211
212         workers_gray_queue_share_redirect (&workers_distribute_gray_queue);
213 }
214
215 static void
216 workers_init (int num_workers)
217 {
218         int i;
219
220         if (!major.is_parallel)
221                 return;
222
223         //g_print ("initing %d workers\n", num_workers);
224
225         workers_num = num_workers;
226         workers_data = mono_sgen_alloc_internal_dynamic (sizeof (WorkerData) * num_workers, INTERNAL_MEM_WORKER_DATA);
227         MONO_SEM_INIT (&workers_done_sem, 0);
228         workers_gc_thread_data.shared_buffer_increment = 1;
229         workers_gc_thread_data.shared_buffer_index = 0;
230         gray_object_queue_init_with_alloc_prepare (&workers_distribute_gray_queue, mono_sgen_get_unmanaged_allocator (),
231                         workers_gray_queue_share_redirect, &workers_gc_thread_data);
232
233         g_assert (num_workers <= sizeof (workers_primes) / sizeof (workers_primes [0]));
234         for (i = 0; i < workers_num; ++i) {
235                 workers_data [i].shared_buffer_increment = workers_primes [i];
236                 workers_data [i].shared_buffer_index = 0;
237         }
238
239         mono_counters_register ("Shared buffer insert tries", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_shared_buffer_insert_tries);
240         mono_counters_register ("Shared buffer insert full", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_shared_buffer_insert_full);
241         mono_counters_register ("Shared buffer insert iterations", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_shared_buffer_insert_iterations);
242         mono_counters_register ("Shared buffer insert failures", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_shared_buffer_insert_failures);
243         mono_counters_register ("Shared buffer remove tries", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_shared_buffer_remove_tries);
244         mono_counters_register ("Shared buffer remove iterations", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_shared_buffer_remove_iterations);
245         mono_counters_register ("Shared buffer remove empty", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_shared_buffer_remove_empty);
246 }
247
248 /* only the GC thread is allowed to start and join workers */
249
250 static void
251 workers_start_worker (int index)
252 {
253         g_assert (index >= 0 && index < workers_num);
254
255         if (workers_data [index].is_working)
256                 return;
257
258         if (!workers_data [index].thread) {
259                 //g_print ("initing thread %d\n", index);
260                 MONO_SEM_INIT (&workers_data [index].start_worker_sem, 0);
261                 pthread_create (&workers_data [index].thread, NULL, workers_thread_func, &workers_data [index]);
262         }
263
264         workers_data [index].is_working = TRUE;
265         MONO_SEM_POST (&workers_data [index].start_worker_sem);
266         //g_print ("posted thread start %d %d\n", index, workers_data [index].start_worker_sem);
267 }
268
269 static void
270 workers_start_all_workers (int num_additional_workers)
271 {
272         int i;
273
274         if (!major.is_parallel)
275                 return;
276
277         g_assert (workers_num_working == 0);
278         workers_num_working = workers_num + num_additional_workers;
279
280         for (i = 0; i < workers_num; ++i)
281                 workers_start_worker (i);
282 }
283
284 static void
285 workers_join (void)
286 {
287         int i;
288
289         if (!major.is_parallel)
290                 return;
291
292         //g_print ("joining\n");
293         for (i = 0; i < workers_num; ++i) {
294                 if (workers_data [i].is_working)
295                         MONO_SEM_WAIT (&workers_done_sem);
296         }
297         for (i = 0; i < workers_num; ++i)
298                 workers_data [i].is_working = FALSE;
299         //g_print ("joined\n");
300
301         g_assert (workers_num_working == 0);
302         g_assert (workers_shared_buffer_used == 0);
303
304         for (i = 0; i < WORKERS_SHARED_BUFFER_SIZE; ++i)
305                 g_assert (!workers_shared_buffer [i]);
306 }
307
308 gboolean
309 mono_sgen_is_worker_thread (pthread_t thread)
310 {
311         int i;
312
313         if (!major.is_parallel)
314                 return FALSE;
315
316         for (i = 0; i < workers_num; ++i) {
317                 if (workers_data [i].thread == thread)
318                         return TRUE;
319         }
320         return FALSE;
321 }