2 * sgen-workers.c: Worker threads for parallel and concurrent GC.
4 * Copyright 2001-2003 Ximian, Inc
5 * Copyright 2003-2010 Novell, Inc.
6 * Copyright (C) 2012 Xamarin Inc
8 * This library is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Library General Public
10 * License 2.0 as published by the Free Software Foundation;
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Library General Public License for more details.
17 * You should have received a copy of the GNU Library General Public
18 * License 2.0 along with this library; if not, write to the Free
19 * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
25 #include "metadata/sgen-gc.h"
26 #include "metadata/sgen-workers.h"
27 #include "utils/mono-counters.h"
/*
 * File-scope worker-pool state.
 * NOTE(review): this is an excerpt; several original lines (including the
 * opening of the State union/struct declaration) are missing from view.
 */
/* Number of worker threads and their per-thread data array. */
29 static int workers_num;
30 static WorkerData *workers_data;
/* Major-collector-private data used by the GC (main) thread itself. */
31 static void *workers_gc_thread_major_collector_data = NULL;
/* Shared gray queue that work is distributed from; init guarded by the flag below. */
33 static SgenSectionGrayQueue workers_distribute_gray_queue;
34 static gboolean workers_distribute_gray_queue_inited;
/* TRUE while workers are allowed to mark; TRUE once threads were ever created. */
36 static volatile gboolean workers_marking = FALSE;
37 static gboolean workers_started = FALSE;
/*
 * Bitfields of the State union (CAS'd as one word via workers_state.value):
 * num_waiting counts parked workers.
 */
43 * Decremented by the main thread and incremented by
46 guint32 num_waiting : 8;
47 /* Set by worker threads and reset by the main thread. */
48 guint32 done_posted : 1;
49 /* Set by the main thread. */
50 guint32 gc_in_progress : 1;
54 static volatile State workers_state;
/* Workers block on waiting_sem; the main thread blocks on done_sem in join. */
56 static MonoSemType workers_waiting_sem;
57 static MonoSemType workers_done_sem;
/* Job queue: LIFO linked list guarded by workers_job_queue_mutex. */
59 static volatile int workers_job_queue_num_entries = 0;
60 static volatile JobQueueEntry *workers_job_queue = NULL;
61 static LOCK_DECLARE (workers_job_queue_mutex);
/* enqueued is written under the queue mutex; finished is bumped atomically. */
62 static int workers_num_jobs_enqueued = 0;
63 static volatile int workers_num_jobs_finished = 0;
/* Counters registered with mono_counters in sgen_workers_init. */
65 static long long stat_workers_stolen_from_self_lock;
66 static long long stat_workers_stolen_from_self_no_lock;
67 static long long stat_workers_stolen_from_others;
68 static long long stat_workers_num_waited;
/*
 * Atomically transition workers_state from old_state to new_state with a
 * compare-and-swap on the packed word.  Returns TRUE iff the swap succeeded
 * (i.e. nobody changed the state since old_state was read).
 * NOTE(review): the return-type line of this definition is not visible in
 * this excerpt; callers treat the result as a boolean.
 */
71 set_state (State old_state, State new_state)
73 return InterlockedCompareExchange (&workers_state.value,
74 new_state.value, old_state.value) == old_state.value;
/*
 * Wake up to `max` parked worker threads.  For each wake-up we CAS-decrement
 * num_waiting and then post the waiting semaphore once.  We stop early when
 * done_posted is already set (workers must not be woken after done was
 * posted) or when no worker is waiting.
 * NOTE(review): excerpt — the `do {` opener and the early-exit statements
 * after the two `if` checks are among the lines missing from view.
 */
78 workers_wake_up (int max)
82 for (i = 0; i < max; ++i) {
83 State old_state, new_state;
85 old_state = new_state = workers_state;
87 * We must not wake workers up once done has
90 if (old_state.data.done_posted)
92 if (old_state.data.num_waiting == 0)
/* Reserve one parked worker; retried if the CAS below fails. */
94 --new_state.data.num_waiting;
95 } while (!set_state (old_state, new_state));
/* One post per successfully reserved worker. */
96 MONO_SEM_POST (&workers_waiting_sem);
/* Wake every worker: delegate to workers_wake_up with the full pool size. */
101 workers_wake_up_all (void)
103 workers_wake_up (workers_num);
/*
 * Public wrapper around workers_wake_up_all; only legal while a collection
 * is in progress (asserted).
 */
