/**
 * \file
 * Threadpool for all concurrent GC work.
 *
 * Copyright (C) 2015 Xamarin Inc
 *
 * Licensed under the MIT license. See LICENSE file in the project root for full license information.
 */

#include "config.h"

#ifdef HAVE_SGEN_GC

#include "mono/sgen/sgen-gc.h"
#include "mono/sgen/sgen-thread-pool.h"
#include "mono/sgen/sgen-client.h"
#include "mono/utils/mono-os-mutex.h"

/*
 * `lock` is the single mutex paired with both condition variables below.
 * The helpers marked "Assumes that the lock is held" touch the per-context
 * job queues only under it.
 */
static mono_mutex_t lock;
/* Workers sleep on this in get_work (); broadcast on enqueue, idle signal and shutdown. */
static mono_cond_t work_cond;
/* Signalled by workers when a job finishes, idle work stops, or a worker exits. */
static mono_cond_t done_cond;

/* Number of worker threads: the largest `num_threads` of any registered context. */
static int threads_num;
static MonoNativeThreadId threads [SGEN_THREADPOOL_MAX_NUM_THREADS];
/* Per worker: index of the context it is currently serving, -1 while it looks for work. */
static int threads_context [SGEN_THREADPOOL_MAX_NUM_THREADS];

static volatile gboolean threadpool_shutdown;
static volatile int threads_finished;

static int contexts_num;
static SgenThreadPoolContext pool_contexts [SGEN_THREADPOOL_MAX_NUM_CONTEXTS];

/* Life cycle of a job, stored in SgenThreadPoolJob.state. */
enum {
	STATE_WAITING,
	STATE_IN_PROGRESS,
	STATE_DONE
};

/*
 * Returns the first job of `context` that is still waiting, after marking it
 * as in progress, or NULL if the queue holds no waiting job.
 *
 * Assumes that the lock is held.
 */
static SgenThreadPoolJob*
get_job_and_set_in_progress (SgenThreadPoolContext *context)
{
	for (size_t i = 0; i < context->job_queue.next_slot; ++i) {
		SgenThreadPoolJob *job = (SgenThreadPoolJob *)context->job_queue.data [i];
		if (job->state == STATE_WAITING) {
			job->state = STATE_IN_PROGRESS;
			return job;
		}
	}
	return NULL;
}

/*
 * Returns the index of `job` in the queue of `context`, or -1 if it is not
 * queued there.
 *
 * Assumes that the lock is held.
 */
static ssize_t
find_job_in_queue (SgenThreadPoolContext *context, SgenThreadPoolJob *job)
{
	/* Index with size_t, like get_job_and_set_in_progress (), to avoid a signed/unsigned compare. */
	for (size_t i = 0; i < context->job_queue.next_slot; ++i) {
		if (context->job_queue.data [i] == job)
			return (ssize_t)i;
	}
	return -1;
}

/*
 * Unlinks a finished job from the queue of `context` and frees it.
 *
 * Assumes that the lock is held.
 */
static void
remove_job (SgenThreadPoolContext *context, SgenThreadPoolJob *job)
{
	ssize_t index;

	SGEN_ASSERT (0, job->state == STATE_DONE, "Why are we removing a job that's not done?");
	index = find_job_in_queue (context, job);
	SGEN_ASSERT (0, index >= 0, "Why is the job we're trying to remove not in the queue?");

	/* Clear the slot, then let the queue drop the cleared entry. */
	context->job_queue.data [index] = NULL;
	sgen_pointer_queue_remove_nulls (&context->job_queue);
	sgen_thread_pool_job_free (job);
}

/*
 * Asks the context's callback whether idle work should go on. A context
 * without a continue-idle callback never has idle work.
 */
static gboolean
continue_idle_job (SgenThreadPoolContext *context, void *thread_data)
{
	if (!context->continue_idle_job_func)
		return FALSE;
	/* The callback receives the context id, i.e. the index into pool_contexts. */
	return context->continue_idle_job_func (thread_data, context - pool_contexts);
}

/*
 * Asks the context's callback whether this worker should work for it. A
 * context without a should-work callback always accepts the worker.
 */
static gboolean
should_work (SgenThreadPoolContext *context, void *thread_data)
{
	if (!context->should_work_func)
		return TRUE;
	return context->should_work_func (thread_data);
}

/*
 * Tells whether we should lock and attempt to get work from
 * a higher priority context.
 *
 * Contexts with a lower index than `current_context` are the higher priority
 * ones. The caller in thread_func () invokes this without holding the lock.
 */
