2 * sgen-workers.c: Worker threads for parallel and concurrent GC.
4 * Copyright 2001-2003 Ximian, Inc
5 * Copyright 2003-2010 Novell, Inc.
6 * Copyright (C) 2012 Xamarin Inc
8 * This library is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Library General Public
10 * License 2.0 as published by the Free Software Foundation;
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Library General Public License for more details.
17 * You should have received a copy of the GNU Library General Public
18 * License 2.0 along with this library; if not, write to the Free
19 * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
25 #include "metadata/sgen-gc.h"
26 #include "metadata/sgen-workers.h"
27 #include "utils/mono-counters.h"
/*
 * File-scope worker state.  NOTE(review): several lines are elided in
 * this extract, including the declaration that opens the packed State
 * struct/union whose bitfields appear below — confirm against the full
 * file before editing.
 */
29 static int workers_num;
30 static WorkerData *workers_data;
/* Per-major-collector scratch data owned by the main GC thread itself. */
31 static void *workers_gc_thread_major_collector_data = NULL;
/* Shared queue of gray sections from which workers pull work. */
33 static SgenSectionGrayQueue workers_distribute_gray_queue;
34 static gboolean workers_distribute_gray_queue_inited;
/* TRUE while workers may mark; set in sgen_workers_start_marking(),
 * cleared in sgen_workers_join(). */
36 static volatile gboolean workers_marking = FALSE;
/* TRUE once worker threads have been created (sgen_workers_start_all_workers). */
37 static gboolean workers_started = FALSE;
/* Bitfields of the CAS-updated state word (see set_state()). */
43 * Decremented by the main thread and incremented by
/* Count of workers currently parked on workers_waiting_sem. */
46 guint32 num_waiting : 8;
47 /* Set by worker threads and reset by the main thread. */
48 guint32 done_posted : 1;
49 /* Set by the main thread. */
50 guint32 gc_in_progress : 1;
54 static volatile State workers_state;
/* Workers park on this; posted once per worker woken in workers_wake_up(). */
56 static MonoSemType workers_waiting_sem;
/* Posted by the last worker to park after the GC ends; waited on in
 * sgen_workers_join(). */
57 static MonoSemType workers_done_sem;
/* Singly-linked LIFO job queue, protected by workers_job_queue_mutex. */
59 static volatile int workers_job_queue_num_entries = 0;
60 static volatile JobQueueEntry *workers_job_queue = NULL;
61 static LOCK_DECLARE (workers_job_queue_mutex);
62 static int workers_num_jobs_enqueued = 0;
63 static volatile int workers_num_jobs_finished = 0;
/* Statistics counters registered with mono_counters in sgen_workers_init(). */
65 static long long stat_workers_stolen_from_self_lock;
66 static long long stat_workers_stolen_from_self_no_lock;
67 static long long stat_workers_stolen_from_others;
68 static long long stat_workers_num_waited;
/*
 * Atomically CAS the global workers_state from old_state to new_state.
 * Returns whether the swap succeeded, i.e. nobody else modified the
 * state word in between.  (Return type and opening brace are elided in
 * this extract.)
 */
71 set_state (State old_state, State new_state)
73 return InterlockedCompareExchange (&workers_state.value,
74 new_state.value, old_state.value) == old_state.value;
/*
 * Wake up at most MAX parked worker threads.  Each wake-up CAS-decrements
 * num_waiting, then posts the waiting semaphore once.  The early-exit
 * statements after the done_posted / num_waiting checks are elided in
 * this extract — presumably a return and a break respectively (TODO
 * confirm against the full file).
 */
78 workers_wake_up (int max)
82 for (i = 0; i < max; ++i) {
83 State old_state, new_state;
/* NOTE(review): the 'do {' opening this CAS retry loop is elided here. */
85 old_state = new_state = workers_state;
87 * We must not wake workers up once done has
90 if (old_state.data.done_posted)
92 if (old_state.data.num_waiting == 0)
94 --new_state.data.num_waiting;
95 } while (!set_state (old_state, new_state));
96 MONO_SEM_POST (&workers_waiting_sem);
/* Wake up every worker thread. */
101 workers_wake_up_all (void)
103 workers_wake_up (workers_num);
/*
 * Public entry point: wake all workers.  Only valid while a collection
 * is in progress (asserted).
 */
