2 * sgen-workers.c: Worker threads for parallel and concurrent GC.
4 * Copyright 2001-2003 Ximian, Inc
5 * Copyright 2003-2010 Novell, Inc.
6 * Copyright (C) 2012 Xamarin Inc
8 * This library is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Library General Public
10 * License 2.0 as published by the Free Software Foundation;
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Library General Public License for more details.
17 * You should have received a copy of the GNU Library General Public
18 * License 2.0 along with this library; if not, write to the Free
19 * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
25 #include "metadata/sgen-gc.h"
26 #include "metadata/sgen-workers.h"
27 #include "utils/mono-counters.h"
/*
 * File-scope worker bookkeeping: the number of workers, the per-worker
 * data array, the extra major-collector data allocated once in
 * sgen_workers_init() (separately from the per-worker entries), the shared
 * "distribute" section gray queue with its initialized flag, and a flag
 * recording whether the worker threads have been created.
 */
29 static int workers_num;
30 static WorkerData *workers_data;
31 static void *workers_gc_thread_major_collector_data = NULL;
33 static SgenSectionGrayQueue workers_distribute_gray_queue;
34 static gboolean workers_distribute_gray_queue_inited;
36 static gboolean workers_started = FALSE;
/*
 * NOTE(review): parts of the state-name enum and of the State type are not
 * visible in this extract (only the last enumerator and the `state`
 * bit-field are).  The code below also uses the fields num_awake,
 * num_posted and post_done and a combined `value` member.
 * The table lists the field combinations that are considered valid.
 */
41 STATE_NURSERY_COLLECTION
45 * | state | num_awake | num_posted | post_done |
46 * |--------------------------+-----------+----------------------------+-----------|
47 * | STATE_NOT_WORKING | 0 | * | 0 |
48 * | STATE_WORKING | > 0 | <= workers_num - num_awake | * |
49 * | STATE_NURSERY_COLLECTION | * | <= workers_num - num_awake | 1 |
50 * | STATE_NURSERY_COLLECTION | 0 | 0 | 0 |
55 guint state : 4; /* WorkersStateName */
56 /* Number of worker threads awake. */
58 /* The state of the waiting semaphore. */
60 /* Whether to post `workers_done_sem` */
/* The current worker state; it is only replaced via compare-and-swap in set_state(). */
65 static volatile State workers_state;
/*
 * workers_waiting_sem is posted by workers_signal_enqueue_work() to wake
 * workers; workers_done_sem is posted by a worker going to sleep and waited
 * on by the thread that set post_done.
 */
67 static MonoSemType workers_waiting_sem;
68 static MonoSemType workers_done_sem;
/*
 * Job queue: a singly linked list of JobQueueEntry plus its length, both
 * modified under workers_job_queue_mutex, and counters of jobs enqueued
 * and finished for the current collection.
 */
70 static volatile int workers_job_queue_num_entries = 0;
71 static volatile JobQueueEntry *workers_job_queue = NULL;
72 static LOCK_DECLARE (workers_job_queue_mutex);
73 static int workers_num_jobs_enqueued = 0;
74 static volatile int workers_num_jobs_finished = 0;
/* Statistics, registered as GC counters in sgen_workers_init(). */
76 static guint64 stat_workers_stolen_from_self_lock;
77 static guint64 stat_workers_stolen_from_self_no_lock;
78 static guint64 stat_workers_stolen_from_others;
79 static guint64 stat_workers_num_waited;
/*
 * Atomically replaces the global worker state with `new_state`, provided the
 * current value still equals `old_state`.
 *
 * Asserts that a nursery collection never transitions directly to
 * STATE_NOT_WORKING.  Evaluates to true when the compare-and-swap succeeded,
 * false when another thread changed the state in the meantime.
 */
82 set_state (State old_state, State new_state)
84 if (old_state.data.state == STATE_NURSERY_COLLECTION)
85 SGEN_ASSERT (0, new_state.data.state != STATE_NOT_WORKING, "Can't go from nursery collection to not working");
87 return InterlockedCompareExchange (&workers_state.value,
88 new_state.value, old_state.value) == old_state.value;
/*
 * Asserts that `state` describes completely idle workers:
 * STATE_NOT_WORKING, no thread awake, no pending wake-ups and post_done
 * cleared.
 */
