From fa9f7ce3a5070882306a20930fc7817f6da64604 Mon Sep 17 00:00:00 2001 From: Mark Probst Date: Thu, 10 Feb 2011 20:18:34 +0100 Subject: [PATCH] [sgen] Better work distribution for parallel mark. --- mono/metadata/sgen-gc.c | 5 +- mono/metadata/sgen-workers.c | 326 +++++++++++++++++++---------------- 2 files changed, 179 insertions(+), 152 deletions(-) diff --git a/mono/metadata/sgen-gc.c b/mono/metadata/sgen-gc.c index 1780cf2c63c..4652ab906c2 100644 --- a/mono/metadata/sgen-gc.c +++ b/mono/metadata/sgen-gc.c @@ -3263,7 +3263,7 @@ major_do_collection (const char *reason) major_collector.init_to_space (); - workers_start_all_workers (1); + workers_start_all_workers (); if (mono_profiler_get_events () & MONO_PROFILE_GC_ROOTS) report_registered_roots (); @@ -3301,10 +3301,9 @@ major_do_collection (const char *reason) if (major_collector.is_parallel) { while (!gray_object_queue_is_empty (WORKERS_DISTRIBUTE_GRAY_QUEUE)) { workers_distribute_gray_queue_sections (); - usleep (2000); + usleep (1000); } } - workers_change_num_working (-1); workers_join (); if (major_collector.is_parallel) diff --git a/mono/metadata/sgen-workers.c b/mono/metadata/sgen-workers.c index 8475a58ba7f..4d574ddcb7a 100644 --- a/mono/metadata/sgen-workers.c +++ b/mono/metadata/sgen-workers.c @@ -22,140 +22,190 @@ * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ +#define STEALABLE_STACK_SIZE 512 + typedef struct _WorkerData WorkerData; struct _WorkerData { pthread_t thread; MonoSemType start_worker_sem; gboolean is_working; GrayQueue private_gray_queue; /* only read/written by worker thread */ - int shared_buffer_increment; - int shared_buffer_index; + + pthread_mutex_t stealable_stack_mutex; + volatile int stealable_stack_fill; + char *stealable_stack [STEALABLE_STACK_SIZE]; }; static int workers_num; static WorkerData *workers_data; static WorkerData workers_gc_thread_data; -static int workers_num_working; - static GrayQueue workers_distribute_gray_queue; #define WORKERS_DISTRIBUTE_GRAY_QUEUE (major_collector.is_parallel ? &workers_distribute_gray_queue : &gray_queue) -/* - * Must be a power of 2. It seems that larger values don't help much. - * The main reason to make this larger would be to sustain a bigger - * number of worker threads. - */ -#define WORKERS_SHARED_BUFFER_SIZE 16 -static GrayQueueSection *workers_shared_buffer [WORKERS_SHARED_BUFFER_SIZE]; -static int workers_shared_buffer_used; +static volatile gboolean workers_gc_in_progress = FALSE; +static gboolean workers_started = FALSE; +static volatile int workers_num_waiting = 0; +static MonoSemType workers_waiting_sem; +static MonoSemType workers_done_sem; +static volatile int workers_done_posted = 0; -static const int workers_primes [] = { 3, 5, 7, 11, 13, 17, 23, 29 }; +static long long stat_workers_stolen_from_self; +static long long stat_workers_stolen_from_others; +static long long stat_workers_num_waited; -static MonoSemType workers_done_sem; +static void +workers_wake_up_all (void) +{ + int max; + int i; -static long long stat_shared_buffer_insert_tries; -static long long stat_shared_buffer_insert_full; -static long long stat_shared_buffer_insert_iterations; -static long long stat_shared_buffer_insert_failures; -static long long stat_shared_buffer_remove_tries; -static long long stat_shared_buffer_remove_iterations; -static long long stat_shared_buffer_remove_empty; + max = workers_num_waiting; + for (i = 0; i < max; ++i) { + int num; + do { + num = workers_num_waiting; + if (num == 0) + return; + } while (InterlockedCompareExchange (&workers_num_waiting, num - 1, num) != num); + MONO_SEM_POST (&workers_waiting_sem); + } +} + +static void +workers_wait (void) +{ + int num; + ++stat_workers_num_waited; + do { + num = workers_num_waiting; + } while (InterlockedCompareExchange (&workers_num_waiting, num + 1, num) != num); + if (num + 1 == workers_num && !workers_gc_in_progress) { + /* Make sure the done semaphore is only posted once. */ + int posted; + do { + posted = workers_done_posted; + if (posted) + break; + } while (InterlockedCompareExchange (&workers_done_posted, 1, 0) != 0); + if (!posted) + MONO_SEM_POST (&workers_done_sem); + } + MONO_SEM_WAIT (&workers_waiting_sem); +} static void workers_gray_queue_share_redirect (GrayQueue *queue) { GrayQueueSection *section; WorkerData *data = queue->alloc_prepare_data; - int increment = data->shared_buffer_increment; - while ((section = gray_object_dequeue_section (queue))) { - int i, index; + if (data->stealable_stack_fill) { + /* + * There are still objects in the stealable stack, so + * wake up any workers that might be sleeping + */ + if (workers_gc_in_progress) + workers_wake_up_all (); + return; + } - HEAVY_STAT (++stat_shared_buffer_insert_tries); + /* The stealable stack is empty, so fill it. */ + pthread_mutex_lock (&data->stealable_stack_mutex); - if (workers_shared_buffer_used == WORKERS_SHARED_BUFFER_SIZE) { - HEAVY_STAT (++stat_shared_buffer_insert_full); - gray_object_enqueue_section (queue, section); - return; - } + while (data->stealable_stack_fill < STEALABLE_STACK_SIZE && + (section = gray_object_dequeue_section (queue))) { + int num = MIN (section->end, STEALABLE_STACK_SIZE - data->stealable_stack_fill); - index = data->shared_buffer_index; - for (i = 0; i < WORKERS_SHARED_BUFFER_SIZE; ++i) { - GrayQueueSection *old = workers_shared_buffer [index]; - HEAVY_STAT (++stat_shared_buffer_insert_iterations); - if (!old) { - if (SGEN_CAS_PTR ((void**)&workers_shared_buffer [index], section, NULL) == NULL) { - SGEN_ATOMIC_ADD (workers_shared_buffer_used, 1); - //g_print ("thread %d put section %d\n", data - workers_data, index); - break; - } - } - index = (index + increment) & (WORKERS_SHARED_BUFFER_SIZE - 1); - } - data->shared_buffer_index = index; + memcpy (data->stealable_stack + data->stealable_stack_fill, + section->objects + section->end - num, + sizeof (char*) * num); - if (i == WORKERS_SHARED_BUFFER_SIZE) { - /* unsuccessful */ - HEAVY_STAT (++stat_shared_buffer_insert_failures); + section->end -= num; + data->stealable_stack_fill += num; + + if (section->end) gray_object_enqueue_section (queue, section); - return; - } + else + gray_object_free_queue_section (section, queue->allocator); } + + pthread_mutex_unlock (&data->stealable_stack_mutex); + + if (workers_gc_in_progress) + workers_wake_up_all (); } static gboolean -workers_get_work (WorkerData *data) +workers_steal (WorkerData *data, WorkerData *victim_data) { - int i, index; - int increment = data->shared_buffer_increment; + GrayQueue *queue = &data->private_gray_queue; + int num, n; - HEAVY_STAT (++stat_shared_buffer_remove_tries); + g_assert (!queue->first); - index = data->shared_buffer_index; - for (i = 0; i < WORKERS_SHARED_BUFFER_SIZE; ++i) { - GrayQueueSection *section; + if (!victim_data->stealable_stack_fill) + return FALSE; - HEAVY_STAT (++stat_shared_buffer_remove_iterations); + if (pthread_mutex_trylock (&victim_data->stealable_stack_mutex)) + return FALSE; - do { - section = workers_shared_buffer [index]; - if (!section) - break; - } while (SGEN_CAS_PTR ((void**)&workers_shared_buffer [index], NULL, section) != section); + n = num = (victim_data->stealable_stack_fill + 1) / 2; + /* We're stealing num entries. */ - if (section) { - SGEN_ATOMIC_ADD (workers_shared_buffer_used, -1); - gray_object_enqueue_section (&data->private_gray_queue, section); - data->shared_buffer_index = index; - //g_print ("thread %d popped section %d\n", data - workers_data, index); - return TRUE; - } + while (n > 0) { + int m = MIN (SGEN_GRAY_QUEUE_SECTION_SIZE, n); + n -= m; - index = (index + increment) & (WORKERS_SHARED_BUFFER_SIZE - 1); + gray_object_alloc_queue_section (queue); + memcpy (queue->first->objects, + victim_data->stealable_stack + victim_data->stealable_stack_fill - num + n, + sizeof (char*) * m); + queue->first->end = m; } - HEAVY_STAT (++stat_shared_buffer_remove_empty); + victim_data->stealable_stack_fill -= num; - data->shared_buffer_index = index; - return FALSE; + pthread_mutex_unlock (&victim_data->stealable_stack_mutex); + + if (data == victim_data) + stat_workers_stolen_from_self += num; + else + stat_workers_stolen_from_others += num; + + return num != 0; } -/* returns the new value */ -static int -workers_change_num_working (int delta) +static gboolean +workers_get_work (WorkerData *data) { - int old, new; + g_assert (gray_object_queue_is_empty (&data->private_gray_queue)); - if (!major_collector.is_parallel) - return -1; + for (;;) { + int i; - do { - old = workers_num_working; - new = old + delta; - } while (InterlockedCompareExchange (&workers_num_working, new, old) != old); - return new; + /* Try to steal from our own stack. */ + if (workers_steal (data, data)) + return TRUE; + + /* Then from the GC thread's stack. */ + if (workers_steal (data, &workers_gc_thread_data)) + return TRUE; + + /* Finally, from another worker. */ + for (i = 0; i < workers_num; ++i) { + WorkerData *victim_data = &workers_data [i]; + if (data == victim_data) + continue; + if (workers_steal (data, victim_data)) + return TRUE; + } + + /* Nobody to steal from, so wait. */ + g_assert (gray_object_queue_is_empty (&data->private_gray_queue)); + workers_wait (); + } } static void* @@ -170,36 +220,14 @@ workers_thread_func (void *data_untyped) workers_gray_queue_share_redirect, data); for (;;) { - //g_print ("worker waiting for start %d\n", data->start_worker_sem); + gboolean got_work = workers_get_work (data); + g_assert (got_work); + g_assert (!gray_object_queue_is_empty (&data->private_gray_queue)); - MONO_SEM_WAIT (&data->start_worker_sem); - - //g_print ("worker starting\n"); - - for (;;) { - do { - drain_gray_stack (&data->private_gray_queue); - } while (workers_get_work (data)); - - /* - * FIXME: This might never terminate with - * multiple threads! - */ - - if (workers_change_num_working (-1) == 0) - break; - - /* we weren't the last one working */ - //g_print ("sleeping\n"); - usleep (5000); - workers_change_num_working (1); - } + drain_gray_stack (&data->private_gray_queue); + g_assert (gray_object_queue_is_empty (&data->private_gray_queue)); gray_object_queue_init (&data->private_gray_queue, &allocator); - - MONO_SEM_POST (&workers_done_sem); - - //g_print ("worker done\n"); } /* dummy return to make compilers happy */ @@ -227,25 +255,23 @@ workers_init (int num_workers) workers_num = num_workers; workers_data = mono_sgen_alloc_internal_dynamic (sizeof (WorkerData) * num_workers, INTERNAL_MEM_WORKER_DATA); + MONO_SEM_INIT (&workers_waiting_sem, 0); MONO_SEM_INIT (&workers_done_sem, 0); - workers_gc_thread_data.shared_buffer_increment = 1; - workers_gc_thread_data.shared_buffer_index = 0; + gray_object_queue_init_with_alloc_prepare (&workers_distribute_gray_queue, mono_sgen_get_unmanaged_allocator (), workers_gray_queue_share_redirect, &workers_gc_thread_data); + pthread_mutex_init (&workers_gc_thread_data.stealable_stack_mutex, NULL); + workers_gc_thread_data.stealable_stack_fill = 0; - g_assert (num_workers <= sizeof (workers_primes) / sizeof (workers_primes [0])); for (i = 0; i < workers_num; ++i) { - workers_data [i].shared_buffer_increment = workers_primes [i]; - workers_data [i].shared_buffer_index = 0; + /* private gray queue is inited by the thread itself */ + pthread_mutex_init (&workers_data [i].stealable_stack_mutex, NULL); + workers_data [i].stealable_stack_fill = 0; } - mono_counters_register ("Shared buffer insert tries", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_shared_buffer_insert_tries); - mono_counters_register ("Shared buffer insert full", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_shared_buffer_insert_full); - mono_counters_register ("Shared buffer insert iterations", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_shared_buffer_insert_iterations); - mono_counters_register ("Shared buffer insert failures", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_shared_buffer_insert_failures); - mono_counters_register ("Shared buffer remove tries", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_shared_buffer_remove_tries); - mono_counters_register ("Shared buffer remove iterations", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_shared_buffer_remove_iterations); - mono_counters_register ("Shared buffer remove empty", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_shared_buffer_remove_empty); + mono_counters_register ("Stolen from self", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_workers_stolen_from_self); + mono_counters_register ("Stolen from others", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_workers_stolen_from_others); + mono_counters_register ("# workers waited", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_workers_num_waited); } /* only the GC thread is allowed to start and join workers */ @@ -255,33 +281,32 @@ workers_start_worker (int index) { g_assert (index >= 0 && index < workers_num); - if (workers_data [index].is_working) - return; - - if (!workers_data [index].thread) { - //g_print ("initing thread %d\n", index); - MONO_SEM_INIT (&workers_data [index].start_worker_sem, 0); - pthread_create (&workers_data [index].thread, NULL, workers_thread_func, &workers_data [index]); - } - - workers_data [index].is_working = TRUE; - MONO_SEM_POST (&workers_data [index].start_worker_sem); - //g_print ("posted thread start %d %d\n", index, workers_data [index].start_worker_sem); + g_assert (!workers_data [index].thread); + pthread_create (&workers_data [index].thread, NULL, workers_thread_func, &workers_data [index]); } static void -workers_start_all_workers (int num_additional_workers) +workers_start_all_workers (void) { int i; if (!major_collector.is_parallel) return; - g_assert (workers_num_working == 0); - workers_num_working = workers_num + num_additional_workers; + g_assert (!workers_gc_in_progress); + workers_gc_in_progress = TRUE; + workers_done_posted = 0; + + if (workers_started) { + g_assert (workers_num_waiting == workers_num); + workers_wake_up_all (); + return; + } for (i = 0; i < workers_num; ++i) workers_start_worker (i); + + workers_started = TRUE; } static void @@ -292,20 +317,23 @@ workers_join (void) if (!major_collector.is_parallel) return; - //g_print ("joining\n"); + g_assert (gray_object_queue_is_empty (&workers_gc_thread_data.private_gray_queue)); + g_assert (gray_object_queue_is_empty (&workers_distribute_gray_queue)); + + g_assert (workers_gc_in_progress); + workers_gc_in_progress = FALSE; + if (workers_num_waiting == workers_num) + workers_wake_up_all (); + MONO_SEM_WAIT (&workers_done_sem); + + g_assert (workers_done_posted); + g_assert (workers_num_waiting == workers_num); for (i = 0; i < workers_num; ++i) { - if (workers_data [i].is_working) - MONO_SEM_WAIT (&workers_done_sem); + g_assert (!workers_data [i].stealable_stack_fill); + g_assert (gray_object_queue_is_empty (&workers_data [i].private_gray_queue)); } - for (i = 0; i < workers_num; ++i) - workers_data [i].is_working = FALSE; - //g_print ("joined\n"); - - g_assert (workers_num_working == 0); - g_assert (workers_shared_buffer_used == 0); - for (i = 0; i < WORKERS_SHARED_BUFFER_SIZE; ++i) - g_assert (!workers_shared_buffer [i]); + g_assert (!workers_gc_thread_data.stealable_stack_fill); } gboolean -- 2.25.1