107 sgen_workers_wake_up_all (void)
109 g_assert (workers_state.data.gc_in_progress);
110 workers_wake_up_all ();
/*
 * Worker-side park routine (the signature line is elided in this
 * extract; presumably 'workers_wait' — TODO confirm).  CAS-increments
 * num_waiting; the last worker to park after the GC has finished also
 * sets done_posted and posts the done semaphore, then everyone sleeps
 * on the waiting semaphore until workers_wake_up() releases them.
 */
116 State old_state, new_state;
117 ++stat_workers_num_waited;
/* NOTE(review): the 'do {' opening this CAS retry loop is elided here. */
119 old_state = new_state = workers_state;
121 * Only the last worker thread awake can set the done
122 * posted flag, and since we're awake and haven't set
123 * it yet, it cannot be set.
125 g_assert (!old_state.data.done_posted);
126 ++new_state.data.num_waiting;
128 * This is the only place where we use
129 * workers_gc_in_progress in the worker threads.
131 if (new_state.data.num_waiting == workers_num && !old_state.data.gc_in_progress)
132 new_state.data.done_posted = 1;
133 } while (!set_state (old_state, new_state));
/* Make the state update visible before signalling the main thread. */
134 mono_memory_barrier ();
135 if (new_state.data.done_posted)
136 MONO_SEM_POST (&workers_done_sem);
/* Park until workers_wake_up() posts the semaphore for us. */
137 MONO_SEM_WAIT (&workers_waiting_sem);
/* TRUE iff the current collection type (parallel or concurrent) uses
 * worker threads at all. */
141 collection_needs_workers (void)
143 return sgen_collection_is_parallel () || sgen_collection_is_concurrent ();
/*
 * Push a job onto the LIFO job queue and wake enough workers to cover
 * the queued entries.  The body of the !collection_needs_workers()
 * branch is elided here — presumably the job runs inline in that case
 * (TODO confirm against the full file).
 */