107 sgen_workers_wake_up_all (void)
109 g_assert (workers_state.data.gc_in_progress);
110 workers_wake_up_all ();
/*
 * Park the calling worker thread.  CAS-increments num_waiting; if this is
 * the last worker going to sleep after the collection has ended
 * (num_waiting == workers_num && !gc_in_progress), it also sets done_posted
 * and posts the done semaphore so the main thread's join can proceed.
 * Finally the worker blocks on the waiting semaphore until woken.
 * NOTE(review): excerpt — the function signature line and the `do {` opener
 * are missing from view; the name workers_wait is inferred from callers.
 */
116 State old_state, new_state;
117 ++stat_workers_num_waited;
119 old_state = new_state = workers_state;
121 * Only the last worker thread awake can set the done
122 * posted flag, and since we're awake and haven't set
123 * it yet, it cannot be set.
125 g_assert (!old_state.data.done_posted);
126 ++new_state.data.num_waiting;
128 * This is the only place where we use
129 * workers_gc_in_progress in the worker threads.
131 if (new_state.data.num_waiting == workers_num && !old_state.data.gc_in_progress)
132 new_state.data.done_posted = 1;
133 } while (!set_state (old_state, new_state));
/* Publish the state change before posting, then signal and park. */
134 mono_memory_barrier ();
135 if (new_state.data.done_posted)
136 MONO_SEM_POST (&workers_done_sem);
137 MONO_SEM_WAIT (&workers_waiting_sem);
/*
 * Whether the current collection uses worker threads: only concurrent
 * collections do.
 */
141 collection_needs_workers (void)
143 return sgen_collection_is_concurrent ();
/*
 * Enqueue a job for the worker threads: allocate an entry, push it on the
 * LIFO job list under the queue mutex, and wake as many workers as there
 * are queued entries.  No-op path exists when the collection doesn't use
 * workers (early return not visible in this excerpt).
 * NOTE(review): excerpt — the lines storing func/data into the entry and
 * the early-return body are missing from view.
 */