static gboolean
has_priority_work (int worker_index, int current_context)
{
	int i;

	for (i = 0; i < current_context; i++) {
		SgenThreadPoolContext *context = &pool_contexts [i];
		void *thread_data;

		/* This worker does not belong to the context. */
		if (worker_index >= context->num_threads)
			continue;
		thread_data = (context->thread_datas) ? context->thread_datas [worker_index] : NULL;
		if (!should_work (context, thread_data))
			continue;
		if (context->job_queue.next_slot > 0)
			return TRUE;
		if (continue_idle_job (context, thread_data))
			return TRUE;
	}

	/* Return if job enqueued on current context. Jobs have priority over idle work */
	if (pool_contexts [current_context].job_queue.next_slot > 0)
		return TRUE;

	return FALSE;
}

/*
 * Gets the highest priority work. If there is none, it waits
 * for work_cond. Should always be called with lock held.
 *
 * On return either `*job` is a job already marked in progress, or `*do_idle`
 * is TRUE, with `*work_context` set to the owning context in both cases; or
 * neither is set, which happens only once threadpool_shutdown is TRUE.
 */
static void
get_work (int worker_index, int *work_context, int *do_idle, SgenThreadPoolJob **job)
{
	while (!threadpool_shutdown) {
		int i;

		for (i = 0; i < contexts_num; i++) {
			SgenThreadPoolContext *context = &pool_contexts [i];
			void *thread_data;

			if (worker_index >= context->num_threads)
				continue;
			thread_data = (context->thread_datas) ? context->thread_datas [worker_index] : NULL;

			if (!should_work (context, thread_data))
				continue;

			/*
			 * It's important that we check the continue idle flag with the lock held.
			 * Suppose we didn't check with the lock held, and the result is FALSE. The
			 * main thread might then set continue idle and signal us before we can take
			 * the lock, and we'd lose the signal.
			 */
			*do_idle = continue_idle_job (context, thread_data);
			*job = get_job_and_set_in_progress (context);

			if (*job || *do_idle) {
				*work_context = i;
				return;
			}
		}

		/*
		 * Nothing to do on any context
		 * pthread_cond_wait() can return successfully despite the condition
		 * not being signalled, so we have to run this in a loop until we
		 * really have work to do.
		 */
		mono_os_cond_wait (&work_cond, &lock);
	}
}

/*
 * Entry point of every worker thread. `data` carries the worker index encoded
 * as a pointer-sized integer (see sgen_thread_pool_start ()).
 *
 * The worker repeatedly takes work with the lock held, drops the lock while
 * running a job or idle work, and exits once the pool is shutting down.
 */
static mono_native_thread_return_t
thread_func (void *data)
{
	int worker_index = (int)(gsize)data;
	int current_context;
	void *thread_data = NULL;

	sgen_client_thread_register_worker ();

	/*
	 * Run the per-context thread init functions for this worker.
	 * NOTE(review): this stops at the first context the worker does not belong
	 * to or that has no init function, whereas has_priority_work () and
	 * get_work () skip such contexts with `continue`. Confirm that `break` is
	 * intended before relying on later contexts being initialized.
	 */
	for (current_context = 0; current_context < contexts_num; current_context++) {
		if (worker_index >= pool_contexts [current_context].num_threads ||
				!pool_contexts [current_context].thread_init_func)
			break;

		thread_data = (pool_contexts [current_context].thread_datas) ? pool_contexts [current_context].thread_datas [worker_index] : NULL;
		pool_contexts [current_context].thread_init_func (thread_data);
	}

	current_context = 0;

	mono_os_mutex_lock (&lock);
	for (;;) {
		gboolean do_idle = FALSE;
		SgenThreadPoolJob *job = NULL;
		SgenThreadPoolContext *context = NULL;

		/* -1 marks this worker as not serving any context while it waits. */
		threads_context [worker_index] = -1;
		get_work (worker_index, &current_context, &do_idle, &job);
		threads_context [worker_index] = current_context;

		if (!threadpool_shutdown) {
			context = &pool_contexts [current_context];
			thread_data = (context->thread_datas) ? context->thread_datas [worker_index] : NULL;
		}

		/* The actual work runs without the lock. */
		mono_os_mutex_unlock (&lock);

		if (job) {
			job->func (thread_data, job);

			mono_os_mutex_lock (&lock);

			SGEN_ASSERT (0, job->state == STATE_IN_PROGRESS, "The job should still be in progress.");
			job->state = STATE_DONE;
			remove_job (context, job);
			/*
			 * Only the main GC thread will ever wait on the done condition, so we don't
			 * have to broadcast.
			 */
			mono_os_cond_signal (&done_cond);
		} else if (do_idle) {
			SGEN_ASSERT (0, context->idle_job_func, "Why do we have idle work when there's no idle job function?");
			/* Keep idling until it is no longer wanted or more urgent work shows up. */
			do {
				context->idle_job_func (thread_data);
				do_idle = continue_idle_job (context, thread_data);
			} while (do_idle && !has_priority_work (worker_index, current_context));

			mono_os_mutex_lock (&lock);

			/* Idle work ran to completion (it was not merely preempted). */
			if (!do_idle)
				mono_os_cond_signal (&done_cond);
		} else {
			SGEN_ASSERT (0, threadpool_shutdown, "Why did we unlock if no jobs and not shutting down?");
			mono_os_mutex_lock (&lock);
			threads_finished++;
			mono_os_cond_signal (&done_cond);
			mono_os_mutex_unlock (&lock);
			return 0;
		}
	}

	return (mono_native_thread_return_t)0;
}