147 sgen_workers_enqueue_job (JobFunc func, void *data)
150 JobQueueEntry *entry;
152 if (!collection_needs_workers ()) {
157 g_assert (workers_state.data.gc_in_progress);
159 entry = sgen_alloc_internal (INTERNAL_MEM_JOB_QUEUE_ENTRY);
/* Prepend to the queue under the mutex. */
163 mono_mutex_lock (&workers_job_queue_mutex);
164 entry->next = workers_job_queue;
165 workers_job_queue = entry;
166 num_entries = ++workers_job_queue_num_entries;
167 ++workers_num_jobs_enqueued;
168 mono_mutex_unlock (&workers_job_queue_mutex);
/* One wake-up per pending entry so the queue drains promptly. */
170 workers_wake_up (num_entries);
/*
 * Spin until every enqueued job has been finished by a worker; if all
 * workers are parked while jobs remain, kick them all awake.  The
 * FIXME below is the author's own: this is a busy-wait placeholder.
 */
174 sgen_workers_wait_for_jobs (void)
176 // FIXME: implement this properly
177 while (workers_num_jobs_finished < workers_num_jobs_enqueued) {
178 State state = workers_state;
179 g_assert (state.data.gc_in_progress);
180 g_assert (!state.data.done_posted);
181 if (state.data.num_waiting == workers_num)
182 workers_wake_up_all ();
/*
 * Pop one job off the queue and run it.  Returns whether a job was
 * executed, per the elided return statements — TODO confirm.
 */
188 workers_dequeue_and_do_job (WorkerData *data)
190 JobQueueEntry *entry;
193 * At this point the GC might not be running anymore. We
194 * could have been woken up by a job that was then taken by
195 * another thread, after which the collection finished, so we
196 * first have to successfully dequeue a job before doing
197 * anything assuming that the collection is still ongoing.
/* Cheap unlocked emptiness check before taking the mutex. */
200 if (!workers_job_queue_num_entries)
203 mono_mutex_lock (&workers_job_queue_mutex);
204 entry = (JobQueueEntry*)workers_job_queue;
/* NOTE(review): a re-check of 'entry' under the lock is presumably
 * elided here — the queue may have been emptied by another worker. */
206 workers_job_queue = entry->next;
207 --workers_job_queue_num_entries;
209 mono_mutex_unlock (&workers_job_queue_mutex);
214 g_assert (collection_needs_workers ());
/* Run the job, then release its queue entry. */
216 entry->func (data, entry->data);
217 sgen_free_internal (entry, INTERNAL_MEM_JOB_QUEUE_ENTRY);
219 SGEN_ATOMIC_ADD (workers_num_jobs_finished, 1);
/*
 * Steal roughly half of VICTIM's stealable stack into DATA's private
 * gray queue, copying it into freshly allocated gray-queue sections.
 * With LOCK, the victim's stealable-stack mutex is try-locked first;
 * stealing from oneself with lock==FALSE is the caller-already-holds-
 * the-lock path (see workers_gray_queue_share_redirect).  Returns
 * whether anything was stolen, per the elided returns — TODO confirm.
 */
225 workers_steal (WorkerData *data, WorkerData *victim_data, gboolean lock)
227 SgenGrayQueue *queue = &data->private_gray_queue;
/* We only steal into an empty private queue. */
230 g_assert (!queue->first);
232 if (!victim_data->stealable_stack_fill)
/* NOTE(review): a bail-out on trylock failure is presumably elided here. */
235 if (lock && mono_mutex_trylock (&victim_data->stealable_stack_mutex))
/* Take half of the victim's entries, rounded up. */
238 n = num = (victim_data->stealable_stack_fill + 1) / 2;
239 /* We're stealing num entries. */
242 int m = MIN (SGEN_GRAY_QUEUE_SECTION_SIZE, n);
245 sgen_gray_object_alloc_queue_section (queue);
/* Copy one section's worth from the top of the victim's stack. */
246 memcpy (queue->first->objects,
247 victim_data->stealable_stack + victim_data->stealable_stack_fill - num + n,
249 queue->first->end = m;
252 victim_data->stealable_stack_fill -= num;
255 mono_mutex_unlock (&victim_data->stealable_stack_mutex);
/* Account the steal in the appropriate statistics counter. */
257 if (data == victim_data) {
259 stat_workers_stolen_from_self_lock += num;
261 stat_workers_stolen_from_self_no_lock += num;
263 stat_workers_stolen_from_others += num;
/*
 * Find work for a worker whose private gray queue is empty: first steal
 * from its own stealable stack, then from each other worker in turn,
 * and finally dequeue a section from the shared distribute gray queue
 * (concurrent/parallel collectors only).  Returns whether work was
 * obtained, per the elided returns — TODO confirm.
 */
270 workers_get_work (WorkerData *data)
272 SgenMajorCollector *major;
275 g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue));
277 /* Try to steal from our own stack. */
278 if (workers_steal (data, data, TRUE))
281 /* From another worker. */
282 for (i = 0; i < workers_num; ++i) {
283 WorkerData *victim_data = &workers_data [i];
284 if (data == victim_data)
286 if (workers_steal (data, victim_data, TRUE))
291 * If we're concurrent or parallel, from the workers
292 * distribute gray queue.
294 major = sgen_get_major_collector ();
295 if (major->is_concurrent || major->is_parallel) {
296 GrayQueueSection *section = sgen_section_gray_queue_dequeue (&workers_distribute_gray_queue);
298 sgen_gray_object_enqueue_section (&data->private_gray_queue, section);
303 /* Nobody to steal from */
304 g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue));
/*
 * Alloc-prepare callback for a worker's private gray queue: offload
 * gray sections into the worker's fixed-size stealable stack so other
 * workers can steal them, then wake any sleeping workers.
 */
309 workers_gray_queue_share_redirect (SgenGrayQueue *queue)
311 GrayQueueSection *section;
/* The owning worker was registered as alloc_prepare_data at init. */
312 WorkerData *data = queue->alloc_prepare_data;
314 if (data->stealable_stack_fill) {
316 * There are still objects in the stealable stack, so
317 * wake up any workers that might be sleeping
319 if (workers_state.data.gc_in_progress)
320 workers_wake_up_all ();
324 /* The stealable stack is empty, so fill it. */
325 mono_mutex_lock (&data->stealable_stack_mutex);
327 while (data->stealable_stack_fill < STEALABLE_STACK_SIZE &&
328 (section = sgen_gray_object_dequeue_section (queue))) {
329 int num = MIN (section->end, STEALABLE_STACK_SIZE - data->stealable_stack_fill);
/* Copy the tail NUM objects of the section into the stealable stack. */
331 memcpy (data->stealable_stack + data->stealable_stack_fill,
332 section->objects + section->end - num,
333 sizeof (char*) * num);
336 data->stealable_stack_fill += num;
/* NOTE(review): branch structure elided — presumably a partially
 * consumed section is re-enqueued while an emptied one is freed. */
339 sgen_gray_object_enqueue_section (queue, section);
341 sgen_gray_object_free_queue_section (section);
/* If everything went to the stealable stack, steal some back for
 * ourselves; lock==FALSE because we already hold our own mutex. */
344 if (sgen_gray_object_queue_is_empty (queue))
345 workers_steal (data, data, FALSE);
347 mono_mutex_unlock (&data->stealable_stack_mutex);
349 if (workers_state.data.gc_in_progress)
350 workers_wake_up_all ();
/*
 * Debug hook run on every object enqueued during concurrent marking:
 * the collection must be in progress and the object must be a
 * non-nursery object with a valid vtable.
 */
354 concurrent_enqueue_check (char *obj)
356 g_assert (sgen_concurrent_collection_in_progress ());
357 g_assert (!sgen_ptr_in_nursery (obj));
358 g_assert (SGEN_LOAD_VTABLE (obj));
/*
 * (Re-)initialize a worker's private gray queue, installing the
 * share-redirect callback and — for concurrent collectors — the
 * per-enqueue debug check.
 */
362 init_private_gray_queue (WorkerData *data)
364 sgen_gray_object_queue_init_with_alloc_prepare (&data->private_gray_queue,
365 sgen_get_major_collector ()->is_concurrent ? concurrent_enqueue_check : NULL,
366 workers_gray_queue_share_redirect, data);
/*
 * Main loop of a worker thread: register with the thread infrastructure,
 * run per-collector worker init, then repeatedly (the enclosing loop and
 * the park call are elided in this extract) drain the job queue and —
 * while marking is enabled — drain the private gray queue in 32-object
 * batches, sharing excess work between batches.
 */
369 static mono_native_thread_return_t
370 workers_thread_func (void *data_untyped)
372 WorkerData *data = data_untyped;
373 SgenMajorCollector *major = sgen_get_major_collector ();
375 mono_thread_info_register_small_id ();
377 if (major->init_worker_thread)
378 major->init_worker_thread (data->major_collector_data);
380 init_private_gray_queue (data);
383 gboolean did_work = FALSE;
/* Jobs take priority over marking. */
385 while (workers_dequeue_and_do_job (data)) {
387 /* FIXME: maybe distribute the gray queue here? */
/* Mark: pick ops for the concurrent vs. (elided, presumably parallel)
 * case and drain, redistributing leftovers after each partial drain. */
390 if (workers_marking && (!sgen_gray_object_queue_is_empty (&data->private_gray_queue) || workers_get_work (data))) {
391 SgenObjectOperations *ops = sgen_concurrent_collection_in_progress ()
392 ? &major->major_concurrent_ops
394 ScanCopyContext ctx = { ops->scan_object, NULL, &data->private_gray_queue };
396 g_assert (!sgen_gray_object_queue_is_empty (&data->private_gray_queue));
398 while (!sgen_drain_gray_stack (32, ctx))
399 workers_gray_queue_share_redirect (&data->private_gray_queue);
400 g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue));
/* The queue's sections were consumed; re-init for the next round. */
402 init_private_gray_queue (data);
411 /* dummy return to make compilers happy */
/*
 * Initialize the shared distribute gray queue.  Idempotent: on a second
 * call just assert that the queue is empty and that the requested
 * locking mode matches the one it was created with.
 */
416 init_distribute_gray_queue (gboolean locked)
418 if (workers_distribute_gray_queue_inited) {
419 g_assert (sgen_section_gray_queue_is_empty (&workers_distribute_gray_queue));
/* '!x == !y' normalizes both sides to 0/1 before comparing. */
420 g_assert (!workers_distribute_gray_queue.locked == !locked);
424 sgen_section_gray_queue_init (&workers_distribute_gray_queue, locked,
425 sgen_get_major_collector ()->is_concurrent ? concurrent_enqueue_check : NULL);
426 workers_distribute_gray_queue_inited = TRUE;
/*
 * Public wrapper: set up the distribute gray queue if this collection
 * type uses workers; locked mode iff the collector is concurrent or
 * parallel.
 */
430 sgen_workers_init_distribute_gray_queue (void)
432 if (!collection_needs_workers ())
435 init_distribute_gray_queue (sgen_get_major_collector ()->is_concurrent || sgen_get_major_collector ()->is_parallel);
/*
 * One-time initialization of the worker subsystem: allocate and zero
 * per-worker data, create the semaphores, init the distribute gray
 * queue and the job queue lock, allocate per-collector worker data,
 * and register statistics counters.  No-op for collectors that are
 * neither parallel nor concurrent.  Threads are NOT started here —
 * see sgen_workers_start_all_workers().
 */
439 sgen_workers_init (int num_workers)
443 if (!sgen_get_major_collector ()->is_parallel && !sgen_get_major_collector ()->is_concurrent)
446 //g_print ("initing %d workers\n", num_workers);
448 workers_num = num_workers;
450 workers_data = sgen_alloc_internal_dynamic (sizeof (WorkerData) * num_workers, INTERNAL_MEM_WORKER_DATA, TRUE);
451 memset (workers_data, 0, sizeof (WorkerData) * num_workers);
/* Both semaphores start at 0: workers park immediately, and the main
 * thread blocks on done until a worker posts it. */
453 MONO_SEM_INIT (&workers_waiting_sem, 0);
454 MONO_SEM_INIT (&workers_done_sem, 0);
456 init_distribute_gray_queue (sgen_get_major_collector ()->is_concurrent || sgen_get_major_collector ()->is_parallel);
458 if (sgen_get_major_collector ()->alloc_worker_data)
459 workers_gc_thread_major_collector_data = sgen_get_major_collector ()->alloc_worker_data ();
461 for (i = 0; i < workers_num; ++i) {
462 /* private gray queue is inited by the thread itself */
463 mono_mutex_init (&workers_data [i].stealable_stack_mutex);
464 workers_data [i].stealable_stack_fill = 0;
466 if (sgen_get_major_collector ()->alloc_worker_data)
467 workers_data [i].major_collector_data = sgen_get_major_collector ()->alloc_worker_data ();
470 LOCK_INIT (workers_job_queue_mutex);
472 sgen_register_fixed_internal_mem_type (INTERNAL_MEM_JOB_QUEUE_ENTRY, sizeof (JobQueueEntry));
474 mono_counters_register ("Stolen from self lock", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_workers_stolen_from_self_lock);
475 mono_counters_register ("Stolen from self no lock", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_workers_stolen_from_self_no_lock);
476 mono_counters_register ("Stolen from others", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_workers_stolen_from_others);
477 mono_counters_register ("# workers waited", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_workers_num_waited);
480 /* only the GC thread is allowed to start and join workers */
/*
 * Spawn the native thread for worker INDEX; it must not already exist.
 */
483 workers_start_worker (int index)
485 g_assert (index >= 0 && index < workers_num);
487 g_assert (!workers_data [index].thread);
488 mono_native_thread_create (&workers_data [index].thread, workers_thread_func, &workers_data [index]);
/*
 * Begin a collection's worker phase.  Resets the job counters and
 * marking flag, flips gc_in_progress on via CAS, then either wakes the
 * already-parked workers (subsequent collections: all workers must be
 * parked with done posted) or creates the threads for the first time.
 * The early return between the two paths is elided in this extract.
 */
492 sgen_workers_start_all_workers (void)
494 State old_state, new_state;
497 if (!collection_needs_workers ())
/* Per-collector init for the GC thread's own worker data. */
500 if (sgen_get_major_collector ()->init_worker_thread)
501 sgen_get_major_collector ()->init_worker_thread (workers_gc_thread_major_collector_data);
503 old_state = new_state = workers_state;
504 g_assert (!old_state.data.gc_in_progress);
505 new_state.data.gc_in_progress = TRUE;
507 workers_marking = FALSE;
509 g_assert (workers_job_queue_num_entries == 0);
510 workers_num_jobs_enqueued = 0;
511 workers_num_jobs_finished = 0;
/* Restart path: workers already exist and are all parked. */
513 if (workers_started) {
514 g_assert (old_state.data.done_posted);
515 if (old_state.data.num_waiting != workers_num) {
516 g_error ("Expecting all %d sgen workers to be parked, but only %d are",
517 workers_num, old_state.data.num_waiting);
520 /* Clear the done posted flag */
521 new_state.data.done_posted = 0;
522 if (!set_state (old_state, new_state))
523 g_assert_not_reached ();
525 workers_wake_up_all ();
/* First-start path: create all worker threads. */
529 g_assert (!old_state.data.done_posted);
531 if (!set_state (old_state, new_state))
532 g_assert_not_reached ();
534 for (i = 0; i < workers_num; ++i)
535 workers_start_worker (i);
537 workers_started = TRUE;
/* Whether a collection's worker phase is currently in progress. */
541 sgen_workers_have_started (void)
543 return workers_state.data.gc_in_progress;
/*
 * Enable the marking phase: set workers_marking and wake everyone so
 * they start draining gray queues.  Workers must already be running.
 */
547 sgen_workers_start_marking (void)
549 if (!collection_needs_workers ())
552 g_assert (workers_started && workers_state.data.gc_in_progress);
553 g_assert (!workers_marking);
555 workers_marking = TRUE;
557 workers_wake_up_all ();
/*
 * End the worker phase: clear gc_in_progress via CAS, wait on the done
 * semaphore until all workers have parked, and verify that every queue
 * and stealable stack is empty.  The loop back to re-wait when leftover
 * jobs/sections are found is elided in this extract — the done_posted
 * reset below presumably feeds a retry (TODO confirm).  Note: the
 * worker threads are parked, not actually joined.
 */
561 sgen_workers_join (void)
563 State old_state, new_state;
566 if (!collection_needs_workers ())
/* CAS retry loop ('do {' elided): turn off gc_in_progress. */
570 old_state = new_state = workers_state;
571 g_assert (old_state.data.gc_in_progress);
572 g_assert (!old_state.data.done_posted);
574 new_state.data.gc_in_progress = 0;
575 } while (!set_state (old_state, new_state));
577 if (new_state.data.num_waiting == workers_num) {
579 * All the workers have shut down but haven't posted
580 * the done semaphore yet, or, if we come from below,
581 * haven't done all their work yet.
583 * It's not a big deal to wake them up again - they'll
584 * just do one iteration of their loop trying to find
585 * something to do and then go back to waiting again.
588 workers_wake_up_all ();
/* Block until the last worker posts done (see the park routine). */
590 MONO_SEM_WAIT (&workers_done_sem);
592 old_state = new_state = workers_state;
593 g_assert (old_state.data.num_waiting == workers_num);
594 g_assert (old_state.data.done_posted);
596 if (workers_job_queue_num_entries || !sgen_section_gray_queue_is_empty (&workers_distribute_gray_queue)) {
598 * There's a small race condition that we avoid here.
599 * It's possible that a worker thread runs out of
600 * things to do, so it goes to sleep. Right at that
601 * moment a new job is enqueued, but the thread is
602 * still registered as running. Now the threads are
603 * joined, and we wait for the semaphore. Only at
604 * this point does the worker go to sleep, and posts
605 * the semaphore, because workers_gc_in_progress is
606 * already FALSE. The job is still in the queue,
609 * Clear the done posted flag.
611 new_state.data.done_posted = 0;
612 if (!set_state (old_state, new_state))
613 g_assert_not_reached ();
617 /* At this point all the workers have stopped. */
619 workers_marking = FALSE;
/* Reset per-worker collector data for the next collection. */
621 if (sgen_get_major_collector ()->reset_worker_data) {
622 for (i = 0; i < workers_num; ++i)
623 sgen_get_major_collector ()->reset_worker_data (workers_data [i].major_collector_data);
/* Sanity: nothing left anywhere. */
626 g_assert (workers_job_queue_num_entries == 0);
627 g_assert (sgen_section_gray_queue_is_empty (&workers_distribute_gray_queue));
628 for (i = 0; i < workers_num; ++i) {
629 g_assert (!workers_data [i].stealable_stack_fill);
630 g_assert (sgen_gray_object_queue_is_empty (&workers_data [i].private_gray_queue));
/*
 * Whether every worker is currently parked.  Reads one snapshot of the
 * state word, so the answer is consistent at that instant.
 */
635 sgen_workers_all_done (void)
637 State state = workers_state;
639 * Can only be called while the collection is still in
640 * progress, i.e., before done has been posted.
642 g_assert (state.data.gc_in_progress);
643 g_assert (!state.data.done_posted);
644 return state.data.num_waiting == workers_num;
/*
 * Whether THREAD is one of our workers, either by the collector's own
 * notion of worker threads or by matching against workers_data.
 * (Return statements are elided in this extract.)
 */
648 sgen_is_worker_thread (MonoNativeThreadId thread)
652 if (sgen_get_major_collector ()->is_worker_thread && sgen_get_major_collector ()->is_worker_thread (thread))
655 for (i = 0; i < workers_num; ++i) {
656 if (workers_data [i].thread == thread)
/* Accessor for the shared distribute gray queue. */
662 SgenSectionGrayQueue*
663 sgen_workers_get_distribute_section_gray_queue (void)
665 return &workers_distribute_gray_queue;
/*
 * Reset the GC thread's own per-collector worker data (the per-worker
 * copies are reset in sgen_workers_join()).
 */
669 sgen_workers_reset_data (void)
671 if (sgen_get_major_collector ()->reset_worker_data)
672 sgen_get_major_collector ()->reset_worker_data (workers_gc_thread_major_collector_data);