92 assert_not_working (State state)
94 SGEN_ASSERT (0, state.data.state == STATE_NOT_WORKING, "Can only signal enqueue work when in no work state");
95 SGEN_ASSERT (0, state.data.num_awake == 0, "No workers can be awake when not working");
96 SGEN_ASSERT (0, state.data.num_posted == 0, "Can't have posted already");
97 SGEN_ASSERT (0, !state.data.post_done, "post_done can only be set when working");
/*
 * Asserts the invariants of STATE_WORKING: at least one worker is awake or
 * about to be woken, and awake plus posted workers do not exceed
 * workers_num.
 *
 * NOTE(review): the `num_awake > 0` check is presumably guarded by
 * `from_worker`; the guarding line is not visible in this extract.
 */
102 assert_working (State state, gboolean from_worker)
104 SGEN_ASSERT (0, state.data.state == STATE_WORKING, "A worker can't wait without being in working state");
106 SGEN_ASSERT (0, state.data.num_awake > 0, "How can we be awake, yet we are not counted?");
108 SGEN_ASSERT (0, state.data.num_awake + state.data.num_posted > 0, "How can we be working, yet no worker threads are awake or to be awoken?");
109 SGEN_ASSERT (0, state.data.num_awake + state.data.num_posted <= workers_num, "There are too many worker threads awake");
/*
 * Asserts the invariants of STATE_NURSERY_COLLECTION: num_awake never
 * exceeds workers_num, and once post_done has been cleared no worker is
 * awake or waiting to be woken.
 *
 * NOTE(review): the `num_awake > 0` and `post_done` checks presumably only
 * apply when `from_worker` is set; the guarding line is not visible in this
 * extract.
 */