/*
 * Registers a new context and returns its id, which is its index into
 * pool_contexts. Contexts registered earlier are searched first by
 * get_work ().
 *
 * `num_threads` is the number of workers that serve the context; it must not
 * exceed SGEN_THREADPOOL_MAX_NUM_THREADS. `thread_datas`, when not NULL, is
 * indexed by worker index to obtain the pointer handed to the callbacks. The
 * callbacks are stored as given; get_work (), continue_idle_job () and
 * should_work () tolerate `thread_datas`, `continue_idle_func` and
 * `should_work_func` being NULL.
 *
 * No locking is done here.
 * NOTE(review): presumably all contexts are created before
 * sgen_thread_pool_start () - confirm against callers.
 */
int
sgen_thread_pool_create_context (int num_threads, SgenThreadPoolThreadInitFunc init_func, SgenThreadPoolIdleJobFunc idle_func, SgenThreadPoolContinueIdleJobFunc continue_idle_func, SgenThreadPoolShouldWorkFunc should_work_func, void **thread_datas)
{
	int context_id = contexts_num;
	SgenThreadPoolContext *context;

	SGEN_ASSERT (0, contexts_num < SGEN_THREADPOOL_MAX_NUM_CONTEXTS, "Maximum sgen thread pool contexts reached");

	context = &pool_contexts [context_id];

	context->thread_init_func = init_func;
	context->idle_job_func = idle_func;
	context->continue_idle_job_func = continue_idle_func;
	context->should_work_func = should_work_func;
	context->thread_datas = thread_datas;

	SGEN_ASSERT (0, num_threads <= SGEN_THREADPOOL_MAX_NUM_THREADS, "Maximum sgen thread pool threads exceeded");

	context->num_threads = num_threads;

	sgen_pointer_queue_init (&context->job_queue, 0);

	contexts_num++;

	return context_id;
}

/*
 * Starts the worker threads. One thread is created per worker index, up to the
 * largest `num_threads` of all registered contexts. Does nothing if no context
 * asked for any thread.
 */
void
sgen_thread_pool_start (void)
{
	int i;

	for (i = 0; i < contexts_num; i++) {
		if (threads_num < pool_contexts [i].num_threads)
			threads_num = pool_contexts [i].num_threads;
	}

	if (!threads_num)
		return;

	mono_os_mutex_init (&lock);
	mono_os_cond_init (&work_cond);
	mono_os_cond_init (&done_cond);

	threads_finished = 0;
	threadpool_shutdown = FALSE;

	/*
	 * NOTE(review): the result of mono_native_thread_create () is not checked;
	 * sgen_thread_pool_shutdown () waits for all `threads_num` workers.
	 */
	for (i = 0; i < threads_num; i++) {
		mono_native_thread_create (&threads [i], thread_func, (void*)(gsize)i);
	}
}

/*
 * Asks every worker to exit, waits until all of them have reported that they
 * finished, then destroys the synchronization primitives and hands the
 * threads over as joinable threads.
 */
void
sgen_thread_pool_shutdown (void)
{
	if (!threads_num)
		return;

	mono_os_mutex_lock (&lock);
	threadpool_shutdown = TRUE;
	/* Wake every worker sleeping in get_work (). */
	mono_os_cond_broadcast (&work_cond);
	while (threads_finished < threads_num)
		mono_os_cond_wait (&done_cond, &lock);
	mono_os_mutex_unlock (&lock);

	mono_os_mutex_destroy (&lock);
	mono_os_cond_destroy (&work_cond);
	mono_os_cond_destroy (&done_cond);

	for (int i = 0; i < threads_num; i++) {
		mono_threads_add_joinable_thread ((gpointer)threads [i]);
	}
}

