We have the infrastructure to use multiple concurrent workers, but, in theory, this could starve the mutator threads. In the future, we should do additional research and consider using multiple concurrent workers.
SGEN_ASSERT (0, sgen_workers_all_done (), "Why are the workers not done when we start or finish a major collection?");
if (mode == COPY_OR_MARK_FROM_ROOTS_FINISH_CONCURRENT) {
SGEN_ASSERT (0, sgen_workers_all_done (), "Why are the workers not done when we start or finish a major collection?");
if (mode == COPY_OR_MARK_FROM_ROOTS_FINISH_CONCURRENT) {
+ sgen_workers_set_num_active_workers (0);
if (sgen_workers_have_idle_work ()) {
/*
* We force the finish of the worker with the new object ops context
if (sgen_workers_have_idle_work ()) {
/*
* We force the finish of the worker with the new object ops context
* the roots.
*/
if (mode == COPY_OR_MARK_FROM_ROOTS_START_CONCURRENT) {
* the roots.
*/
if (mode == COPY_OR_MARK_FROM_ROOTS_START_CONCURRENT) {
+ sgen_workers_set_num_active_workers (1);
gray_queue_redirect (gc_thread_gray_queue);
if (precleaning_enabled) {
sgen_workers_start_all_workers (object_ops_nopar, object_ops_par, workers_finish_callback);
gray_queue_redirect (gc_thread_gray_queue);
if (precleaning_enabled) {
sgen_workers_start_all_workers (object_ops_nopar, object_ops_par, workers_finish_callback);
static SgenThreadPoolThreadInitFunc thread_init_func;
static SgenThreadPoolIdleJobFunc idle_job_func;
static SgenThreadPoolContinueIdleJobFunc continue_idle_job_func;
static SgenThreadPoolThreadInitFunc thread_init_func;
static SgenThreadPoolIdleJobFunc idle_job_func;
static SgenThreadPoolContinueIdleJobFunc continue_idle_job_func;
+static SgenThreadPoolShouldWorkFunc should_work_func;
static volatile gboolean threadpool_shutdown;
static volatile int threads_finished = 0;
static volatile gboolean threadpool_shutdown;
static volatile int threads_finished = 0;
return continue_idle_job_func (thread_data);
}
return continue_idle_job_func (thread_data);
}
+static gboolean
+should_work (void *thread_data)
+{
+ if (!should_work_func)
+ return TRUE;
+ return should_work_func (thread_data);
+}
+
static mono_native_thread_return_t
thread_func (void *thread_data)
{
static mono_native_thread_return_t
thread_func (void *thread_data)
{
mono_os_mutex_lock (&lock);
for (;;) {
mono_os_mutex_lock (&lock);
for (;;) {
+ gboolean do_idle;
+ SgenThreadPoolJob *job;
+
+ if (!should_work (thread_data)) {
+ mono_os_cond_wait (&work_cond, &lock);
+ continue;
+ }
/*
* It's important that we check the continue idle flag with the lock held.
* Suppose we didn't check with the lock held, and the result is FALSE. The
* main thread might then set continue idle and signal us before we can take
* the lock, and we'd lose the signal.
*/
/*
* It's important that we check the continue idle flag with the lock held.
* Suppose we didn't check with the lock held, and the result is FALSE. The
* main thread might then set continue idle and signal us before we can take
* the lock, and we'd lose the signal.
*/
- gboolean do_idle = continue_idle_job (thread_data);
- SgenThreadPoolJob *job = get_job_and_set_in_progress ();
+ do_idle = continue_idle_job (thread_data);
+ job = get_job_and_set_in_progress ();
if (!job && !do_idle && !threadpool_shutdown) {
/*
if (!job && !do_idle && !threadpool_shutdown) {
/*
-sgen_thread_pool_init (int num_threads, SgenThreadPoolThreadInitFunc init_func, SgenThreadPoolIdleJobFunc idle_func, SgenThreadPoolContinueIdleJobFunc continue_idle_func, void **thread_datas)
+sgen_thread_pool_init (int num_threads, SgenThreadPoolThreadInitFunc init_func, SgenThreadPoolIdleJobFunc idle_func, SgenThreadPoolContinueIdleJobFunc continue_idle_func, SgenThreadPoolShouldWorkFunc should_work_func_p, void **thread_datas)
thread_init_func = init_func;
idle_job_func = idle_func;
continue_idle_job_func = continue_idle_func;
thread_init_func = init_func;
idle_job_func = idle_func;
continue_idle_job_func = continue_idle_func;
+ should_work_func = should_work_func_p;
for (i = 0; i < threads_num; i++)
mono_native_thread_create (&threads [i], thread_func, thread_datas ? thread_datas [i] : NULL);
for (i = 0; i < threads_num; i++)
mono_native_thread_create (&threads [i], thread_func, thread_datas ? thread_datas [i] : NULL);
typedef void (*SgenThreadPoolThreadInitFunc) (void*);
typedef void (*SgenThreadPoolIdleJobFunc) (void*);
typedef gboolean (*SgenThreadPoolContinueIdleJobFunc) (void*);
typedef void (*SgenThreadPoolThreadInitFunc) (void*);
typedef void (*SgenThreadPoolIdleJobFunc) (void*);
typedef gboolean (*SgenThreadPoolContinueIdleJobFunc) (void*);
+typedef gboolean (*SgenThreadPoolShouldWorkFunc) (void*);
-void sgen_thread_pool_init (int num_threads, SgenThreadPoolThreadInitFunc init_func, SgenThreadPoolIdleJobFunc idle_func, SgenThreadPoolContinueIdleJobFunc continue_idle_func, void **thread_datas);
+void sgen_thread_pool_init (int num_threads, SgenThreadPoolThreadInitFunc init_func, SgenThreadPoolIdleJobFunc idle_func, SgenThreadPoolContinueIdleJobFunc continue_idle_func, SgenThreadPoolShouldWorkFunc should_work_func, void **thread_datas);
void sgen_thread_pool_shutdown (void);
void sgen_thread_pool_shutdown (void);
#include "mono/sgen/sgen-client.h"
static int workers_num;
#include "mono/sgen/sgen-client.h"
static int workers_num;
+static int active_workers_num;
static volatile gboolean forced_stop;
static WorkerData *workers_data;
static SgenWorkerCallback worker_init_cb;
static volatile gboolean forced_stop;
static WorkerData *workers_data;
static SgenWorkerCallback worker_init_cb;
* or when the last worker is enqueuing preclean work. In both cases we can't
* have a worker working using a nopar context, which means it is safe.
*/
* or when the last worker is enqueuing preclean work. In both cases we can't
* have a worker working using a nopar context, which means it is safe.
*/
- idle_func_object_ops = (workers_num > 1) ? idle_func_object_ops_par : idle_func_object_ops_nopar;
+ idle_func_object_ops = (active_workers_num > 1) ? idle_func_object_ops_par : idle_func_object_ops_nopar;
workers_finished = FALSE;
workers_finished = FALSE;
- for (i = 0; i < workers_num; i++) {
+ for (i = 0; i < active_workers_num; i++) {
State old_state;
gboolean did_set_state;
State old_state;
gboolean did_set_state;
mono_os_mutex_lock (&finished_lock);
mono_os_mutex_lock (&finished_lock);
- for (i = 0; i < workers_num; i++) {
+ for (i = 0; i < active_workers_num; i++) {
if (state_is_working_or_enqueued (workers_data [i].state))
working++;
}
if (state_is_working_or_enqueued (workers_data [i].state))
working++;
}
current_worker = (int) (data - workers_data);
current_worker = (int) (data - workers_data);
- for (i = 1; i < workers_num && !section; i++) {
- int steal_worker = (current_worker + i) % workers_num;
+ for (i = 1; i < active_workers_num && !section; i++) {
+ int steal_worker = (current_worker + i) % active_workers_num;
if (state_is_working_or_enqueued (workers_data [steal_worker].state))
section = sgen_gray_object_steal_section (&workers_data [steal_worker].private_gray_queue);
}
if (state_is_working_or_enqueued (workers_data [steal_worker].state))
section = sgen_gray_object_steal_section (&workers_data [steal_worker].private_gray_queue);
}
+static gboolean
+should_work_func (void *data_untyped)
+{
+ WorkerData *data = (WorkerData*)data_untyped;
+ int current_worker = (int) (data - workers_data);
+
+ return current_worker < active_workers_num;
+}
+
static void
marker_idle_func (void *data_untyped)
{
static void
marker_idle_func (void *data_untyped)
{
sgen_drain_gray_stack (ctx);
sgen_drain_gray_stack (ctx);
- if (data->private_gray_queue.num_sections > 16 && workers_finished && worker_awakenings < workers_num) {
+ if (data->private_gray_queue.num_sections > 16 && workers_finished && worker_awakenings < active_workers_num) {
/* We bound the number of worker awakenings just to be sure */
worker_awakenings++;
sgen_workers_ensure_awake ();
/* We bound the number of worker awakenings just to be sure */
worker_awakenings++;
sgen_workers_ensure_awake ();
void **workers_data_ptrs = (void **)alloca(num_workers * sizeof(void *));
if (!sgen_get_major_collector ()->is_concurrent) {
void **workers_data_ptrs = (void **)alloca(num_workers * sizeof(void *));
if (!sgen_get_major_collector ()->is_concurrent) {
- sgen_thread_pool_init (num_workers, thread_pool_init_func, NULL, NULL, NULL);
+ sgen_thread_pool_init (num_workers, thread_pool_init_func, NULL, NULL, NULL, NULL);
//g_print ("initing %d workers\n", num_workers);
workers_num = num_workers;
//g_print ("initing %d workers\n", num_workers);
workers_num = num_workers;
+ active_workers_num = num_workers;
workers_data = (WorkerData *)sgen_alloc_internal_dynamic (sizeof (WorkerData) * num_workers, INTERNAL_MEM_WORKER_DATA, TRUE);
memset (workers_data, 0, sizeof (WorkerData) * num_workers);
workers_data = (WorkerData *)sgen_alloc_internal_dynamic (sizeof (WorkerData) * num_workers, INTERNAL_MEM_WORKER_DATA, TRUE);
memset (workers_data, 0, sizeof (WorkerData) * num_workers);
worker_init_cb = callback;
worker_init_cb = callback;
- sgen_thread_pool_init (num_workers, thread_pool_init_func, marker_idle_func, continue_idle_func, workers_data_ptrs);
+ sgen_thread_pool_init (num_workers, thread_pool_init_func, marker_idle_func, continue_idle_func, should_work_func, workers_data_ptrs);
mono_counters_register ("# workers finished", MONO_COUNTER_GC | MONO_COUNTER_ULONG, &stat_workers_num_finished);
}
mono_counters_register ("# workers finished", MONO_COUNTER_GC | MONO_COUNTER_ULONG, &stat_workers_num_finished);
}
SGEN_ASSERT (0, sgen_workers_all_done (), "Can only signal enqueue work when in no work state");
}
SGEN_ASSERT (0, sgen_workers_all_done (), "Can only signal enqueue work when in no work state");
}
+void
+sgen_workers_set_num_active_workers (int num_workers)
+{
+ if (num_workers) {
+ SGEN_ASSERT (0, active_workers_num <= workers_num, "We can't start more workers than we initialized");
+ active_workers_num = num_workers;
+ } else {
+ active_workers_num = workers_num;
+ }
+}
+
void
sgen_workers_start_all_workers (SgenObjectOperations *object_ops_nopar, SgenObjectOperations *object_ops_par, SgenWorkersFinishCallback callback)
{
void
sgen_workers_start_all_workers (SgenObjectOperations *object_ops_nopar, SgenObjectOperations *object_ops_par, SgenWorkersFinishCallback callback)
{
/* At this point all the workers have stopped. */
SGEN_ASSERT (0, sgen_section_gray_queue_is_empty (&workers_distribute_gray_queue), "Why is there still work left to do?");
/* At this point all the workers have stopped. */
SGEN_ASSERT (0, sgen_section_gray_queue_is_empty (&workers_distribute_gray_queue), "Why is there still work left to do?");
- for (i = 0; i < workers_num; ++i)
+ for (i = 0; i < active_workers_num; ++i)
SGEN_ASSERT (0, sgen_gray_object_queue_is_empty (&workers_data [i].private_gray_queue), "Why is there still work left to do?");
}
SGEN_ASSERT (0, sgen_gray_object_queue_is_empty (&workers_data [i].private_gray_queue), "Why is there still work left to do?");
}
if (!sgen_section_gray_queue_is_empty (&workers_distribute_gray_queue))
return TRUE;
if (!sgen_section_gray_queue_is_empty (&workers_distribute_gray_queue))
return TRUE;
- for (i = 0; i < workers_num; ++i) {
+ for (i = 0; i < active_workers_num; ++i) {
if (!sgen_gray_object_queue_is_empty (&workers_data [i].private_gray_queue))
return TRUE;
}
if (!sgen_gray_object_queue_is_empty (&workers_data [i].private_gray_queue))
return TRUE;
}
- for (i = 0; i < workers_num; i++) {
+ for (i = 0; i < active_workers_num; i++) {
if (state_is_working_or_enqueued (workers_data [i].state))
return FALSE;
}
if (state_is_working_or_enqueued (workers_data [i].state))
return FALSE;
}
int
sgen_workers_get_job_split_count (void)
{
int
sgen_workers_get_job_split_count (void)
{
- return (workers_num > 1) ? workers_num * 4 : 1;
+ return (active_workers_num > 1) ? active_workers_num * 4 : 1;
void sgen_workers_init (int num_workers, SgenWorkerCallback callback);
void sgen_workers_stop_all_workers (void);
void sgen_workers_init (int num_workers, SgenWorkerCallback callback);
void sgen_workers_stop_all_workers (void);
+void sgen_workers_set_num_active_workers (int num_workers);
void sgen_workers_start_all_workers (SgenObjectOperations *object_ops_nopar, SgenObjectOperations *object_ops_par, SgenWorkersFinishCallback finish_job);
void sgen_workers_init_distribute_gray_queue (void);
void sgen_workers_enqueue_job (SgenThreadPoolJob *job, gboolean enqueue);
void sgen_workers_start_all_workers (SgenObjectOperations *object_ops_nopar, SgenObjectOperations *object_ops_par, SgenWorkersFinishCallback finish_job);
void sgen_workers_init_distribute_gray_queue (void);
void sgen_workers_enqueue_job (SgenThreadPoolJob *job, gboolean enqueue);