113 assert_nursery_collection (State state, gboolean from_worker)
115 SGEN_ASSERT (0, state.data.state == STATE_NURSERY_COLLECTION, "Must be in the nursery collection state");
117 SGEN_ASSERT (0, state.data.num_awake > 0, "We're awake, but num_awake is zero");
118 SGEN_ASSERT (0, state.data.post_done, "post_done must be set in the nursery collection state");
120 SGEN_ASSERT (0, state.data.num_awake <= workers_num, "There are too many worker threads awake");
121 if (!state.data.post_done) {
122 SGEN_ASSERT (0, state.data.num_awake == 0, "Once done has been posted no threads can be awake");
123 SGEN_ASSERT (0, state.data.num_posted == 0, "Once done has been posted no thread must be awoken");
/*
 * Checks `state` from a worker's point of view (from_worker = TRUE),
 * dispatching on the state name to assert_working() or
 * assert_nursery_collection().
 *
 * NOTE(review): the second call is presumably the `else` branch; that line
 * is not visible in this extract.
 */
128 assert_working_or_nursery_collection (State state)
130 if (state.data.state == STATE_WORKING)
131 assert_working (state, TRUE);
133 assert_nursery_collection (state, TRUE);
/*
 * Switches the workers to STATE_WORKING with `num_wake_up` pending wake-ups
 * and posts workers_waiting_sem that many times.
 *
 * num_wake_up must not exceed workers_num.  The current state is checked
 * with assert_nursery_collection() when `from_nursery_collection` is set
 * and with assert_not_working() otherwise (NOTE(review): the `else` line is
 * not visible in this extract).  The state change is a single
 * compare-and-swap that is asserted to succeed, i.e. no other thread may
 * be mutating the state concurrently.
 */
137 workers_signal_enqueue_work (int num_wake_up, gboolean from_nursery_collection)
139 State old_state = workers_state;
140 State new_state = old_state;
142 gboolean did_set_state;
144 SGEN_ASSERT (0, num_wake_up <= workers_num, "Cannot wake up more workers than are present");
146 if (from_nursery_collection)
147 assert_nursery_collection (old_state, FALSE);
149 assert_not_working (old_state);
151 new_state.data.state = STATE_WORKING;
152 new_state.data.num_posted = num_wake_up;
154 did_set_state = set_state (old_state, new_state);
155 SGEN_ASSERT (0, did_set_state, "Nobody else should be mutating the state");
/* Publish the state first, then release one semaphore count per wake-up. */
157 for (i = 0; i < num_wake_up; ++i)
158 MONO_SEM_POST (&workers_waiting_sem);
/*
 * Wakes up `num_wake_up` workers, but only if the workers are currently in
 * STATE_NOT_WORKING; in any other state this does nothing.
 */
162 workers_signal_enqueue_work_if_necessary (int num_wake_up)
164 if (workers_state.data.state == STATE_NOT_WORKING)
165 workers_signal_enqueue_work (num_wake_up, FALSE);
/*
 * Makes sure the workers are running: if they are idle, all workers_num of
 * them are woken.  Must not be called during a nursery collection.
 */
169 sgen_workers_ensure_awake (void)
171 SGEN_ASSERT (0, workers_state.data.state != STATE_NURSERY_COLLECTION, "Can't wake workers during nursery collection");
172 workers_signal_enqueue_work_if_necessary (workers_num);
/*
 * NOTE(review): the header of this function and its loop openers are not
 * visible in this extract.  Judging by the body, this is the routine a
 * worker thread runs to go to sleep until more work is signalled - confirm
 * against the full file.
 */
178 State old_state, new_state;
181 ++stat_workers_num_waited;
/* First compare-and-swap loop: take this thread out of num_awake. */
184 new_state = old_state = workers_state;
186 assert_working_or_nursery_collection (old_state);
188 --new_state.data.num_awake;
190 if (!new_state.data.num_awake && !new_state.data.num_posted) {
191 /* We are the last thread to go to sleep. */
192 if (old_state.data.state == STATE_WORKING)
193 new_state.data.state = STATE_NOT_WORKING;
/*
 * The last sleeper clears post_done.  NOTE(review): the statement under the
 * following `if` is not visible; presumably it records that
 * workers_done_sem has to be posted below.
 */
195 new_state.data.post_done = 0;
196 if (old_state.data.post_done)
199 } while (!set_state (old_state, new_state));
/* NOTE(review): this post is presumably conditional; the condition line is not visible. */
202 MONO_SEM_POST (&workers_done_sem);
/* Sleep until workers_signal_enqueue_work() posts the waiting semaphore. */
204 MONO_SEM_WAIT (&workers_waiting_sem);
/* Second compare-and-swap loop: turn one posted wake-up into an awake thread. */
207 new_state = old_state = workers_state;
209 SGEN_ASSERT (0, old_state.data.num_posted > 0, "How can we be awake without the semaphore having been posted?");
210 SGEN_ASSERT (0, old_state.data.num_awake < workers_num, "There are too many worker threads awake");
212 --new_state.data.num_posted;
213 ++new_state.data.num_awake;
215 assert_working_or_nursery_collection (new_state);
216 } while (!set_state (old_state, new_state));
/* Worker threads are needed exactly when the current collection is concurrent. */
220 collection_needs_workers (void)
222 return sgen_collection_is_concurrent ();
/*
 * Queues the job (`func`, `data`) for the worker threads.
 *
 * NOTE(review): the body of the `!collection_needs_workers ()` branch is not
 * visible in this extract; presumably the job is handled without the queue
 * in that case - confirm against the full file.
 */
