From f87454a5fe0cd1b404d1161ae20734e245c735ef Mon Sep 17 00:00:00 2001 From: Vlad Brezae Date: Tue, 17 May 2016 20:24:06 +0300 Subject: [PATCH] [sgen] Support multiple workers --- mono/sgen/sgen-thread-pool.c | 12 ++-- mono/sgen/sgen-thread-pool.h | 2 +- mono/sgen/sgen-workers.c | 120 ++++++++++++++++++++++++----------- mono/sgen/sgen-workers.h | 1 + 4 files changed, 91 insertions(+), 44 deletions(-) diff --git a/mono/sgen/sgen-thread-pool.c b/mono/sgen/sgen-thread-pool.c index 0ea06282706..62ad3ec480b 100644 --- a/mono/sgen/sgen-thread-pool.c +++ b/mono/sgen/sgen-thread-pool.c @@ -81,11 +81,11 @@ remove_job (SgenThreadPoolJob *job) } static gboolean -continue_idle_job (void) +continue_idle_job (void *thread_data) { if (!continue_idle_job_func) return FALSE; - return continue_idle_job_func (); + return continue_idle_job_func (thread_data); } static mono_native_thread_return_t @@ -101,7 +101,7 @@ thread_func (void *thread_data) * main thread might then set continue idle and signal us before we can take * the lock, and we'd lose the signal. */ - gboolean do_idle = continue_idle_job (); + gboolean do_idle = continue_idle_job (thread_data); SgenThreadPoolJob *job = get_job_and_set_in_progress (); if (!job && !do_idle && !threadpool_shutdown) { @@ -133,7 +133,7 @@ thread_func (void *thread_data) SGEN_ASSERT (0, idle_job_func, "Why do we have idle work when there's no idle job function?"); do { idle_job_func (thread_data); - do_idle = continue_idle_job (); + do_idle = continue_idle_job (thread_data); } while (do_idle && !job_queue.next_slot); mono_os_mutex_lock (&lock); @@ -238,7 +238,7 @@ sgen_thread_pool_idle_signal (void) mono_os_mutex_lock (&lock); - if (continue_idle_job_func ()) + if (continue_idle_job_func (NULL)) mono_os_cond_broadcast (&work_cond); mono_os_mutex_unlock (&lock); @@ -251,7 +251,7 @@ sgen_thread_pool_idle_wait (void) mono_os_mutex_lock (&lock); - while (continue_idle_job_func ()) + while (continue_idle_job_func (NULL)) mono_os_cond_wait (&done_cond, &lock); mono_os_mutex_unlock (&lock); diff --git a/mono/sgen/sgen-thread-pool.h b/mono/sgen/sgen-thread-pool.h index 339526ca598..99bf7f6711f 100644 --- a/mono/sgen/sgen-thread-pool.h +++ b/mono/sgen/sgen-thread-pool.h @@ -22,7 +22,7 @@ struct _SgenThreadPoolJob { typedef void (*SgenThreadPoolThreadInitFunc) (void*); typedef void (*SgenThreadPoolIdleJobFunc) (void*); -typedef gboolean (*SgenThreadPoolContinueIdleJobFunc) (void); +typedef gboolean (*SgenThreadPoolContinueIdleJobFunc) (void*); void sgen_thread_pool_init (int num_threads, SgenThreadPoolThreadInitFunc init_func, SgenThreadPoolIdleJobFunc idle_func, SgenThreadPoolContinueIdleJobFunc continue_idle_func, void **thread_datas); diff --git a/mono/sgen/sgen-workers.c b/mono/sgen/sgen-workers.c index 76eebdd86ed..f884d79a1d6 100644 --- a/mono/sgen/sgen-workers.c +++ b/mono/sgen/sgen-workers.c @@ -23,6 +23,14 @@ static int workers_num; static volatile gboolean forced_stop; static WorkerData *workers_data; +/* + * When using multiple workers, we need to have the last worker + * enqueue the preclean jobs (if there are any). This lock ensures + * that when the last worker takes it, all the other workers have + * gracefully finished, so it can restart them. + */ +static mono_mutex_t finished_lock; + static SgenSectionGrayQueue workers_distribute_gray_queue; static gboolean workers_distribute_gray_queue_inited; @@ -31,8 +39,8 @@ static gboolean workers_distribute_gray_queue_inited; * * | from \ to | NOT WORKING | WORKING | WORK ENQUEUED | * |--------------------+-------------+---------+---------------+ - * | NOT WORKING | - | - | main | - * | WORKING | worker | - | main | + * | NOT WORKING | - | - | main / worker | + * | WORKING | worker | - | main / worker | * | WORK ENQUEUED | - | worker | - | * * The WORK ENQUEUED state guarantees that the worker thread will inspect the queue again at @@ -50,15 +58,13 @@ enum { typedef gint32 State; -static volatile State workers_state; - static SgenObjectOperations * volatile idle_func_object_ops; static SgenThreadPoolJob * volatile preclean_job; static guint64 stat_workers_num_finished; static gboolean -set_state (State old_state, State new_state) +set_state (WorkerData *data, State old_state, State new_state) { SGEN_ASSERT (0, old_state != new_state, "Why are we transitioning to the same state?"); if (new_state == STATE_NOT_WORKING) @@ -68,7 +74,7 @@ set_state (State old_state, State new_state) if (new_state == STATE_NOT_WORKING || new_state == STATE_WORKING) SGEN_ASSERT (6, sgen_thread_pool_is_thread_pool_thread (mono_native_thread_id_get ()), "Only the worker thread is allowed to transition to NOT_WORKING or WORKING"); - return InterlockedCompareExchange (&workers_state, new_state, old_state) == old_state; + return InterlockedCompareExchange (&data->state, new_state, old_state) == old_state; } static gboolean @@ -80,19 +86,27 @@ state_is_working_or_enqueued (State state) static void sgen_workers_ensure_awake (void) { - State old_state; - gboolean did_set_state; + int i; + gboolean need_signal = FALSE; - do { - old_state = workers_state; + for (i = 0; i < workers_num; i++) { + State old_state; + gboolean did_set_state; - if (old_state == STATE_WORK_ENQUEUED) - break; + do { + old_state = workers_data [i].state; - did_set_state = set_state (old_state, STATE_WORK_ENQUEUED); - } while (!did_set_state); + if (old_state == STATE_WORK_ENQUEUED) + break; - if (!state_is_working_or_enqueued (old_state)) + did_set_state = set_state (&workers_data [i], old_state, STATE_WORK_ENQUEUED); + } while (!did_set_state); + + if (!state_is_working_or_enqueued (old_state)) + need_signal = TRUE; + } + + if (need_signal) sgen_thread_pool_idle_signal (); } @@ -100,23 +114,48 @@ static void worker_try_finish (WorkerData *data) { State old_state; + int i, working = 0; ++stat_workers_num_finished; + mono_os_mutex_lock (&finished_lock); + + for (i = 0; i < workers_num; i++) { + if (state_is_working_or_enqueued (workers_data [i].state)) + working++; + } + + if (working == 1) { + SgenThreadPoolJob *job = preclean_job; + /* We are the last one left. Enqueue preclean job if we have one and awake everybody */ + SGEN_ASSERT (0, data->state != STATE_NOT_WORKING, "How did we get from doing idle work to NOT WORKING without setting it ourselves?"); + if (job) { + preclean_job = NULL; + sgen_thread_pool_job_enqueue (job); + sgen_workers_ensure_awake (); + SGEN_ASSERT (0, data->state == STATE_WORK_ENQUEUED, "Why did we fail to set our own state to ENQUEUED"); + goto work_available; + } + } + do { - old_state = workers_state; + old_state = data->state; SGEN_ASSERT (0, old_state != STATE_NOT_WORKING, "How did we get from doing idle work to NOT WORKING without setting it ourselves?"); if (old_state == STATE_WORK_ENQUEUED) - return; + goto work_available; SGEN_ASSERT (0, old_state == STATE_WORKING, "What other possibility is there?"); + } while (!set_state (data, old_state, STATE_NOT_WORKING)); - /* We are the last thread to go to sleep. */ - } while (!set_state (old_state, STATE_NOT_WORKING)); + mono_os_mutex_unlock (&finished_lock); binary_protocol_worker_finish (sgen_timestamp (), forced_stop); sgen_gray_object_queue_trim_free_list (&data->private_gray_queue); + return; + +work_available: + mono_os_mutex_unlock (&finished_lock); } void @@ -198,9 +237,15 @@ thread_pool_init_func (void *data_untyped) } static gboolean -continue_idle_func (void) +continue_idle_func (void *data_untyped) { - return state_is_working_or_enqueued (workers_state); + if (data_untyped) { + WorkerData *data = (WorkerData *)data_untyped; + return state_is_working_or_enqueued (data->state); + } else { + /* Return if any of the threads is working */ + return !sgen_workers_all_done (); + } } static void @@ -208,12 +253,12 @@ marker_idle_func (void *data_untyped) { WorkerData *data = (WorkerData *)data_untyped; - SGEN_ASSERT (0, continue_idle_func (), "Why are we called when we're not supposed to work?"); + SGEN_ASSERT (0, continue_idle_func (data_untyped), "Why are we called when we're not supposed to work?"); SGEN_ASSERT (0, sgen_concurrent_collection_in_progress (), "The worker should only mark in concurrent collections."); - if (workers_state == STATE_WORK_ENQUEUED) { - set_state (STATE_WORK_ENQUEUED, STATE_WORKING); - SGEN_ASSERT (0, workers_state != STATE_NOT_WORKING, "How did we get from WORK ENQUEUED to NOT WORKING?"); + if (data->state == STATE_WORK_ENQUEUED) { + set_state (data, STATE_WORK_ENQUEUED, STATE_WORKING); + SGEN_ASSERT (0, data->state != STATE_NOT_WORKING, "How did we get from WORK ENQUEUED to NOT WORKING?"); } if (!forced_stop && (!sgen_gray_object_queue_is_empty (&data->private_gray_queue) || workers_get_work (data))) { @@ -223,13 +268,7 @@ marker_idle_func (void *data_untyped) sgen_drain_gray_stack (ctx); } else { - SgenThreadPoolJob *job = preclean_job; - if (job) { - sgen_thread_pool_job_enqueue (job); - preclean_job = NULL; - } else { - worker_try_finish (data); - } + worker_try_finish (data); } } @@ -266,6 +305,7 @@ sgen_workers_init (int num_workers) return; } + mono_os_mutex_init (&finished_lock); //g_print ("initing %d workers\n", num_workers); workers_num = num_workers; @@ -275,7 +315,7 @@ sgen_workers_init (int num_workers) init_distribute_gray_queue (); - for (i = 0; i < workers_num; ++i) + for (i = 0; i < num_workers; ++i) workers_data_ptrs [i] = (void *) &workers_data [i]; sgen_thread_pool_init (num_workers, thread_pool_init_func, marker_idle_func, continue_idle_func, workers_data_ptrs); @@ -292,7 +332,7 @@ sgen_workers_stop_all_workers (void) sgen_thread_pool_wait_for_all_jobs (); sgen_thread_pool_idle_wait (); - SGEN_ASSERT (0, workers_state == STATE_NOT_WORKING, "Can only signal enqueue work when in no work state"); + SGEN_ASSERT (0, sgen_workers_all_done (), "Can only signal enqueue work when in no work state"); } void @@ -313,7 +353,7 @@ sgen_workers_join (void) sgen_thread_pool_wait_for_all_jobs (); sgen_thread_pool_idle_wait (); - SGEN_ASSERT (0, workers_state == STATE_NOT_WORKING, "Can only signal enqueue work when in no work state"); + SGEN_ASSERT (0, sgen_workers_all_done (), "Can only signal enqueue work when in no work state"); /* At this point all the workers have stopped. */ @@ -347,14 +387,20 @@ sgen_workers_have_idle_work (void) gboolean sgen_workers_all_done (void) { - return workers_state == STATE_NOT_WORKING; + int i; + + for (i = 0; i < workers_num; i++) { + if (state_is_working_or_enqueued (workers_data [i].state)) + return FALSE; + } + return TRUE; } /* Must only be used for debugging */ gboolean sgen_workers_are_working (void) { - return state_is_working_or_enqueued (workers_state); + return !sgen_workers_all_done (); } void diff --git a/mono/sgen/sgen-workers.h b/mono/sgen/sgen-workers.h index 1a66c79a48c..f13444691e6 100644 --- a/mono/sgen/sgen-workers.h +++ b/mono/sgen/sgen-workers.h @@ -14,6 +14,7 @@ typedef struct _WorkerData WorkerData; struct _WorkerData { + gint32 state; SgenGrayQueue private_gray_queue; /* only read/written by worker thread */ }; -- 2.25.1