/*
 * Allocates a job of `size` bytes in the waiting state. `size` is recorded in
 * the job and reused by sgen_thread_pool_job_free (). `name` is stored as
 * given, not copied.
 */
SgenThreadPoolJob*
sgen_thread_pool_job_alloc (const char *name, SgenThreadPoolJobFunc func, size_t size)
{
	SgenThreadPoolJob *job = (SgenThreadPoolJob *)sgen_alloc_internal_dynamic (size, INTERNAL_MEM_THREAD_POOL_JOB, TRUE);
	job->name = name;
	job->size = size;
	job->state = STATE_WAITING;
	job->func = func;
	return job;
}

/* Releases a job obtained from sgen_thread_pool_job_alloc (). */
void
sgen_thread_pool_job_free (SgenThreadPoolJob *job)
{
	sgen_free_internal_dynamic (job, job->size, INTERNAL_MEM_THREAD_POOL_JOB);
}

/*
 * Appends `job` to the queue of context `context_id` and wakes the workers.
 * A worker removes and frees the job once it has run (see thread_func ()).
 */
void
sgen_thread_pool_job_enqueue (int context_id, SgenThreadPoolJob *job)
{
	mono_os_mutex_lock (&lock);

	sgen_pointer_queue_add (&pool_contexts [context_id].job_queue, job);
	mono_os_cond_broadcast (&work_cond);

	mono_os_mutex_unlock (&lock);
}

/*
 * Blocks until `job` is no longer in the queue of context `context_id`, that
 * is, until a worker has run and removed it.
 */
void
sgen_thread_pool_job_wait (int context_id, SgenThreadPoolJob *job)
{
	SGEN_ASSERT (0, job, "Where's the job?");

	mono_os_mutex_lock (&lock);

	while (find_job_in_queue (&pool_contexts [context_id], job) >= 0)
		mono_os_cond_wait (&done_cond, &lock);

	mono_os_mutex_unlock (&lock);
}

/*
 * Wakes the workers if context `context_id` currently wants idle work. The
 * continue-idle callback is evaluated with the lock held and with NULL as the
 * thread data.
 */
void
sgen_thread_pool_idle_signal (int context_id)
{
	SGEN_ASSERT (0, pool_contexts [context_id].idle_job_func, "Why are we signaling idle without an idle function?");

	mono_os_mutex_lock (&lock);

	/*
	 * Go through the helper rather than calling the callback directly: a
	 * context may have an idle function but no continue-idle callback, which
	 * the helper treats as "no idle work" instead of calling a NULL pointer.
	 */
	if (continue_idle_job (&pool_contexts [context_id], NULL))
		mono_os_cond_broadcast (&work_cond);

	mono_os_mutex_unlock (&lock);
}

/*
 * Blocks for as long as `continue_wait` returns true. The callback is called
 * with the lock held and receives `context_id` together with the array holding
 * the context each worker is currently serving (-1 for a worker that is
 * looking for work).
 */
void
sgen_thread_pool_idle_wait (int context_id, SgenThreadPoolContinueIdleWaitFunc continue_wait)
{
	SGEN_ASSERT (0, pool_contexts [context_id].idle_job_func, "Why are we waiting for idle without an idle function?");

	mono_os_mutex_lock (&lock);

	while (continue_wait (context_id, threads_context))
		mono_os_cond_wait (&done_cond, &lock);

	mono_os_mutex_unlock (&lock);
}

/* Blocks until the job queue of context `context_id` is empty. */
void
sgen_thread_pool_wait_for_all_jobs (int context_id)
{
	mono_os_mutex_lock (&lock);

	while (!sgen_pointer_queue_is_empty (&pool_contexts [context_id].job_queue))
		mono_os_cond_wait (&done_cond, &lock);

	mono_os_mutex_unlock (&lock);
}

/*
 * Return 0 if is not a thread pool thread or the thread number otherwise.
 * The thread number is the worker index plus one, so it is never 0.
 *
 * NOTE(review): thread ids are compared with `==`; confirm that this is valid
 * for MonoNativeThreadId on every supported platform.
 */
int
sgen_thread_pool_is_thread_pool_thread (MonoNativeThreadId some_thread)
{
	int i;

	for (i = 0; i < threads_num; i++) {
		if (some_thread == threads [i])
			return i + 1;
	}

	return 0;
}

#endif