226 sgen_workers_enqueue_job (JobFunc func, void *data)
229 JobQueueEntry *entry;
231 if (!collection_needs_workers ()) {
236 entry = sgen_alloc_internal (INTERNAL_MEM_JOB_QUEUE_ENTRY);
/* Push the entry onto the head of the job list under the queue mutex. */
240 mono_mutex_lock (&workers_job_queue_mutex);
241 entry->next = workers_job_queue;
242 workers_job_queue = entry;
243 num_entries = ++workers_job_queue_num_entries;
244 ++workers_num_jobs_enqueued;
245 mono_mutex_unlock (&workers_job_queue_mutex);
/*
 * Outside of a nursery collection, wake idle workers for the queued jobs.
 * NOTE(review): num_entries is passed unclamped, while
 * workers_signal_enqueue_work() asserts num_wake_up <= workers_num -
 * confirm the queue length cannot exceed the worker count while idle.
 */
247 if (workers_state.data.state != STATE_NURSERY_COLLECTION)
248 workers_signal_enqueue_work_if_necessary (num_entries);
/*
 * Polls until every enqueued job has been finished, re-waking idle workers
 * on each iteration.
 *
 * NOTE(review): the rest of the loop body is not visible in this extract;
 * the FIXME suggests it sleeps between polls.
 */
252 sgen_workers_wait_for_jobs_finished (void)
254 // FIXME: implement this properly
255 while (workers_num_jobs_finished < workers_num_jobs_enqueued) {
256 workers_signal_enqueue_work_if_necessary (workers_num);
257 /* FIXME: sleep less? */
/*
 * Switches the workers into STATE_NURSERY_COLLECTION and, if they were
 * working, waits on workers_done_sem before continuing.
 *
 * NOTE(review): the loop opener and the `else` between the two assertion
 * groups are not visible in this extract.
 */
263 sgen_workers_signal_start_nursery_collection_and_wait (void)
265 State old_state, new_state;
/* Compare-and-swap loop: retried until the state is replaced atomically. */
268 new_state = old_state = workers_state;
270 new_state.data.state = STATE_NURSERY_COLLECTION;
272 if (old_state.data.state == STATE_NOT_WORKING) {
273 assert_not_working (old_state);
/* Workers are busy: set post_done so that workers_done_sem gets posted. */
275 assert_working (old_state, FALSE);
276 SGEN_ASSERT (0, !old_state.data.post_done, "We are not waiting for the workers");
278 new_state.data.post_done = 1;
280 } while (!set_state (old_state, new_state));
282 if (new_state.data.post_done)
283 MONO_SEM_WAIT (&workers_done_sem);
/* At this point post_done must have been cleared again. */
285 old_state = workers_state;
286 assert_nursery_collection (old_state, FALSE);
287 SGEN_ASSERT (0, !old_state.data.post_done, "We got the semaphore, so it must have been posted");
/*
 * Ends the nursery-collection state: asserts that the state is
 * STATE_NURSERY_COLLECTION with post_done cleared, then puts the workers
 * back into STATE_WORKING and wakes all workers_num of them.
 */
291 sgen_workers_signal_finish_nursery_collection (void)
293 State old_state = workers_state;
295 assert_nursery_collection (old_state, FALSE);
296 SGEN_ASSERT (0, !old_state.data.post_done, "We are finishing the nursery collection, so we should have waited for the semaphore earlier");
298 workers_signal_enqueue_work (workers_num, TRUE);
/*
 * Takes one job off the job queue and runs it on behalf of worker `data`.
 *
 * NOTE(review): the return statements are not visible in this extract; the
 * caller uses the result as a loop condition, so presumably it is true when
 * a job was executed and false when the queue was empty.
 */
302 workers_dequeue_and_do_job (WorkerData *data)
304 JobQueueEntry *entry;
307 * At this point the GC might not be running anymore. We
308 * could have been woken up by a job that was then taken by
309 * another thread, after which the collection finished, so we
310 * first have to successfully dequeue a job before doing
311 * anything assuming that the collection is still ongoing.
/* Cheap unlocked check before taking the mutex. */
314 if (!workers_job_queue_num_entries)
/* Pop the head of the list under the queue mutex. */
317 mono_mutex_lock (&workers_job_queue_mutex);
318 entry = (JobQueueEntry*)workers_job_queue;
320 workers_job_queue = entry->next;
321 --workers_job_queue_num_entries;
323 mono_mutex_unlock (&workers_job_queue_mutex);
328 g_assert (collection_needs_workers ());
/* Run the job, release its entry and count it as finished atomically. */
330 entry->func (data, entry->data);
331 sgen_free_internal (entry, INTERNAL_MEM_JOB_QUEUE_ENTRY);
333 SGEN_ATOMIC_ADD (workers_num_jobs_finished, 1);
/*
 * Moves the top half (rounded up) of `victim_data`'s stealable stack into
 * the private gray queue of `data`, which must be empty on entry.
 *
 * When `lock` is set the victim's stack mutex is acquired with a trylock.
 * Entries are copied in chunks of at most SGEN_GRAY_QUEUE_SECTION_SIZE,
 * one freshly allocated queue section per chunk.
 *
 * NOTE(review): the return statements, the loop opener and several branch
 * lines are not visible in this extract; callers use the result as a
 * boolean.
 */
339 workers_steal (WorkerData *data, WorkerData *victim_data, gboolean lock)
341 SgenGrayQueue *queue = &data->private_gray_queue;
344 g_assert (!queue->first);
346 if (!victim_data->stealable_stack_fill)
349 if (lock && mono_mutex_trylock (&victim_data->stealable_stack_mutex))
352 n = num = (victim_data->stealable_stack_fill + 1) / 2;
353 /* We're stealing num entries. */
356 int m = MIN (SGEN_GRAY_QUEUE_SECTION_SIZE, n);
359 sgen_gray_object_alloc_queue_section (queue);
360 memcpy (queue->first->entries,
361 victim_data->stealable_stack + victim_data->stealable_stack_fill - num + n,
362 sizeof (GrayQueueEntry) * m);
363 queue->first->size = m;
366 * DO NOT move outside this loop
367 * Doing so trigger "assert not reached" in sgen-scan-object.h : we use the queue->cursor
368 * to compute the size of the first section during section allocation (via alloc_prepare_func
369 * -> workers_gray_queue_share_redirect -> sgen_gray_object_dequeue_section) which will be then
370 * set to 0, because queue->cursor is still pointing to queue->first->entries [-1], thus
371 * losing objects in the gray queue.
373 queue->cursor = queue->first->entries + queue->first->size - 1;
/* The stolen entries are removed from the victim only after all chunks were copied. */
376 victim_data->stealable_stack_fill -= num;
379 mono_mutex_unlock (&victim_data->stealable_stack_mutex);
/* Statistics: the steal is attributed to self (locked or not) or to another worker. */
381 if (data == victim_data) {
383 stat_workers_stolen_from_self_lock += num;
385 stat_workers_stolen_from_self_no_lock += num;
387 stat_workers_stolen_from_others += num;
/*
 * Tries to refill the (empty) private gray queue of worker `data`.
 *
 * Sources are tried in order: the worker's own stealable stack, the
 * stealable stacks of the other workers in round-robin order starting after
 * data->index, and finally - for a concurrent major collector - one section
 * from the workers distribute gray queue.
 *
 * NOTE(review): the return statements are not visible in this extract; the
 * caller uses the result as a boolean "got work" flag.
 */
394 workers_get_work (WorkerData *data)
396 SgenMajorCollector *major;
399 g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue));
401 /* Try to steal from our own stack. */
402 if (workers_steal (data, data, TRUE))
405 /* From another worker. */
406 for (i = data->index + 1; i < workers_num + data->index; ++i) {
407 WorkerData *victim_data = &workers_data [i % workers_num];
408 g_assert (data != victim_data);
409 if (workers_steal (data, victim_data, TRUE))
414 * If we're concurrent or parallel, from the workers
415 * distribute gray queue.
417 major = sgen_get_major_collector ();
418 if (major->is_concurrent) {
419 GrayQueueSection *section = sgen_section_gray_queue_dequeue (&workers_distribute_gray_queue);
421 sgen_gray_object_enqueue_section (&data->private_gray_queue, section);
426 /* Nobody to steal from */
427 g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue));
/*
 * Alloc-prepare callback of a worker's private gray queue (registered in
 * init_private_gray_queue ()): shares part of the private queue with other
 * workers by moving entries to the worker's stealable stack.
 *
 * NOTE(review): the early return after the first wake-up and the condition
 * choosing between re-enqueueing and freeing `section` are not visible in
 * this extract.
 */
432 workers_gray_queue_share_redirect (SgenGrayQueue *queue)
434 GrayQueueSection *section;
435 WorkerData *data = queue->alloc_prepare_data;
437 if (data->stealable_stack_fill) {
439 * There are still objects in the stealable stack, so
440 * wake up any workers that might be sleeping
442 workers_signal_enqueue_work_if_necessary (workers_num);
446 /* The stealable stack is empty, so fill it. */
447 mono_mutex_lock (&data->stealable_stack_mutex);
449 while (data->stealable_stack_fill < STEALABLE_STACK_SIZE &&
450 (section = sgen_gray_object_dequeue_section (queue))) {
/* Move as many entries from the end of the section as still fit on the stack. */
451 int num = MIN (section->size, STEALABLE_STACK_SIZE - data->stealable_stack_fill);
453 memcpy (data->stealable_stack + data->stealable_stack_fill,
454 section->entries + section->size - num,
455 sizeof (GrayQueueEntry) * num);
457 section->size -= num;
458 data->stealable_stack_fill += num;
461 sgen_gray_object_enqueue_section (queue, section);
463 sgen_gray_object_free_queue_section (section);
/*
 * If everything was moved out, take work back from our own stack; no
 * locking is requested because the stack mutex is already held here.
 */
466 if (sgen_gray_object_queue_is_empty (queue))
467 workers_steal (data, data, FALSE);
469 mono_mutex_unlock (&data->stealable_stack_mutex);
471 workers_signal_enqueue_work_if_necessary (workers_num);
/*
 * Gray-queue enqueue check used with a concurrent major collector: asserts
 * that a concurrent collection is in progress, that `obj` is not in the
 * nursery and that it has a vtable.
 */
475 concurrent_enqueue_check (char *obj)
477 g_assert (sgen_concurrent_collection_in_progress ());
478 g_assert (!sgen_ptr_in_nursery (obj));
479 g_assert (SGEN_LOAD_VTABLE (obj));
/*
 * (Re)initializes the private gray queue of worker `data`.  The enqueue
 * check is only installed for a concurrent major collector, and
 * workers_gray_queue_share_redirect () is installed as the alloc-prepare
 * function with `data` as its argument.
 */
483 init_private_gray_queue (WorkerData *data)
485 sgen_gray_object_queue_init_with_alloc_prepare (&data->private_gray_queue,
486 sgen_get_major_collector ()->is_concurrent ? concurrent_enqueue_check : NULL,
487 workers_gray_queue_share_redirect, data);
/*
 * Entry point of a worker thread; `data_untyped` is the thread's WorkerData.
 *
 * After per-thread setup the thread repeatedly runs queued jobs and drains
 * gray-queue work.
 *
 * NOTE(review): the loop opener, the bookkeeping of `did_work`, the
 * fallback ops selection and the code that puts the thread to sleep are not
 * visible in this extract.
 */
490 static mono_native_thread_return_t
491 workers_thread_func (void *data_untyped)
493 WorkerData *data = data_untyped;
494 SgenMajorCollector *major = sgen_get_major_collector ();
496 mono_thread_info_register_small_id ();
/* Optional per-thread hook of the major collector. */
498 if (major->init_worker_thread)
499 major->init_worker_thread (data->major_collector_data);
501 init_private_gray_queue (data);
504 gboolean did_work = FALSE;
506 SGEN_ASSERT (0, sgen_get_current_collection_generation () != GENERATION_NURSERY, "Why are we doing work while there's a nursery collection happening?");
/* Run queued jobs for as long as the workers are in the working state. */
508 while (workers_state.data.state == STATE_WORKING && workers_dequeue_and_do_job (data)) {
510 /* FIXME: maybe distribute the gray queue here? */
/* Then process gray objects, either already queued or obtained via workers_get_work (). */
513 if (!sgen_gray_object_queue_is_empty (&data->private_gray_queue) || workers_get_work (data)) {
514 SgenObjectOperations *ops = sgen_concurrent_collection_in_progress ()
515 ? &major->major_concurrent_ops
517 ScanCopyContext ctx = { ops->scan_object, NULL, &data->private_gray_queue };
519 g_assert (!sgen_gray_object_queue_is_empty (&data->private_gray_queue));
/*
 * Drain in steps of 32, sharing work with other workers between steps.
 * NOTE(review): the statement executed when a nursery collection is
 * detected is not visible in this extract.
 */
521 while (!sgen_drain_gray_stack (32, ctx)) {
522 if (workers_state.data.state == STATE_NURSERY_COLLECTION)
525 workers_gray_queue_share_redirect (&data->private_gray_queue);
527 g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue));
529 init_private_gray_queue (data);
538 /* dummy return to make compilers happy */
/*
 * Initializes the workers distribute gray queue on first use.  If it was
 * already initialized, only asserts that it is empty and that its locking
 * mode matches `locked`.
 *
 * NOTE(review): the early return closing the "already initialized" branch
 * is not visible in this extract.
 */
543 init_distribute_gray_queue (gboolean locked)
545 if (workers_distribute_gray_queue_inited) {
546 g_assert (sgen_section_gray_queue_is_empty (&workers_distribute_gray_queue));
547 g_assert (!workers_distribute_gray_queue.locked == !locked);
551 sgen_section_gray_queue_init (&workers_distribute_gray_queue, locked,
552 sgen_get_major_collector ()->is_concurrent ? concurrent_enqueue_check : NULL);
553 workers_distribute_gray_queue_inited = TRUE;
/*
 * Prepares the distribute gray queue for a collection that uses workers;
 * the queue is locked exactly when the major collector is concurrent.
 *
 * NOTE(review): the statement under the first `if` is not visible in this
 * extract; presumably an early return.
 */
557 sgen_workers_init_distribute_gray_queue (void)
559 if (!collection_needs_workers ())
562 init_distribute_gray_queue (sgen_get_major_collector ()->is_concurrent);
/*
 * One-time setup of the worker infrastructure for `num_workers` workers.
 * This allocates and initializes the data structures only; the threads
 * themselves are created in workers_start_worker ().
 *
 * NOTE(review): the statement under the first `if` is not visible in this
 * extract; presumably nothing is set up for a non-concurrent major
 * collector.
 */
566 sgen_workers_init (int num_workers)
570 if (!sgen_get_major_collector ()->is_concurrent)
573 //g_print ("initing %d workers\n", num_workers);
575 workers_num = num_workers;
/* Per-worker data array, zeroed before use. */
577 workers_data = sgen_alloc_internal_dynamic (sizeof (WorkerData) * num_workers, INTERNAL_MEM_WORKER_DATA, TRUE);
578 memset (workers_data, 0, sizeof (WorkerData) * num_workers);
/* Both semaphores start with a count of zero. */
580 MONO_SEM_INIT (&workers_waiting_sem, 0);
581 MONO_SEM_INIT (&workers_done_sem, 0);
583 init_distribute_gray_queue (sgen_get_major_collector ()->is_concurrent);
/* Collector-specific data: one instance kept at file scope, plus one per worker below. */
585 if (sgen_get_major_collector ()->alloc_worker_data)
586 workers_gc_thread_major_collector_data = sgen_get_major_collector ()->alloc_worker_data ();
588 for (i = 0; i < workers_num; ++i) {
589 workers_data [i].index = i;
591 /* private gray queue is inited by the thread itself */
592 mono_mutex_init (&workers_data [i].stealable_stack_mutex);
593 workers_data [i].stealable_stack_fill = 0;
595 if (sgen_get_major_collector ()->alloc_worker_data)
596 workers_data [i].major_collector_data = sgen_get_major_collector ()->alloc_worker_data ();
599 LOCK_INIT (workers_job_queue_mutex);
601 sgen_register_fixed_internal_mem_type (INTERNAL_MEM_JOB_QUEUE_ENTRY, sizeof (JobQueueEntry));
/* Expose the work-stealing statistics as GC counters. */
603 mono_counters_register ("Stolen from self lock", MONO_COUNTER_GC | MONO_COUNTER_ULONG, &stat_workers_stolen_from_self_lock);
604 mono_counters_register ("Stolen from self no lock", MONO_COUNTER_GC | MONO_COUNTER_ULONG, &stat_workers_stolen_from_self_no_lock);
605 mono_counters_register ("Stolen from others", MONO_COUNTER_GC | MONO_COUNTER_ULONG, &stat_workers_stolen_from_others);
606 mono_counters_register ("# workers waited", MONO_COUNTER_GC | MONO_COUNTER_ULONG, &stat_workers_num_waited);
609 /* only the GC thread is allowed to start and join workers */
/*
 * Creates the native thread for worker `index`, running
 * workers_thread_func () on that worker's WorkerData.  Asserts that the
 * index is in range and that the worker has no thread yet.
 */
612 workers_start_worker (int index)
614 g_assert (index >= 0 && index < workers_num);
616 g_assert (!workers_data [index].thread);
617 mono_native_thread_create (&workers_data [index].thread, workers_thread_func, &workers_data [index]);
/*
 * Puts all workers to work at the start of a collection that needs them.
 * On the first call the worker threads are created; on later calls the
 * existing, sleeping workers are woken up.
 *
 * NOTE(review): the early-return statements (after the first `if` and at
 * the end of the `workers_started` branch) are not visible in this extract.
 */
621 sgen_workers_start_all_workers (void)
623 State old_state, new_state;
627 if (!collection_needs_workers ())
630 if (sgen_get_major_collector ()->init_worker_thread)
631 sgen_get_major_collector ()->init_worker_thread (workers_gc_thread_major_collector_data);
/* The workers must be idle and the job queue empty before a new round starts. */
633 old_state = new_state = workers_state;
634 assert_not_working (old_state);
636 g_assert (workers_job_queue_num_entries == 0);
637 workers_num_jobs_enqueued = 0;
638 workers_num_jobs_finished = 0;
640 if (workers_started) {
641 workers_signal_enqueue_work (workers_num, FALSE);
/* First start: mark every worker as awake up front, then create the threads. */
645 new_state.data.state = STATE_WORKING;
646 new_state.data.num_awake = workers_num;
647 result = set_state (old_state, new_state);
648 SGEN_ASSERT (0, result, "Nobody else should have modified the state - workers have not been started yet");
650 for (i = 0; i < workers_num; ++i)
651 workers_start_worker (i);
653 workers_started = TRUE;
/* Returns the workers_started flag, which sgen_workers_start_all_workers () sets after creating the threads. */
657 sgen_workers_have_started (void)
659 return workers_started;
/*
 * Waits until the workers have finished all outstanding work, then resets
 * the per-worker collector data and asserts that every queue and stealable
 * stack is empty.
 *
 * NOTE(review): the enclosing loop, the retry after a failed set_state ()
 * and the exit taken when no work is left are not visible in this extract.
 */
663 sgen_workers_join (void)
668 if (!collection_needs_workers ())
672 old_state = workers_state;
673 SGEN_ASSERT (0, old_state.data.state != STATE_NURSERY_COLLECTION, "Can't be in nursery collection when joining");
/* Workers still busy: request a post on workers_done_sem and wait for it. */
675 if (old_state.data.state == STATE_WORKING) {
676 State new_state = old_state;
678 SGEN_ASSERT (0, !old_state.data.post_done, "Why is post_done already set?");
679 new_state.data.post_done = 1;
680 if (!set_state (old_state, new_state))
683 MONO_SEM_WAIT (&workers_done_sem);
685 old_state = workers_state;
688 assert_not_working (old_state);
691 * Checking whether there is still work left and, if not, going to sleep,
692 * are two separate actions that are not performed atomically by the
693 * workers. Therefore there's a race condition where work can be added
694 * after they've checked for work, and before they've gone to sleep.
696 if (!workers_job_queue_num_entries && sgen_section_gray_queue_is_empty (&workers_distribute_gray_queue))
/* Work is left although the workers went idle: wake all of them again. */
699 workers_signal_enqueue_work (workers_num, FALSE);
702 /* At this point all the workers have stopped. */
704 if (sgen_get_major_collector ()->reset_worker_data) {
705 for (i = 0; i < workers_num; ++i)
706 sgen_get_major_collector ()->reset_worker_data (workers_data [i].major_collector_data);
/* Final consistency checks: nothing may be left anywhere. */
709 g_assert (workers_job_queue_num_entries == 0);
710 g_assert (sgen_section_gray_queue_is_empty (&workers_distribute_gray_queue));
711 for (i = 0; i < workers_num; ++i) {
712 g_assert (!workers_data [i].stealable_stack_fill);
713 g_assert (sgen_gray_object_queue_is_empty (&workers_data [i].private_gray_queue));
/* True when the worker state is STATE_NOT_WORKING. */
718 sgen_workers_all_done (void)
720 return workers_state.data.state == STATE_NOT_WORKING;
/*
 * True when at least one worker is awake or has a pending wake-up.  The
 * state is copied once so that both fields come from the same snapshot.
 */
724 sgen_workers_are_working (void)
726 State state = workers_state;
727 return state.data.num_awake > 0 || state.data.num_posted > 0;
/*
 * Tests whether `thread` is a worker thread: first via the major
 * collector's optional is_worker_thread hook, then by comparing against the
 * thread handle stored for each worker.
 *
 * NOTE(review): the return statements are not visible in this extract.
 */
731 sgen_is_worker_thread (MonoNativeThreadId thread)
735 if (sgen_get_major_collector ()->is_worker_thread && sgen_get_major_collector ()->is_worker_thread (thread))
738 for (i = 0; i < workers_num; ++i) {
739 if (workers_data [i].thread == thread)
/* Returns a pointer to the file-scope workers distribute gray queue. */
745 SgenSectionGrayQueue*
746 sgen_workers_get_distribute_section_gray_queue (void)
748 return &workers_distribute_gray_queue;
/*
 * Resets the file-scope major-collector data
 * (workers_gc_thread_major_collector_data) through the collector's optional
 * reset_worker_data hook; the per-worker data is reset in
 * sgen_workers_join ().
 */
752 sgen_workers_reset_data (void)
754 if (sgen_get_major_collector ()->reset_worker_data)
755 sgen_get_major_collector ()->reset_worker_data (workers_gc_thread_major_collector_data);