[sgen] Fix locking of the worker distribute gray queue.
[mono.git] / mono / metadata / sgen-workers.c
1 /*
2  * sgen-workers.c: Worker threads for parallel and concurrent GC.
3  *
4  * Copyright 2001-2003 Ximian, Inc
5  * Copyright 2003-2010 Novell, Inc.
6  * Copyright (C) 2012 Xamarin Inc
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License 2.0 as published by the Free Software Foundation;
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License 2.0 along with this library; if not, write to the Free
19  * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #include "config.h"
23 #ifdef HAVE_SGEN_GC
24
25 #include "metadata/sgen-gc.h"
26 #include "metadata/sgen-workers.h"
27 #include "utils/mono-counters.h"
28
29 static int workers_num;
30 static WorkerData *workers_data;
31 static WorkerData workers_gc_thread_data;
32
33 static SgenSectionGrayQueue workers_distribute_gray_queue;
34 static gboolean workers_distribute_gray_queue_inited;
35
36 static volatile gboolean workers_gc_in_progress = FALSE;
37 static volatile gboolean workers_marking = FALSE;
38 static gboolean workers_started = FALSE;
39 static volatile int workers_num_waiting = 0;
40 static MonoSemType workers_waiting_sem;
41 static MonoSemType workers_done_sem;
42 static volatile int workers_done_posted = 0;
43
44 static volatile int workers_job_queue_num_entries = 0;
45 static volatile JobQueueEntry *workers_job_queue = NULL;
46 static LOCK_DECLARE (workers_job_queue_mutex);
47 static int workers_num_jobs_enqueued = 0;
48 static volatile int workers_num_jobs_finished = 0;
49
50 static long long stat_workers_stolen_from_self_lock;
51 static long long stat_workers_stolen_from_self_no_lock;
52 static long long stat_workers_stolen_from_others;
53 static long long stat_workers_num_waited;
54
55 static void
56 workers_wake_up (int max)
57 {
58         int i;
59
60         for (i = 0; i < max; ++i) {
61                 int num;
62                 do {
63                         num = workers_num_waiting;
64                         if (num == 0)
65                                 return;
66                 } while (InterlockedCompareExchange (&workers_num_waiting, num - 1, num) != num);
67                 MONO_SEM_POST (&workers_waiting_sem);
68         }
69 }
70
71 static void
72 workers_wake_up_all (void)
73 {
74         workers_wake_up (workers_num);
75 }
76
77 void
78 sgen_workers_wake_up_all (void)
79 {
80         g_assert (workers_gc_in_progress);
81         workers_wake_up_all ();
82 }
83
84 static void
85 workers_wait (void)
86 {
87         int num;
88         ++stat_workers_num_waited;
89         do {
90                 num = workers_num_waiting;
91         } while (InterlockedCompareExchange (&workers_num_waiting, num + 1, num) != num);
92         if (num + 1 == workers_num && !workers_gc_in_progress) {
93                 /* Make sure the done semaphore is only posted once. */
94                 int posted;
95                 do {
96                         posted = workers_done_posted;
97                         if (posted)
98                                 break;
99                 } while (InterlockedCompareExchange (&workers_done_posted, 1, 0) != 0);
100                 if (!posted)
101                         MONO_SEM_POST (&workers_done_sem);
102         }
103         MONO_SEM_WAIT (&workers_waiting_sem);
104 }
105
106 static gboolean
107 collection_needs_workers (void)
108 {
109         return sgen_collection_is_parallel () || sgen_collection_is_concurrent ();
110 }
111
112 void
113 sgen_workers_enqueue_job (JobFunc func, void *data)
114 {
115         int num_entries;
116         JobQueueEntry *entry;
117
118         if (!collection_needs_workers ()) {
119                 func (NULL, data);
120                 return;
121         }
122
123         entry = sgen_alloc_internal (INTERNAL_MEM_JOB_QUEUE_ENTRY);
124         entry->func = func;
125         entry->data = data;
126
127         mono_mutex_lock (&workers_job_queue_mutex);
128         entry->next = workers_job_queue;
129         workers_job_queue = entry;
130         num_entries = ++workers_job_queue_num_entries;
131         ++workers_num_jobs_enqueued;
132         mono_mutex_unlock (&workers_job_queue_mutex);
133
134         workers_wake_up (num_entries);
135 }
136
137 void
138 sgen_workers_wait_for_jobs (void)
139 {
140         // FIXME: implement this properly
141         while (workers_num_jobs_finished < workers_num_jobs_enqueued)
142                 g_usleep (1000);
143 }
144
145 static gboolean
146 workers_dequeue_and_do_job (WorkerData *data)
147 {
148         JobQueueEntry *entry;
149         int num_finished;
150
151         /*
152          * At this point the GC might not be running anymore.  We
153          * could have been woken up by a job that was then taken by
154          * another thread, after which the collection finished, so we
155          * first have to successfully dequeue a job before doing
156          * anything assuming that the collection is still ongoing.
157          */
158
159         if (!workers_job_queue_num_entries)
160                 return FALSE;
161
162         mono_mutex_lock (&workers_job_queue_mutex);
163         entry = (JobQueueEntry*)workers_job_queue;
164         if (entry) {
165                 workers_job_queue = entry->next;
166                 --workers_job_queue_num_entries;
167         }
168         mono_mutex_unlock (&workers_job_queue_mutex);
169
170         if (!entry)
171                 return FALSE;
172
173         g_assert (collection_needs_workers ());
174
175         entry->func (data, entry->data);
176         sgen_free_internal (entry, INTERNAL_MEM_JOB_QUEUE_ENTRY);
177
178         do {
179                 num_finished = workers_num_jobs_finished;
180         } while (InterlockedCompareExchange (&workers_num_jobs_finished, num_finished + 1, num_finished) != num_finished);
181
182         return TRUE;
183 }
184
185 static gboolean
186 workers_steal (WorkerData *data, WorkerData *victim_data, gboolean lock)
187 {
188         SgenGrayQueue *queue = &data->private_gray_queue;
189         int num, n;
190
191         g_assert (!queue->first);
192
193         if (!victim_data->stealable_stack_fill)
194                 return FALSE;
195
196         if (lock && mono_mutex_trylock (&victim_data->stealable_stack_mutex))
197                 return FALSE;
198
199         n = num = (victim_data->stealable_stack_fill + 1) / 2;
200         /* We're stealing num entries. */
201
202         while (n > 0) {
203                 int m = MIN (SGEN_GRAY_QUEUE_SECTION_SIZE, n);
204                 n -= m;
205
206                 sgen_gray_object_alloc_queue_section (queue);
207                 memcpy (queue->first->objects,
208                                 victim_data->stealable_stack + victim_data->stealable_stack_fill - num + n,
209                                 sizeof (char*) * m);
210                 queue->first->end = m;
211         }
212
213         victim_data->stealable_stack_fill -= num;
214
215         if (lock)
216                 mono_mutex_unlock (&victim_data->stealable_stack_mutex);
217
218         if (data == victim_data) {
219                 if (lock)
220                         stat_workers_stolen_from_self_lock += num;
221                 else
222                         stat_workers_stolen_from_self_no_lock += num;
223         } else {
224                 stat_workers_stolen_from_others += num;
225         }
226
227         return num != 0;
228 }
229
230 static gboolean
231 workers_get_work (WorkerData *data)
232 {
233         SgenMajorCollector *major;
234         int i;
235
236         g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue));
237
238         /* Try to steal from our own stack. */
239         if (workers_steal (data, data, TRUE))
240                 return TRUE;
241
242         /* Then from the GC thread's stack. */
243         if (workers_steal (data, &workers_gc_thread_data, TRUE))
244                 return TRUE;
245
246         /* From another worker. */
247         for (i = 0; i < workers_num; ++i) {
248                 WorkerData *victim_data = &workers_data [i];
249                 if (data == victim_data)
250                         continue;
251                 if (workers_steal (data, victim_data, TRUE))
252                         return TRUE;
253         }
254
255         /*
256          * If we're concurrent or parallel, from the workers
257          * distribute gray queue.
258          */
259         major = sgen_get_major_collector ();
260         if (major->is_concurrent || major->is_parallel) {
261                 GrayQueueSection *section = sgen_section_gray_queue_dequeue (&workers_distribute_gray_queue);
262                 if (section) {
263                         sgen_gray_object_enqueue_section (&data->private_gray_queue, section);
264                         return TRUE;
265                 }
266         }
267
268         /* Nobody to steal from */
269         g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue));
270         return FALSE;
271 }
272
273 static void
274 workers_gray_queue_share_redirect (SgenGrayQueue *queue)
275 {
276         GrayQueueSection *section;
277         WorkerData *data = queue->alloc_prepare_data;
278
279         if (data->stealable_stack_fill) {
280                 /*
281                  * There are still objects in the stealable stack, so
282                  * wake up any workers that might be sleeping
283                  */
284                 if (workers_gc_in_progress)
285                         workers_wake_up_all ();
286                 return;
287         }
288
289         /* The stealable stack is empty, so fill it. */
290         mono_mutex_lock (&data->stealable_stack_mutex);
291
292         while (data->stealable_stack_fill < STEALABLE_STACK_SIZE &&
293                         (section = sgen_gray_object_dequeue_section (queue))) {
294                 int num = MIN (section->end, STEALABLE_STACK_SIZE - data->stealable_stack_fill);
295
296                 memcpy (data->stealable_stack + data->stealable_stack_fill,
297                                 section->objects + section->end - num,
298                                 sizeof (char*) * num);
299
300                 section->end -= num;
301                 data->stealable_stack_fill += num;
302
303                 if (section->end)
304                         sgen_gray_object_enqueue_section (queue, section);
305                 else
306                         sgen_gray_object_free_queue_section (section);
307         }
308
309         if (data != &workers_gc_thread_data && sgen_gray_object_queue_is_empty (queue))
310                 workers_steal (data, data, FALSE);
311
312         mono_mutex_unlock (&data->stealable_stack_mutex);
313
314         if (workers_gc_in_progress)
315                 workers_wake_up_all ();
316 }
317
318 static void
319 concurrent_enqueue_check (char *obj)
320 {
321         g_assert (!sgen_ptr_in_nursery (obj));
322         g_assert (SGEN_LOAD_VTABLE (obj));
323 }
324
325 static void
326 init_private_gray_queue (WorkerData *data)
327 {
328         sgen_gray_object_queue_init_with_alloc_prepare (&data->private_gray_queue,
329                         sgen_get_major_collector ()->is_concurrent ? concurrent_enqueue_check : NULL,
330                         workers_gray_queue_share_redirect, data);
331 }
332
333 static mono_native_thread_return_t
334 workers_thread_func (void *data_untyped)
335 {
336         WorkerData *data = data_untyped;
337
338         mono_thread_info_register_small_id ();
339
340         if (sgen_get_major_collector ()->init_worker_thread)
341                 sgen_get_major_collector ()->init_worker_thread (data->major_collector_data);
342
343         init_private_gray_queue (data);
344
345         for (;;) {
346                 gboolean did_work = FALSE;
347
348                 while (workers_dequeue_and_do_job (data)) {
349                         did_work = TRUE;
350                         /* FIXME: maybe distribute the gray queue here? */
351                 }
352
353                 if (workers_marking && (!sgen_gray_object_queue_is_empty (&data->private_gray_queue) || workers_get_work (data))) {
354                         g_assert (!sgen_gray_object_queue_is_empty (&data->private_gray_queue));
355
356                         while (!sgen_drain_gray_stack (&data->private_gray_queue, sgen_get_major_collector ()->major_ops.scan_object, 32))
357                                 workers_gray_queue_share_redirect (&data->private_gray_queue);
358                         g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue));
359
360                         init_private_gray_queue (data);
361
362                         did_work = TRUE;
363                 }
364
365                 if (!did_work)
366                         workers_wait ();
367         }
368
369         /* dummy return to make compilers happy */
370         return NULL;
371 }
372
373 static void
374 init_distribute_gray_queue (gboolean locked)
375 {
376         if (workers_distribute_gray_queue_inited) {
377                 g_assert (sgen_section_gray_queue_is_empty (&workers_distribute_gray_queue));
378                 g_assert (!workers_distribute_gray_queue.locked == !locked);
379                 return;
380         }
381
382         sgen_section_gray_queue_init (&workers_distribute_gray_queue, locked,
383                         sgen_get_major_collector ()->is_concurrent ? concurrent_enqueue_check : NULL);
384         workers_distribute_gray_queue_inited = TRUE;
385 }
386
387 void
388 sgen_workers_init_distribute_gray_queue (void)
389 {
390         if (!collection_needs_workers ())
391                 return;
392
393         init_distribute_gray_queue (sgen_get_major_collector ()->is_concurrent || sgen_get_major_collector ()->is_parallel);
394 }
395
396 void
397 sgen_workers_init (int num_workers)
398 {
399         int i;
400
401         if (!sgen_get_major_collector ()->is_parallel && !sgen_get_major_collector ()->is_concurrent)
402                 return;
403
404         //g_print ("initing %d workers\n", num_workers);
405
406         workers_num = num_workers;
407
408         workers_data = sgen_alloc_internal_dynamic (sizeof (WorkerData) * num_workers, INTERNAL_MEM_WORKER_DATA, TRUE);
409         memset (workers_data, 0, sizeof (WorkerData) * num_workers);
410
411         MONO_SEM_INIT (&workers_waiting_sem, 0);
412         MONO_SEM_INIT (&workers_done_sem, 0);
413
414         init_distribute_gray_queue (sgen_get_major_collector ()->is_concurrent || sgen_get_major_collector ()->is_parallel);
415         mono_mutex_init (&workers_gc_thread_data.stealable_stack_mutex, NULL);
416         workers_gc_thread_data.stealable_stack_fill = 0;
417
418         if (sgen_get_major_collector ()->alloc_worker_data)
419                 workers_gc_thread_data.major_collector_data = sgen_get_major_collector ()->alloc_worker_data ();
420
421         for (i = 0; i < workers_num; ++i) {
422                 /* private gray queue is inited by the thread itself */
423                 mono_mutex_init (&workers_data [i].stealable_stack_mutex, NULL);
424                 workers_data [i].stealable_stack_fill = 0;
425
426                 if (sgen_get_major_collector ()->alloc_worker_data)
427                         workers_data [i].major_collector_data = sgen_get_major_collector ()->alloc_worker_data ();
428         }
429
430         LOCK_INIT (workers_job_queue_mutex);
431
432         sgen_register_fixed_internal_mem_type (INTERNAL_MEM_JOB_QUEUE_ENTRY, sizeof (JobQueueEntry));
433
434         mono_counters_register ("Stolen from self lock", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_workers_stolen_from_self_lock);
435         mono_counters_register ("Stolen from self no lock", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_workers_stolen_from_self_no_lock);
436         mono_counters_register ("Stolen from others", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_workers_stolen_from_others);
437         mono_counters_register ("# workers waited", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_workers_num_waited);
438 }
439
440 /* only the GC thread is allowed to start and join workers */
441
442 static void
443 workers_start_worker (int index)
444 {
445         g_assert (index >= 0 && index < workers_num);
446
447         g_assert (!workers_data [index].thread);
448         mono_native_thread_create (&workers_data [index].thread, workers_thread_func, &workers_data [index]);
449 }
450
451 void
452 sgen_workers_start_all_workers (void)
453 {
454         int i;
455
456         if (!collection_needs_workers ())
457                 return;
458
459         if (sgen_get_major_collector ()->init_worker_thread)
460                 sgen_get_major_collector ()->init_worker_thread (workers_gc_thread_data.major_collector_data);
461
462         g_assert (!workers_gc_in_progress);
463         workers_gc_in_progress = TRUE;
464         workers_marking = FALSE;
465         workers_done_posted = 0;
466
467         g_assert (workers_job_queue_num_entries == 0);
468         workers_num_jobs_enqueued = 0;
469         workers_num_jobs_finished = 0;
470
471         if (workers_started) {
472                 if (workers_num_waiting != workers_num)
473                         g_error ("Expecting all %d sgen workers to be parked, but only %d are", workers_num, workers_num_waiting);
474                 workers_wake_up_all ();
475                 return;
476         }
477
478         for (i = 0; i < workers_num; ++i)
479                 workers_start_worker (i);
480
481         workers_started = TRUE;
482 }
483
484 void
485 sgen_workers_start_marking (void)
486 {
487         if (!collection_needs_workers ())
488                 return;
489
490         g_assert (workers_started && workers_gc_in_progress);
491         g_assert (!workers_marking);
492
493         workers_marking = TRUE;
494
495         workers_wake_up_all ();
496 }
497
498 void
499 sgen_workers_join (void)
500 {
501         int i;
502
503         if (!collection_needs_workers ())
504                 return;
505
506         g_assert (sgen_gray_object_queue_is_empty (&workers_gc_thread_data.private_gray_queue));
507
508         g_assert (workers_gc_in_progress);
509         workers_gc_in_progress = FALSE;
510         if (workers_num_waiting == workers_num) {
511                 /*
512                  * All the workers might have shut down at this point
513                  * and posted the done semaphore but we don't know it
514                  * yet.  It's not a big deal to wake them up again -
515                  * they'll just do one iteration of their loop trying to
516                  * find something to do and then go back to waiting
517                  * again.
518                  */
519                 workers_wake_up_all ();
520         }
521         MONO_SEM_WAIT (&workers_done_sem);
522         workers_marking = FALSE;
523
524         if (sgen_get_major_collector ()->reset_worker_data) {
525                 for (i = 0; i < workers_num; ++i)
526                         sgen_get_major_collector ()->reset_worker_data (workers_data [i].major_collector_data);
527         }
528
529         g_assert (workers_done_posted);
530
531         g_assert (sgen_section_gray_queue_is_empty (&workers_distribute_gray_queue));
532         g_assert (!workers_gc_thread_data.stealable_stack_fill);
533         g_assert (sgen_gray_object_queue_is_empty (&workers_gc_thread_data.private_gray_queue));
534         for (i = 0; i < workers_num; ++i) {
535                 g_assert (!workers_data [i].stealable_stack_fill);
536                 g_assert (sgen_gray_object_queue_is_empty (&workers_data [i].private_gray_queue));
537         }
538 }
539
540 gboolean
541 sgen_workers_all_done (void)
542 {
543         return workers_num_waiting == workers_num;
544 }
545
546 gboolean
547 sgen_is_worker_thread (MonoNativeThreadId thread)
548 {
549         int i;
550
551         if (sgen_get_major_collector ()->is_worker_thread && sgen_get_major_collector ()->is_worker_thread (thread))
552                 return TRUE;
553
554         for (i = 0; i < workers_num; ++i) {
555                 if (workers_data [i].thread == thread)
556                         return TRUE;
557         }
558         return FALSE;
559 }
560
561 SgenSectionGrayQueue*
562 sgen_workers_get_distribute_section_gray_queue (void)
563 {
564         return &workers_distribute_gray_queue;
565 }
566
567 void
568 sgen_workers_reset_data (void)
569 {
570         if (sgen_get_major_collector ()->reset_worker_data)
571                 sgen_get_major_collector ()->reset_worker_data (workers_gc_thread_data.major_collector_data);
572
573 }
574 #endif