diff --git a/mono/sgen/sgen-workers.c b/mono/sgen/sgen-workers.c
index 76eebdd86ed..cc1930bcc2f 100644
--- a/mono/sgen/sgen-workers.c
+++ b/mono/sgen/sgen-workers.c
@@ -20,8 +20,20 @@
 #include "mono/sgen/sgen-client.h"
 
 static int workers_num;
+static int active_workers_num;
 static volatile gboolean forced_stop;
 static WorkerData *workers_data;
+static SgenWorkerCallback worker_init_cb;
+
+/*
+ * When using multiple workers, we need to have the last worker
+ * enqueue the preclean jobs (if there are any). This lock ensures
+ * that when the last worker takes it, all the other workers have
+ * gracefully finished, so it can restart them.
+ */
+static mono_mutex_t finished_lock;
+static volatile gboolean workers_finished;
+static int worker_awakenings;
 
 static SgenSectionGrayQueue workers_distribute_gray_queue;
 static gboolean workers_distribute_gray_queue_inited;
@@ -31,8 +43,8 @@
  *
  * | from \ to          | NOT WORKING | WORKING | WORK ENQUEUED |
  * |--------------------+-------------+---------+---------------+
- * | NOT WORKING        | -           | -       | main          |
- * | WORKING            | worker      | -       | main          |
+ * | NOT WORKING        | -           | -       | main / worker |
+ * | WORKING            | worker      | -       | main / worker |
  * | WORK ENQUEUED      | -           | worker  | -             |
  *
  * The WORK ENQUEUED state guarantees that the worker thread will inspect the queue again at
@@ -50,15 +62,18 @@ enum {
 
 typedef gint32 State;
 
-static volatile State workers_state;
-
 static SgenObjectOperations * volatile idle_func_object_ops;
-static SgenThreadPoolJob * volatile preclean_job;
+static SgenObjectOperations *idle_func_object_ops_par, *idle_func_object_ops_nopar;
+/*
+ * finish_callback is called only when the workers finish work normally (when they
+ * are not forced to finish). The callback is used to enqueue preclean jobs.
+ */
+static volatile SgenWorkersFinishCallback finish_callback;
 
 static guint64 stat_workers_num_finished;
 
 static gboolean
-set_state (State old_state, State new_state)
+set_state (WorkerData *data, State old_state, State new_state)
 {
 	SGEN_ASSERT (0, old_state != new_state, "Why are we transitioning to the same state?");
 	if (new_state == STATE_NOT_WORKING)
@@ -68,7 +83,7 @@ set_state (State old_state, State new_state)
 	if (new_state == STATE_NOT_WORKING || new_state == STATE_WORKING)
 		SGEN_ASSERT (6, sgen_thread_pool_is_thread_pool_thread (mono_native_thread_id_get ()), "Only the worker thread is allowed to transition to NOT_WORKING or WORKING");
 
-	return InterlockedCompareExchange (&workers_state, new_state, old_state) == old_state;
+	return InterlockedCompareExchange (&data->state, new_state, old_state) == old_state;
 }
 
 static gboolean
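The per-worker state machine above (NOT WORKING / WORKING / WORK ENQUEUED, advanced with a compare-and-swap) can be exercised in isolation. A minimal sketch, assuming GCC's __atomic builtins in place of mono's InterlockedCompareExchange; the types and helpers here are illustrative, not mono code:

    #include <stdio.h>
    #include <stdint.h>

    enum { STATE_NOT_WORKING, STATE_WORKING, STATE_WORK_ENQUEUED };
    typedef int32_t State;
    typedef struct { State state; } Worker;

    /* CAS transition: succeeds only if nobody moved the state under us. */
    static int
    try_set_state (Worker *w, State old_state, State new_state)
    {
        return __atomic_compare_exchange_n (&w->state, &old_state, new_state,
                                            0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
    }

    /* The enqueuer's loop: retry until this worker is marked WORK ENQUEUED. */
    static void
    ensure_awake (Worker *w)
    {
        State old_state;
        do {
            old_state = __atomic_load_n (&w->state, __ATOMIC_SEQ_CST);
            if (old_state == STATE_WORK_ENQUEUED)
                break;  /* someone else already enqueued work for us */
        } while (!try_set_state (w, old_state, STATE_WORK_ENQUEUED));
    }

    int
    main (void)
    {
        Worker w = { STATE_NOT_WORKING };
        ensure_awake (&w);
        printf ("state = %d (expect %d)\n", w.state, STATE_WORK_ENQUEUED);
        return 0;
    }

The retry loop is what makes the "main / worker" entries in the transition table safe: whichever thread loses the CAS race simply re-reads the state and tries again.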
@@ -80,19 +95,36 @@ state_is_working_or_enqueued (State state)
 {
 	return state == STATE_WORKING || state == STATE_WORK_ENQUEUED;
 }
 
 static void
 sgen_workers_ensure_awake (void)
 {
-	State old_state;
-	gboolean did_set_state;
+	int i;
+	gboolean need_signal = FALSE;
 
-	do {
-		old_state = workers_state;
+	/*
+	 * All workers are awakened; make sure we reset the parallel context.
+	 * We call this function only when starting the workers, so nobody is running,
+	 * or when the last worker is enqueuing preclean work. In both cases no worker
+	 * can be running with the nopar context, which means the reset is safe.
+	 */
+	idle_func_object_ops = (active_workers_num > 1) ? idle_func_object_ops_par : idle_func_object_ops_nopar;
+	workers_finished = FALSE;
 
-		if (old_state == STATE_WORK_ENQUEUED)
-			break;
+	for (i = 0; i < active_workers_num; i++) {
+		State old_state;
+		gboolean did_set_state;
+
+		do {
+			old_state = workers_data [i].state;
+
+			if (old_state == STATE_WORK_ENQUEUED)
+				break;
 
-		did_set_state = set_state (old_state, STATE_WORK_ENQUEUED);
-	} while (!did_set_state);
+			did_set_state = set_state (&workers_data [i], old_state, STATE_WORK_ENQUEUED);
+		} while (!did_set_state);
 
-	if (!state_is_working_or_enqueued (old_state))
+		if (!state_is_working_or_enqueued (old_state))
+			need_signal = TRUE;
+	}
+
+	if (need_signal)
 		sgen_thread_pool_idle_signal ();
 }
 
@@ -100,23 +132,60 @@ static void
 worker_try_finish (WorkerData *data)
 {
 	State old_state;
+	int i, working = 0;
 
 	++stat_workers_num_finished;
 
+	mono_os_mutex_lock (&finished_lock);
+
+	for (i = 0; i < active_workers_num; i++) {
+		if (state_is_working_or_enqueued (workers_data [i].state))
+			working++;
+	}
+
+	if (working == 1) {
+		SgenWorkersFinishCallback callback = finish_callback;
+		SGEN_ASSERT (0, idle_func_object_ops == idle_func_object_ops_nopar, "Why are we finishing with parallel context");
+		/* We are the last one left. Enqueue preclean job if we have one and wake everybody */
+		SGEN_ASSERT (0, data->state != STATE_NOT_WORKING, "How did we get from doing idle work to NOT WORKING without setting it ourselves?");
+		if (callback) {
+			finish_callback = NULL;
+			callback ();
+			worker_awakenings = 0;
+			/* Make sure each worker has a chance of seeing the enqueued jobs */
+			sgen_workers_ensure_awake ();
+			SGEN_ASSERT (0, data->state == STATE_WORK_ENQUEUED, "Why did we fail to set our own state to ENQUEUED");
+			goto work_available;
+		}
+	}
+
 	do {
-		old_state = workers_state;
+		old_state = data->state;
 
 		SGEN_ASSERT (0, old_state != STATE_NOT_WORKING, "How did we get from doing idle work to NOT WORKING without setting it ourselves?");
 		if (old_state == STATE_WORK_ENQUEUED)
-			return;
+			goto work_available;
 		SGEN_ASSERT (0, old_state == STATE_WORKING, "What other possibility is there?");
+	} while (!set_state (data, old_state, STATE_NOT_WORKING));
 
-		/* We are the last thread to go to sleep. */
-	} while (!set_state (old_state, STATE_NOT_WORKING));
+	/*
+	 * If we are second to last to finish, we set the scan context to the non-parallel
+	 * version so we can speed up the last worker. This helps us maintain the same level
+	 * of performance as the non-parallel mode even if we fail to distribute work properly.
+	 */
+	if (working == 2)
+		idle_func_object_ops = idle_func_object_ops_nopar;
+
+	workers_finished = TRUE;
+	mono_os_mutex_unlock (&finished_lock);
 
 	binary_protocol_worker_finish (sgen_timestamp (), forced_stop);
 
 	sgen_gray_object_queue_trim_free_list (&data->private_gray_queue);
+	return;
+
+work_available:
+	mono_os_mutex_unlock (&finished_lock);
 }
 
 void
@@ -131,20 +200,6 @@ sgen_workers_enqueue_job (SgenThreadPoolJob *job, gboolean enqueue)
 	sgen_thread_pool_job_enqueue (job);
 }
 
-void
-sgen_workers_wait_for_jobs_finished (void)
-{
-	sgen_thread_pool_wait_for_all_jobs ();
-	/*
-	 * If the idle task was never triggered or it finished before the last job did and
-	 * then didn't get triggered again, we might end up in the situation of having
-	 * something in the gray queue yet the idle task not working. The easiest way to
-	 * make sure this doesn't stay that way is to just trigger it again after all jobs
-	 * have finished.
-	 */
-	sgen_workers_ensure_awake ();
-}
-
 static gboolean
 workers_get_work (WorkerData *data)
 {
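worker_try_finish above implements a "last worker through the gate" protocol: each finishing worker counts, under finished_lock, how many workers are still working, and only the last one runs the finish callback (which enqueues the preclean jobs) and re-awakens the others. A simplified, self-contained sketch of that pattern using plain pthreads and a shared counter instead of the per-worker states (both simplifications are mine, not mono's):

    #include <pthread.h>
    #include <stdio.h>

    #define NUM_WORKERS 4

    static pthread_mutex_t finished_lock = PTHREAD_MUTEX_INITIALIZER;
    static int working = NUM_WORKERS;       /* workers still marking */
    static void (*finish_callback) (void);  /* e.g. enqueue preclean jobs */

    static void
    enqueue_preclean_jobs (void)
    {
        printf ("last worker: enqueuing preclean jobs\n");
    }

    static void *
    worker (void *arg)
    {
        /* ... drain the private gray queue, then try to finish ... */
        pthread_mutex_lock (&finished_lock);
        if (--working == 0 && finish_callback) {
            /* We are the last one left; run the callback exactly once.
             * The real code would also re-awaken the other workers here. */
            void (*cb) (void) = finish_callback;
            finish_callback = NULL;
            cb ();
        }
        pthread_mutex_unlock (&finished_lock);
        return NULL;
    }

    int
    main (void)
    {
        pthread_t threads[NUM_WORKERS];
        int i;

        finish_callback = enqueue_preclean_jobs;
        for (i = 0; i < NUM_WORKERS; i++)
            pthread_create (&threads[i], NULL, worker, NULL);
        for (i = 0; i < NUM_WORKERS; i++)
            pthread_join (threads[i], NULL);
        return 0;
    }

Holding the lock across the whole check-and-finish step is what guarantees the callback runs at most once, and only after every other worker has gone quiet.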
@@ -157,7 +212,7 @@
 	if (major->is_concurrent) {
 		GrayQueueSection *section = sgen_section_gray_queue_dequeue (&workers_distribute_gray_queue);
 		if (section) {
-			sgen_gray_object_enqueue_section (&data->private_gray_queue, section);
+			sgen_gray_object_enqueue_section (&data->private_gray_queue, section, major->is_parallel);
 			return TRUE;
 		}
 	}
@@ -167,6 +222,37 @@
 	return FALSE;
 }
 
+static gboolean
+workers_steal_work (WorkerData *data)
+{
+	SgenMajorCollector *major = sgen_get_major_collector ();
+	GrayQueueSection *section = NULL;
+	int i, current_worker;
+
+	if (!major->is_parallel)
+		return FALSE;
+
+	/* If we're parallel, steal from other workers' private gray queues */
+	g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue));
+
+	current_worker = (int) (data - workers_data);
+
+	for (i = 1; i < active_workers_num && !section; i++) {
+		int steal_worker = (current_worker + i) % active_workers_num;
+		if (state_is_working_or_enqueued (workers_data [steal_worker].state))
+			section = sgen_gray_object_steal_section (&workers_data [steal_worker].private_gray_queue);
+	}
+
+	if (section) {
+		sgen_gray_object_enqueue_section (&data->private_gray_queue, section, TRUE);
+		return TRUE;
+	}
+
+	/* Nobody to steal from */
+	g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue));
+	return FALSE;
+}
+
 static void
 concurrent_enqueue_check (GCObject *obj)
 {
@@ -195,12 +281,30 @@ thread_pool_init_func (void *data_untyped)
 		return;
 
 	init_private_gray_queue (data);
+
+	if (worker_init_cb)
+		worker_init_cb (data);
+}
+
+static gboolean
+continue_idle_func (void *data_untyped)
+{
+	if (data_untyped) {
+		WorkerData *data = (WorkerData *)data_untyped;
+		return state_is_working_or_enqueued (data->state);
+	} else {
+		/* Return TRUE if any of the threads is working */
+		return !sgen_workers_all_done ();
+	}
 }
 
 static gboolean
-continue_idle_func (void)
+should_work_func (void *data_untyped)
 {
-	return state_is_working_or_enqueued (workers_state);
+	WorkerData *data = (WorkerData*)data_untyped;
+	int current_worker = (int) (data - workers_data);
+
+	return current_worker < active_workers_num;
 }
 
 static void
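workers_steal_work above picks its victims in a fixed round-robin order: worker i first tries i + 1, then i + 2, and so on, wrapping modulo active_workers_num, so concurrent thieves start at different victims. A small standalone sketch of just that victim-selection order, with a plain section counter standing in for the private gray queue (the names and data here are illustrative, not mono code):

    #include <stdio.h>

    #define ACTIVE_WORKERS 4

    /* Hypothetical per-worker queue depth; worker 2 has work, the rest don't. */
    static int queue_sections[ACTIVE_WORKERS] = { 0, 0, 3, 0 };

    /* Visit victims in the same order workers_steal_work does:
     * current + 1, current + 2, ... wrapping modulo the worker count. */
    static int
    steal_for (int current_worker)
    {
        int i;
        for (i = 1; i < ACTIVE_WORKERS; i++) {
            int victim = (current_worker + i) % ACTIVE_WORKERS;
            if (queue_sections[victim] > 0) {
                queue_sections[victim]--;   /* take one section */
                return victim;
            }
        }
        return -1;  /* nobody to steal from */
    }

    int
    main (void)
    {
        printf ("worker 0 stole from %d\n", steal_for (0));  /* -> 2 */
        printf ("worker 3 stole from %d\n", steal_for (3));  /* -> 2 */
        printf ("worker 1 stole from %d\n", steal_for (1));  /* -> 2 */
        printf ("worker 1 stole from %d\n", steal_for (1));  /* -> -1 */
        return 0;
    }

In the real code the dequeue itself is sgen_gray_object_steal_section, which must be safe against the victim popping from the same queue; the sketch only shows the traversal order.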
@@ -208,28 +312,28 @@ marker_idle_func (void *data_untyped)
 {
 	WorkerData *data = (WorkerData *)data_untyped;
 
-	SGEN_ASSERT (0, continue_idle_func (), "Why are we called when we're not supposed to work?");
+	SGEN_ASSERT (0, continue_idle_func (data_untyped), "Why are we called when we're not supposed to work?");
 	SGEN_ASSERT (0, sgen_concurrent_collection_in_progress (), "The worker should only mark in concurrent collections.");
 
-	if (workers_state == STATE_WORK_ENQUEUED) {
-		set_state (STATE_WORK_ENQUEUED, STATE_WORKING);
-		SGEN_ASSERT (0, workers_state != STATE_NOT_WORKING, "How did we get from WORK ENQUEUED to NOT WORKING?");
+	if (data->state == STATE_WORK_ENQUEUED) {
+		set_state (data, STATE_WORK_ENQUEUED, STATE_WORKING);
+		SGEN_ASSERT (0, data->state != STATE_NOT_WORKING, "How did we get from WORK ENQUEUED to NOT WORKING?");
 	}
 
-	if (!forced_stop && (!sgen_gray_object_queue_is_empty (&data->private_gray_queue) || workers_get_work (data))) {
+	if (!forced_stop && (!sgen_gray_object_queue_is_empty (&data->private_gray_queue) || workers_get_work (data) || workers_steal_work (data))) {
 		ScanCopyContext ctx = CONTEXT_FROM_OBJECT_OPERATIONS (idle_func_object_ops, &data->private_gray_queue);
 
 		SGEN_ASSERT (0, !sgen_gray_object_queue_is_empty (&data->private_gray_queue), "How is our gray queue empty if we just got work?");
 
 		sgen_drain_gray_stack (ctx);
-	} else {
-		SgenThreadPoolJob *job = preclean_job;
-		if (job) {
-			sgen_thread_pool_job_enqueue (job);
-			preclean_job = NULL;
-		} else {
-			worker_try_finish (data);
+
+		if (data->private_gray_queue.num_sections > 16 && workers_finished && worker_awakenings < active_workers_num) {
+			/* We bound the number of worker awakenings just to be sure */
+			worker_awakenings++;
+			sgen_workers_ensure_awake ();
 		}
+	} else {
+		worker_try_finish (data);
 	}
 }
 
@@ -256,29 +360,33 @@ sgen_workers_init_distribute_gray_queue (void)
 }
 
 void
-sgen_workers_init (int num_workers)
+sgen_workers_init (int num_workers, SgenWorkerCallback callback)
 {
 	int i;
 	void **workers_data_ptrs = (void **)alloca(num_workers * sizeof(void *));
 
 	if (!sgen_get_major_collector ()->is_concurrent) {
-		sgen_thread_pool_init (num_workers, thread_pool_init_func, NULL, NULL, NULL);
+		sgen_thread_pool_init (num_workers, thread_pool_init_func, NULL, NULL, NULL, NULL);
 		return;
 	}
 
+	mono_os_mutex_init (&finished_lock);
 	//g_print ("initing %d workers\n", num_workers);
 
 	workers_num = num_workers;
+	active_workers_num = num_workers;
 
 	workers_data = (WorkerData *)sgen_alloc_internal_dynamic (sizeof (WorkerData) * num_workers, INTERNAL_MEM_WORKER_DATA, TRUE);
 	memset (workers_data, 0, sizeof (WorkerData) * num_workers);
 
 	init_distribute_gray_queue ();
 
-	for (i = 0; i < workers_num; ++i)
+	for (i = 0; i < num_workers; ++i)
 		workers_data_ptrs [i] = (void *) &workers_data [i];
 
-	sgen_thread_pool_init (num_workers, thread_pool_init_func, marker_idle_func, continue_idle_func, workers_data_ptrs);
+	worker_init_cb = callback;
+
+	sgen_thread_pool_init (num_workers, thread_pool_init_func, marker_idle_func, continue_idle_func, should_work_func, workers_data_ptrs);
 
 	mono_counters_register ("# workers finished", MONO_COUNTER_GC | MONO_COUNTER_ULONG, &stat_workers_num_finished);
 }
@@ -286,21 +394,34 @@
 void
 sgen_workers_stop_all_workers (void)
 {
-	preclean_job = NULL;
+	finish_callback = NULL;
 	mono_memory_write_barrier ();
 	forced_stop = TRUE;
 
 	sgen_thread_pool_wait_for_all_jobs ();
 	sgen_thread_pool_idle_wait ();
-	SGEN_ASSERT (0, workers_state == STATE_NOT_WORKING, "Can only signal enqueue work when in no work state");
+	SGEN_ASSERT (0, sgen_workers_all_done (), "Can only signal enqueue work when in no work state");
+}
+
+void
+sgen_workers_set_num_active_workers (int num_workers)
+{
+	if (num_workers) {
+		SGEN_ASSERT (0, active_workers_num <= workers_num, "We can't start more workers than we initialized");
+		active_workers_num = num_workers;
+	} else {
+		active_workers_num = workers_num;
+	}
 }
 
 void
-sgen_workers_start_all_workers (SgenObjectOperations *object_ops, SgenThreadPoolJob *job)
+sgen_workers_start_all_workers (SgenObjectOperations *object_ops_nopar, SgenObjectOperations *object_ops_par, SgenWorkersFinishCallback callback)
 {
+	idle_func_object_ops_par = object_ops_par;
+	idle_func_object_ops_nopar = object_ops_nopar;
 	forced_stop = FALSE;
-	idle_func_object_ops = object_ops;
-	preclean_job = job;
+	finish_callback = callback;
+	worker_awakenings = 0;
 	mono_memory_write_barrier ();
 
 	sgen_workers_ensure_awake ();
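sgen_workers_set_num_active_workers and should_work_func together let a collection cycle use only a prefix of the initialized workers: passing 0 means "use them all", and worker i participates iff i < active_workers_num. A runnable mirror of that logic, simplified to two globals (and checking the requested count in the assert, which the patch's version phrases in terms of the previous value):

    #include <stdio.h>
    #include <assert.h>

    static int workers_num = 8;     /* threads created at init time */
    static int active_workers_num;  /* threads allowed to work this cycle */

    /* Mirror of sgen_workers_set_num_active_workers: 0 means "use them all". */
    static void
    set_num_active_workers (int num_workers)
    {
        if (num_workers) {
            assert (num_workers <= workers_num);
            active_workers_num = num_workers;
        } else {
            active_workers_num = workers_num;
        }
    }

    /* Mirror of should_work_func: only the first N workers participate. */
    static int
    should_work (int current_worker)
    {
        return current_worker < active_workers_num;
    }

    int
    main (void)
    {
        set_num_active_workers (2);
        printf ("worker 1 works: %d, worker 5 works: %d\n",
                should_work (1), should_work (5));
        set_num_active_workers (0);  /* back to all 8 */
        printf ("worker 5 works: %d\n", should_work (5));
        return 0;
    }

Keeping the full pool alive and merely fencing off the tail with should_work_func avoids tearing threads down between cycles when the desired degree of parallelism changes.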
@@ -313,12 +434,12 @@ sgen_workers_join (void)
 	sgen_thread_pool_wait_for_all_jobs ();
 	sgen_thread_pool_idle_wait ();
 
-	SGEN_ASSERT (0, workers_state == STATE_NOT_WORKING, "Can only signal enqueue work when in no work state");
+	SGEN_ASSERT (0, sgen_workers_all_done (), "Can only signal enqueue work when in no work state");
 
 	/* At this point all the workers have stopped. */
 
 	SGEN_ASSERT (0, sgen_section_gray_queue_is_empty (&workers_distribute_gray_queue), "Why is there still work left to do?");
-	for (i = 0; i < workers_num; ++i)
+	for (i = 0; i < active_workers_num; ++i)
 		SGEN_ASSERT (0, sgen_gray_object_queue_is_empty (&workers_data [i].private_gray_queue), "Why is there still work left to do?");
 }
 
@@ -336,7 +457,7 @@ sgen_workers_have_idle_work (void)
 	if (!sgen_section_gray_queue_is_empty (&workers_distribute_gray_queue))
 		return TRUE;
 
-	for (i = 0; i < workers_num; ++i) {
+	for (i = 0; i < active_workers_num; ++i) {
 		if (!sgen_gray_object_queue_is_empty (&workers_data [i].private_gray_queue))
 			return TRUE;
 	}
@@ -347,14 +468,20 @@
 gboolean
 sgen_workers_all_done (void)
 {
-	return workers_state == STATE_NOT_WORKING;
+	int i;
+
+	for (i = 0; i < active_workers_num; i++) {
+		if (state_is_working_or_enqueued (workers_data [i].state))
+			return FALSE;
+	}
+	return TRUE;
 }
 
 /* Must only be used for debugging */
 gboolean
 sgen_workers_are_working (void)
 {
-	return state_is_working_or_enqueued (workers_state);
+	return !sgen_workers_all_done ();
 }
 
 void
@@ -364,22 +491,45 @@ sgen_workers_assert_gray_queue_is_empty (void)
 }
 
 void
-sgen_workers_take_from_queue_and_awake (SgenGrayQueue *queue)
+sgen_workers_take_from_queue (SgenGrayQueue *queue)
 {
-	gboolean wake = FALSE;
+	sgen_gray_object_spread (queue, sgen_workers_get_job_split_count ());
 
 	for (;;) {
 		GrayQueueSection *section = sgen_gray_object_dequeue_section (queue);
 		if (!section)
 			break;
 		sgen_section_gray_queue_enqueue (&workers_distribute_gray_queue, section);
-		wake = TRUE;
 	}
 
-	if (wake) {
-		SGEN_ASSERT (0, sgen_concurrent_collection_in_progress (), "Why is there work to take when there's no concurrent collection in progress?");
-		sgen_workers_ensure_awake ();
-	}
+	SGEN_ASSERT (0, !sgen_workers_are_working (), "We should fully populate the distribute gray queue before we start the workers");
+}
+
+SgenObjectOperations*
+sgen_workers_get_idle_func_object_ops (void)
+{
+	return (idle_func_object_ops_par) ? idle_func_object_ops_par : idle_func_object_ops_nopar;
+}
+
+/*
+ * If we have a single worker, splitting into multiple jobs makes no sense. With
+ * more than one worker, we split into a larger number of jobs so that, in case
+ * the workload is uneven, a worker that finished quickly can take up more jobs
+ * than another one.
+ */
+int
+sgen_workers_get_job_split_count (void)
+{
+	return (active_workers_num > 1) ? active_workers_num * 4 : 1;
+}
+
+void
+sgen_workers_foreach (SgenWorkerCallback callback)
+{
+	int i;
+
+	for (i = 0; i < workers_num; i++)
+		callback (&workers_data [i]);
}
 
 #endif
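The split count returned above (four jobs per active worker) is what sgen_workers_take_from_queue feeds to sgen_gray_object_spread: with more jobs than workers, a worker that finishes its share early picks up extra jobs instead of idling. A small standalone illustration of how the job count and per-job share scale with the worker count (the section total is made up for the example):

    #include <stdio.h>

    /* Mirror of sgen_workers_get_job_split_count. */
    static int
    job_split_count (int active_workers_num)
    {
        return (active_workers_num > 1) ? active_workers_num * 4 : 1;
    }

    int
    main (void)
    {
        int sections = 100, workers;

        for (workers = 1; workers <= 4; workers++) {
            int jobs = job_split_count (workers);
            /* With more jobs than workers, a fast worker can take extras. */
            printf ("%d worker(s): %d jobs of ~%d sections each\n",
                    workers, jobs, (sections + jobs - 1) / jobs);
        }
        return 0;
    }

Over-partitioning by a small constant factor is a common load-balancing trick: it bounds the idle tail at roughly one quarter of a worker's share without the bookkeeping cost of splitting per object.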