147 sgen_workers_enqueue_job (JobFunc func, void *data)
150 JobQueueEntry *entry;
152 if (!collection_needs_workers ()) {
157 g_assert (workers_state.data.gc_in_progress);
159 entry = sgen_alloc_internal (INTERNAL_MEM_JOB_QUEUE_ENTRY);
163 mono_mutex_lock (&workers_job_queue_mutex);
164 entry->next = workers_job_queue;
165 workers_job_queue = entry;
166 num_entries = ++workers_job_queue_num_entries;
167 ++workers_num_jobs_enqueued;
168 mono_mutex_unlock (&workers_job_queue_mutex);
/* Wake one worker per pending entry (capped at workers_num inside). */
170 workers_wake_up (num_entries);
/*
 * Busy-wait until all enqueued jobs have been finished by the workers.
 * If every worker is parked while jobs remain, wake them all so they can
 * pick the jobs up.  Acknowledged in-source as a provisional implementation
 * (see FIXME).
 */
174 sgen_workers_wait_for_jobs (void)
176 // FIXME: implement this properly
177 while (workers_num_jobs_finished < workers_num_jobs_enqueued) {
178 State state = workers_state;
179 g_assert (state.data.gc_in_progress);
180 g_assert (!state.data.done_posted);
181 if (state.data.num_waiting == workers_num)
182 workers_wake_up_all ();
/*
 * Pop one job off the queue and execute it.  Returns whether a job was run
 * (return statements not visible in this excerpt).  The dequeue must
 * succeed before anything assumes the collection is still ongoing — see
 * the in-body comment about wake-ups racing with collection end.
 * NOTE(review): excerpt — the quick-exit return, the NULL-check around the
 * dequeue, and the final return are among the missing lines.
 */
188 workers_dequeue_and_do_job (WorkerData *data)
190 JobQueueEntry *entry;
193 * At this point the GC might not be running anymore. We
194 * could have been woken up by a job that was then taken by
195 * another thread, after which the collection finished, so we
196 * first have to successfully dequeue a job before doing
197 * anything assuming that the collection is still ongoing.
/* Unlocked fast-path check; the real dequeue below is mutex-protected. */
200 if (!workers_job_queue_num_entries)
203 mono_mutex_lock (&workers_job_queue_mutex);
204 entry = (JobQueueEntry*)workers_job_queue;
206 workers_job_queue = entry->next;
207 --workers_job_queue_num_entries;
209 mono_mutex_unlock (&workers_job_queue_mutex);
214 g_assert (collection_needs_workers ());
216 entry->func (data, entry->data);
217 sgen_free_internal (entry, INTERNAL_MEM_JOB_QUEUE_ENTRY);
/* Atomic bump so sgen_workers_wait_for_jobs can observe progress. */
219 SGEN_ATOMIC_ADD (workers_num_jobs_finished, 1);
/*
 * Steal roughly half of victim_data's stealable stack into data's private
 * gray queue, copying in SGEN_GRAY_QUEUE_SECTION_SIZE-sized chunks.  When
 * `lock` is TRUE the victim's stealable_stack_mutex is trylocked first
 * (giving up on contention); the no-lock path is used when the caller
 * already holds it (self-steal from the redirect hook).  Updates the
 * steal-statistics counters by source.  Returns whether anything was
 * stolen (return statements not visible in this excerpt).
 * NOTE(review): excerpt — the chunk loop opener, early returns, and some
 * closing braces are among the missing lines.
 */
225 workers_steal (WorkerData *data, WorkerData *victim_data, gboolean lock)
227 SgenGrayQueue *queue = &data->private_gray_queue;
230 g_assert (!queue->first);
232 if (!victim_data->stealable_stack_fill)
/* Non-blocking: on trylock failure we simply don't steal from this victim. */
235 if (lock && mono_mutex_trylock (&victim_data->stealable_stack_mutex))
/* Take (fill+1)/2 entries, i.e. half rounded up. */
238 n = num = (victim_data->stealable_stack_fill + 1) / 2;
239 /* We're stealing num entries. */
242 int m = MIN (SGEN_GRAY_QUEUE_SECTION_SIZE, n);
245 sgen_gray_object_alloc_queue_section (queue);
246 memcpy (queue->first->entries,
247 victim_data->stealable_stack + victim_data->stealable_stack_fill - num + n,
248 sizeof (GrayQueueEntry) * m);
249 queue->first->size = m;
252 * DO NOT move outside this loop
253 * Doing so trigger "assert not reached" in sgen-scan-object.h : we use the queue->cursor
254 * to compute the size of the first section during section allocation (via alloc_prepare_func
255 * -> workers_gray_queue_share_redirect -> sgen_gray_object_dequeue_section) which will be then
256 * set to 0, because queue->cursor is still pointing to queue->first->entries [-1], thus
257 * losing objects in the gray queue.
259 queue->cursor = queue->first->entries + queue->first->size - 1;
262 victim_data->stealable_stack_fill -= num;
265 mono_mutex_unlock (&victim_data->stealable_stack_mutex);
/* Attribute the steal to the right statistics counter. */
267 if (data == victim_data) {
269 stat_workers_stolen_from_self_lock += num;
271 stat_workers_stolen_from_self_no_lock += num;
273 stat_workers_stolen_from_others += num;
/*
 * Acquire work for a worker whose private gray queue is empty.  Order:
 * 1) steal back from our own stealable stack, 2) steal from the other
 * workers round-robin starting after our index, 3) for a concurrent major
 * collector, take a section from the shared distribute gray queue.
 * Returns whether work was obtained (return statements not visible in this
 * excerpt); falls through asserting the queue is still empty when nobody
 * had anything to give.
 */
280 workers_get_work (WorkerData *data)
282 SgenMajorCollector *major;
285 g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue));
287 /* Try to steal from our own stack. */
288 if (workers_steal (data, data, TRUE))
291 /* From another worker. */
292 for (i = data->index + 1; i < workers_num + data->index; ++i) {
293 WorkerData *victim_data = &workers_data [i % workers_num];
294 g_assert (data != victim_data);
295 if (workers_steal (data, victim_data, TRUE))
300 * If we're concurrent or parallel, from the workers
301 * distribute gray queue.
303 major = sgen_get_major_collector ();
304 if (major->is_concurrent) {
305 GrayQueueSection *section = sgen_section_gray_queue_dequeue (&workers_distribute_gray_queue);
307 sgen_gray_object_enqueue_section (&data->private_gray_queue, section);
312 /* Nobody to steal from */
313 g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue));
/*
 * alloc-prepare hook installed on each worker's private gray queue (see
 * init_private_gray_queue).  Shares surplus work: if the worker's
 * stealable stack still has entries, just wake sleepers; otherwise move
 * sections from the private queue into the stealable stack (up to
 * STEALABLE_STACK_SIZE entries), re-enqueueing or freeing partially
 * consumed sections, self-steal back if the queue went empty, and wake
 * everyone so the shared work gets picked up.
 * NOTE(review): excerpt — the early return after the first wake-up and the
 * if/else around re-enqueue vs. free are among the missing lines.
 */
318 workers_gray_queue_share_redirect (SgenGrayQueue *queue)
320 GrayQueueSection *section;
321 WorkerData *data = queue->alloc_prepare_data;
323 if (data->stealable_stack_fill) {
325 * There are still objects in the stealable stack, so
326 * wake up any workers that might be sleeping
328 if (workers_state.data.gc_in_progress)
329 workers_wake_up_all ();
333 /* The stealable stack is empty, so fill it. */
334 mono_mutex_lock (&data->stealable_stack_mutex);
336 while (data->stealable_stack_fill < STEALABLE_STACK_SIZE &&
337 (section = sgen_gray_object_dequeue_section (queue))) {
338 int num = MIN (section->size, STEALABLE_STACK_SIZE - data->stealable_stack_fill);
/* Copy the tail `num` entries of the section onto the stealable stack. */
340 memcpy (data->stealable_stack + data->stealable_stack_fill,
341 section->entries + section->size - num,
342 sizeof (GrayQueueEntry) * num);
344 section->size -= num;
345 data->stealable_stack_fill += num;
/* Section not fully drained: put the remainder back; otherwise free it. */
348 sgen_gray_object_enqueue_section (queue, section);
350 sgen_gray_object_free_queue_section (section);
/* If everything moved to the stealable stack, steal some back for ourselves
 * (no-lock variant: we already hold our own stealable_stack_mutex). */
353 if (sgen_gray_object_queue_is_empty (queue))
354 workers_steal (data, data, FALSE);
356 mono_mutex_unlock (&data->stealable_stack_mutex);
358 if (workers_state.data.gc_in_progress)
359 workers_wake_up_all ();
/*
 * Debug hook run on every object enqueued into a gray queue during
 * concurrent collection: the collection must be in progress, the object
 * must not live in the nursery, and it must have a vtable.
 */
363 concurrent_enqueue_check (char *obj)
365 g_assert (sgen_concurrent_collection_in_progress ());
366 g_assert (!sgen_ptr_in_nursery (obj));
367 g_assert (SGEN_LOAD_VTABLE (obj));
/*
 * (Re)initialize a worker's private gray queue, installing
 * workers_gray_queue_share_redirect as the alloc-prepare hook (with the
 * worker as its data) and the enqueue-check callback for concurrent majors.
 */
371 init_private_gray_queue (WorkerData *data)
373 sgen_gray_object_queue_init_with_alloc_prepare (&data->private_gray_queue,
374 sgen_get_major_collector ()->is_concurrent ? concurrent_enqueue_check : NULL,
375 workers_gray_queue_share_redirect, data);
/*
 * Worker thread entry point.  Registers a small thread id, runs the major
 * collector's per-thread init, sets up the private gray queue, then loops:
 * drain the job queue, and — once marking is enabled — drain the gray
 * stack in 32-object batches, redirecting surplus through the share hook.
 * NOTE(review): excerpt — the outer `for (;;)` loop, the parking call
 * (presumably workers_wait; verify against full source), and the
 * non-concurrent ops branch are among the missing lines.
 */
378 static mono_native_thread_return_t
379 workers_thread_func (void *data_untyped)
381 WorkerData *data = data_untyped;
382 SgenMajorCollector *major = sgen_get_major_collector ();
384 mono_thread_info_register_small_id ();
386 if (major->init_worker_thread)
387 major->init_worker_thread (data->major_collector_data);
389 init_private_gray_queue (data);
392 gboolean did_work = FALSE;
394 while (workers_dequeue_and_do_job (data)) {
396 /* FIXME: maybe distribute the gray queue here? */
399 if (workers_marking && (!sgen_gray_object_queue_is_empty (&data->private_gray_queue) || workers_get_work (data))) {
400 SgenObjectOperations *ops = sgen_concurrent_collection_in_progress ()
401 ? &major->major_concurrent_ops
403 ScanCopyContext ctx = { ops->scan_object, NULL, &data->private_gray_queue };
405 g_assert (!sgen_gray_object_queue_is_empty (&data->private_gray_queue));
/* Drain in small batches; share leftover work between batches. */
407 while (!sgen_drain_gray_stack (32, ctx))
408 workers_gray_queue_share_redirect (&data->private_gray_queue);
409 g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue));
/* Queue is spent; reinitialize it for the next round of work. */
411 init_private_gray_queue (data);
420 /* dummy return to make compilers happy */
/*
 * Initialize the shared distribute gray queue (idempotent).  On repeated
 * calls just assert it is empty and that the locking mode matches; the
 * first call does the real init with the concurrent enqueue check.
 */
425 init_distribute_gray_queue (gboolean locked)
427 if (workers_distribute_gray_queue_inited) {
428 g_assert (sgen_section_gray_queue_is_empty (&workers_distribute_gray_queue));
/* Normalize both to 0/1 before comparing gbooleans. */
429 g_assert (!workers_distribute_gray_queue.locked == !locked);
433 sgen_section_gray_queue_init (&workers_distribute_gray_queue, locked,
434 sgen_get_major_collector ()->is_concurrent ? concurrent_enqueue_check : NULL);
435 workers_distribute_gray_queue_inited = TRUE;
/*
 * Public entry: set up the distribute gray queue when the collection uses
 * workers; a concurrent major collector requires the locked variant.
 */
439 sgen_workers_init_distribute_gray_queue (void)
441 if (!collection_needs_workers ())
444 init_distribute_gray_queue (sgen_get_major_collector ()->is_concurrent);
/*
 * One-time initialization of the worker subsystem: allocate and zero the
 * per-worker data, create the semaphores, init the distribute gray queue,
 * allocate major-collector worker data (for the GC thread and each worker),
 * init the job-queue lock, register the job-entry allocator and the
 * statistics counters.  Bails out early for non-concurrent majors, which
 * don't use workers.
 */
448 sgen_workers_init (int num_workers)
452 if (!sgen_get_major_collector ()->is_concurrent)
455 //g_print ("initing %d workers\n", num_workers);
457 workers_num = num_workers;
459 workers_data = sgen_alloc_internal_dynamic (sizeof (WorkerData) * num_workers, INTERNAL_MEM_WORKER_DATA, TRUE);
460 memset (workers_data, 0, sizeof (WorkerData) * num_workers);
462 MONO_SEM_INIT (&workers_waiting_sem, 0);
463 MONO_SEM_INIT (&workers_done_sem, 0);
465 init_distribute_gray_queue (sgen_get_major_collector ()->is_concurrent);
467 if (sgen_get_major_collector ()->alloc_worker_data)
468 workers_gc_thread_major_collector_data = sgen_get_major_collector ()->alloc_worker_data ();
470 for (i = 0; i < workers_num; ++i) {
471 workers_data [i].index = i;
473 /* private gray queue is inited by the thread itself */
474 mono_mutex_init (&workers_data [i].stealable_stack_mutex);
475 workers_data [i].stealable_stack_fill = 0;
477 if (sgen_get_major_collector ()->alloc_worker_data)
478 workers_data [i].major_collector_data = sgen_get_major_collector ()->alloc_worker_data ();
481 LOCK_INIT (workers_job_queue_mutex);
483 sgen_register_fixed_internal_mem_type (INTERNAL_MEM_JOB_QUEUE_ENTRY, sizeof (JobQueueEntry));
485 mono_counters_register ("Stolen from self lock", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_workers_stolen_from_self_lock);
486 mono_counters_register ("Stolen from self no lock", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_workers_stolen_from_self_no_lock);
487 mono_counters_register ("Stolen from others", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_workers_stolen_from_others);
488 mono_counters_register ("# workers waited", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_workers_num_waited);
491 /* only the GC thread is allowed to start and join workers */
/*
 * Spawn the worker thread at `index`; asserts the index is in range and
 * the thread was not already created.
 */
494 workers_start_worker (int index)
496 g_assert (index >= 0 && index < workers_num);
498 g_assert (!workers_data [index].thread);
499 mono_native_thread_create (&workers_data [index].thread, workers_thread_func, &workers_data [index]);
/*
 * GC-thread entry to begin a worker phase: flip gc_in_progress on, reset
 * the job counters and the marking flag.  If workers already exist (a
 * previous collection ran), verify all of them are parked with done
 * posted, clear done_posted via CAS, and wake them; otherwise CAS the new
 * state in and create the threads for the first time.
 * NOTE(review): excerpt — the early return for collections without
 * workers and a `return` between the restart and first-start paths are
 * among the missing lines.
 */
503 sgen_workers_start_all_workers (void)
505 State old_state, new_state;
508 if (!collection_needs_workers ())
511 if (sgen_get_major_collector ()->init_worker_thread)
512 sgen_get_major_collector ()->init_worker_thread (workers_gc_thread_major_collector_data);
514 old_state = new_state = workers_state;
515 g_assert (!old_state.data.gc_in_progress);
516 new_state.data.gc_in_progress = TRUE;
518 workers_marking = FALSE;
520 g_assert (workers_job_queue_num_entries == 0);
521 workers_num_jobs_enqueued = 0;
522 workers_num_jobs_finished = 0;
524 if (workers_started) {
/* Restart path: all workers must be parked from the previous cycle. */
525 g_assert (old_state.data.done_posted);
526 if (old_state.data.num_waiting != workers_num) {
527 g_error ("Expecting all %d sgen workers to be parked, but only %d are",
528 workers_num, old_state.data.num_waiting);
531 /* Clear the done posted flag */
532 new_state.data.done_posted = 0;
/* Only the GC thread writes the state here, so the CAS cannot fail. */
533 if (!set_state (old_state, new_state))
534 g_assert_not_reached ();
536 workers_wake_up_all ();
/* First-start path: publish the state, then create the threads. */
540 g_assert (!old_state.data.done_posted);
542 if (!set_state (old_state, new_state))
543 g_assert_not_reached ();
545 for (i = 0; i < workers_num; ++i)
546 workers_start_worker (i);
548 workers_started = TRUE;
/* Whether a worker phase is currently active (gc_in_progress flag). */
552 sgen_workers_have_started (void)
554 return workers_state.data.gc_in_progress;
/*
 * Enable marking for an already-started worker phase and wake the workers
 * so they begin draining gray queues.  Must be called exactly once per
 * phase (asserted: started, in progress, not yet marking).
 */
558 sgen_workers_start_marking (void)
560 if (!collection_needs_workers ())
563 g_assert (workers_started && workers_state.data.gc_in_progress);
564 g_assert (!workers_marking);
566 workers_marking = TRUE;
568 workers_wake_up_all ();
/*
 * GC-thread side of finishing a worker phase.  CAS gc_in_progress off,
 * wake any fully-parked workers so the last one can post the done
 * semaphore, then wait on it.  If work is still pending afterwards (the
 * documented race: a job enqueued just as the last worker parked), clear
 * done_posted and go around again (the retry jump is not visible in this
 * excerpt).  Finally reset marking, reset per-worker collector data, and
 * assert all queues/stacks are empty.
 * NOTE(review): excerpt — the `do {` openers, the retry `goto`/loop, and
 * several closing braces are among the missing lines.
 */
572 sgen_workers_join (void)
574 State old_state, new_state;
577 if (!collection_needs_workers ())
581 old_state = new_state = workers_state;
582 g_assert (old_state.data.gc_in_progress);
583 g_assert (!old_state.data.done_posted);
585 new_state.data.gc_in_progress = 0;
586 } while (!set_state (old_state, new_state));
588 if (new_state.data.num_waiting == workers_num) {
590 * All the workers have shut down but haven't posted
591 * the done semaphore yet, or, if we come from below,
592 * haven't done all their work yet.
594 * It's not a big deal to wake them up again - they'll
595 * just do one iteration of their loop trying to find
596 * something to do and then go back to waiting again.
599 workers_wake_up_all ();
601 MONO_SEM_WAIT (&workers_done_sem);
603 old_state = new_state = workers_state;
604 g_assert (old_state.data.num_waiting == workers_num);
605 g_assert (old_state.data.done_posted);
607 if (workers_job_queue_num_entries || !sgen_section_gray_queue_is_empty (&workers_distribute_gray_queue)) {
609 * There's a small race condition that we avoid here.
610 * It's possible that a worker thread runs out of
611 * things to do, so it goes to sleep. Right at that
612 * moment a new job is enqueued, but the thread is
613 * still registered as running. Now the threads are
614 * joined, and we wait for the semaphore. Only at
615 * this point does the worker go to sleep, and posts
616 * the semaphore, because workers_gc_in_progress is
617 * already FALSE. The job is still in the queue,
620 * Clear the done posted flag.
622 new_state.data.done_posted = 0;
623 if (!set_state (old_state, new_state))
624 g_assert_not_reached ();
628 /* At this point all the workers have stopped. */
630 workers_marking = FALSE;
632 if (sgen_get_major_collector ()->reset_worker_data) {
633 for (i = 0; i < workers_num; ++i)
634 sgen_get_major_collector ()->reset_worker_data (workers_data [i].major_collector_data);
/* Post-conditions: no pending jobs, all gray queues and stacks drained. */
637 g_assert (workers_job_queue_num_entries == 0);
638 g_assert (sgen_section_gray_queue_is_empty (&workers_distribute_gray_queue));
639 for (i = 0; i < workers_num; ++i) {
640 g_assert (!workers_data [i].stealable_stack_fill);
641 g_assert (sgen_gray_object_queue_is_empty (&workers_data [i].private_gray_queue));
/*
 * TRUE iff every worker is currently parked.  Only meaningful while the
 * collection is in progress and before done has been posted (asserted).
 */
646 sgen_workers_all_done (void)
648 State state = workers_state;
650 * Can only be called while the collection is still in
651 * progress, i.e., before done has been posted.
653 g_assert (state.data.gc_in_progress);
654 g_assert (!state.data.done_posted);
655 return state.data.num_waiting == workers_num;
/*
 * Whether `thread` is one of ours: either the major collector claims it,
 * or it matches one of the created worker threads.  Returns gboolean
 * (return statements not visible in this excerpt).
 */
659 sgen_is_worker_thread (MonoNativeThreadId thread)
663 if (sgen_get_major_collector ()->is_worker_thread && sgen_get_major_collector ()->is_worker_thread (thread))
666 for (i = 0; i < workers_num; ++i) {
667 if (workers_data [i].thread == thread)
/* Accessor for the shared distribute gray queue. */
673 SgenSectionGrayQueue*
674 sgen_workers_get_distribute_section_gray_queue (void)
676 return &workers_distribute_gray_queue;
/*
 * Reset the GC thread's own major-collector worker data (the per-worker
 * copies are reset in sgen_workers_join).
 */
680 sgen_workers_reset_data (void)
682 if (sgen_get_major_collector ()->reset_worker_data)
683 sgen_get_major_collector ()->reset_worker_data (workers_gc_thread_major_collector_data);