X-Git-Url: http://wien.tomnetworks.com/gitweb/?a=blobdiff_plain;f=mono%2Fsgen%2Fsgen-thread-pool.c;h=6a03bc3d69d164b5cd949b0b71760fc84f41d04b;hb=dc2e330a9ff2d5c5271693d5b8d685aa8c0dd3b2;hp=a7abfad24472822f85e7f51632aa998cb913f05c;hpb=a9b5b501fd36a4b488e2295196a52778b2b94d1f;p=mono.git diff --git a/mono/sgen/sgen-thread-pool.c b/mono/sgen/sgen-thread-pool.c index a7abfad2447..6a03bc3d69d 100644 --- a/mono/sgen/sgen-thread-pool.c +++ b/mono/sgen/sgen-thread-pool.c @@ -12,8 +12,22 @@ #include "mono/sgen/sgen-gc.h" #include "mono/sgen/sgen-thread-pool.h" +#include "mono/sgen/sgen-client.h" #include "mono/utils/mono-os-mutex.h" +static mono_mutex_t lock; +static mono_cond_t work_cond; +static mono_cond_t done_cond; + +static int threads_num; +static MonoNativeThreadId threads [SGEN_THREADPOOL_MAX_NUM_THREADS]; + +static volatile gboolean threadpool_shutdown; +static volatile int threads_finished; + +static int contexts_num; +static SgenThreadPoolContext pool_contexts [SGEN_THREADPOOL_MAX_NUM_CONTEXTS]; + enum { STATE_WAITING, STATE_IN_PROGRESS, @@ -22,10 +36,10 @@ enum { /* Assumes that the lock is held. */ static SgenThreadPoolJob* -get_job_and_set_in_progress (SgenThreadPool *pool) +get_job_and_set_in_progress (SgenThreadPoolContext *context) { - for (size_t i = 0; i < pool->job_queue.next_slot; ++i) { - SgenThreadPoolJob *job = (SgenThreadPoolJob *)pool->job_queue.data [i]; + for (size_t i = 0; i < context->job_queue.next_slot; ++i) { + SgenThreadPoolJob *job = (SgenThreadPoolJob *)context->job_queue.data [i]; if (job->state == STATE_WAITING) { job->state = STATE_IN_PROGRESS; return job; @@ -36,10 +50,10 @@ get_job_and_set_in_progress (SgenThreadPool *pool) /* Assumes that the lock is held. */ static ssize_t -find_job_in_queue (SgenThreadPool *pool, SgenThreadPoolJob *job) +find_job_in_queue (SgenThreadPoolContext *context, SgenThreadPoolJob *job) { - for (ssize_t i = 0; i < pool->job_queue.next_slot; ++i) { - if (pool->job_queue.data [i] == job) + for (ssize_t i = 0; i < context->job_queue.next_slot; ++i) { + if (context->job_queue.data [i] == job) return i; } return -1; @@ -47,100 +61,174 @@ find_job_in_queue (SgenThreadPool *pool, SgenThreadPoolJob *job) /* Assumes that the lock is held. */ static void -remove_job (SgenThreadPool *pool, SgenThreadPoolJob *job) +remove_job (SgenThreadPoolContext *context, SgenThreadPoolJob *job) { ssize_t index; SGEN_ASSERT (0, job->state == STATE_DONE, "Why are we removing a job that's not done?"); - index = find_job_in_queue (pool, job); + index = find_job_in_queue (context, job); SGEN_ASSERT (0, index >= 0, "Why is the job we're trying to remove not in the queue?"); - pool->job_queue.data [index] = NULL; - sgen_pointer_queue_remove_nulls (&pool->job_queue); + context->job_queue.data [index] = NULL; + sgen_pointer_queue_remove_nulls (&context->job_queue); sgen_thread_pool_job_free (job); } static gboolean -continue_idle_job (SgenThreadPool *pool, void *thread_data) +continue_idle_job (SgenThreadPoolContext *context, void *thread_data) { - if (!pool->continue_idle_job_func) + if (!context->continue_idle_job_func) return FALSE; - return pool->continue_idle_job_func (thread_data); + return context->continue_idle_job_func (thread_data, context - pool_contexts); } static gboolean -should_work (SgenThreadPool *pool, void *thread_data) +should_work (SgenThreadPoolContext *context, void *thread_data) { - if (!pool->should_work_func) + if (!context->should_work_func) return TRUE; - return pool->should_work_func (thread_data); + return context->should_work_func (thread_data); } -static mono_native_thread_return_t -thread_func (SgenThreadPoolData *thread_data) +/* + * Tells whether we should lock and attempt to get work from + * a higher priority context. + */ +static gboolean +has_priority_work (int worker_index, int current_context) { - SgenThreadPool *pool = thread_data->pool; - - pool->thread_init_func (thread_data); + int i; - mono_os_mutex_lock (&pool->lock); - for (;;) { - gboolean do_idle; - SgenThreadPoolJob *job; + for (i = 0; i < current_context; i++) { + SgenThreadPoolContext *context = &pool_contexts [i]; + void *thread_data; - if (!should_work (pool, thread_data) && !pool->threadpool_shutdown) { - mono_os_cond_wait (&pool->work_cond, &pool->lock); + if (worker_index >= context->num_threads) continue; + thread_data = (context->thread_datas) ? context->thread_datas [worker_index] : NULL; + if (!should_work (context, thread_data)) + continue; + if (context->job_queue.next_slot > 0) + return TRUE; + if (continue_idle_job (context, thread_data)) + return TRUE; + } + + /* Return if job enqueued on current context. Jobs have priority over idle work */ + if (pool_contexts [current_context].job_queue.next_slot > 0) + return TRUE; + + return FALSE; +} + +/* + * Gets the highest priority work. If there is none, it waits + * for work_cond. Should always be called with lock held. + */ +static void +get_work (int worker_index, int *work_context, int *do_idle, SgenThreadPoolJob **job) +{ + while (!threadpool_shutdown) { + int i; + + for (i = 0; i < contexts_num; i++) { + SgenThreadPoolContext *context = &pool_contexts [i]; + void *thread_data; + + if (worker_index >= context->num_threads) + continue; + thread_data = (context->thread_datas) ? context->thread_datas [worker_index] : NULL; + + if (!should_work (context, thread_data)) + continue; + + /* + * It's important that we check the continue idle flag with the lock held. + * Suppose we didn't check with the lock held, and the result is FALSE. The + * main thread might then set continue idle and signal us before we can take + * the lock, and we'd lose the signal. + */ + *do_idle = continue_idle_job (context, thread_data); + *job = get_job_and_set_in_progress (context); + + if (*job || *do_idle) { + *work_context = i; + return; + } } + /* - * It's important that we check the continue idle flag with the lock held. - * Suppose we didn't check with the lock held, and the result is FALSE. The - * main thread might then set continue idle and signal us before we can take - * the lock, and we'd lose the signal. + * Nothing to do on any context + * pthread_cond_wait() can return successfully despite the condition + * not being signalled, so we have to run this in a loop until we + * really have work to do. */ - do_idle = continue_idle_job (pool, thread_data); - job = get_job_and_set_in_progress (pool); + mono_os_cond_wait (&work_cond, &lock); + } +} - if (!job && !do_idle && !pool->threadpool_shutdown) { - /* - * pthread_cond_wait() can return successfully despite the condition - * not being signalled, so we have to run this in a loop until we - * really have work to do. - */ - mono_os_cond_wait (&pool->work_cond, &pool->lock); - continue; +static mono_native_thread_return_t +thread_func (int worker_index) +{ + int current_context; + void *thread_data = NULL; + + sgen_client_thread_register_worker (); + + for (current_context = 0; current_context < contexts_num; current_context++) { + if (worker_index >= pool_contexts [current_context].num_threads || + !pool_contexts [current_context].thread_init_func) + break; + + thread_data = (pool_contexts [current_context].thread_datas) ? pool_contexts [current_context].thread_datas [worker_index] : NULL; + pool_contexts [current_context].thread_init_func (thread_data); + } + + current_context = 0; + + mono_os_mutex_lock (&lock); + for (;;) { + gboolean do_idle = FALSE; + SgenThreadPoolJob *job = NULL; + SgenThreadPoolContext *context = NULL; + + get_work (worker_index, ¤t_context, &do_idle, &job); + + if (!threadpool_shutdown) { + context = &pool_contexts [current_context]; + thread_data = (context->thread_datas) ? context->thread_datas [worker_index] : NULL; } - mono_os_mutex_unlock (&pool->lock); + mono_os_mutex_unlock (&lock); if (job) { job->func (thread_data, job); - mono_os_mutex_lock (&pool->lock); + mono_os_mutex_lock (&lock); SGEN_ASSERT (0, job->state == STATE_IN_PROGRESS, "The job should still be in progress."); job->state = STATE_DONE; - remove_job (pool, job); + remove_job (context, job); /* * Only the main GC thread will ever wait on the done condition, so we don't * have to broadcast. */ - mono_os_cond_signal (&pool->done_cond); + mono_os_cond_signal (&done_cond); } else if (do_idle) { - SGEN_ASSERT (0, pool->idle_job_func, "Why do we have idle work when there's no idle job function?"); + SGEN_ASSERT (0, context->idle_job_func, "Why do we have idle work when there's no idle job function?"); do { - pool->idle_job_func (thread_data); - do_idle = continue_idle_job (pool, thread_data); - } while (do_idle && !pool->job_queue.next_slot); + context->idle_job_func (thread_data); + do_idle = continue_idle_job (context, thread_data); + } while (do_idle && !has_priority_work (worker_index, current_context)); - mono_os_mutex_lock (&pool->lock); + mono_os_mutex_lock (&lock); if (!do_idle) - mono_os_cond_signal (&pool->done_cond); + mono_os_cond_signal (&done_cond); } else { - SGEN_ASSERT (0, pool->threadpool_shutdown, "Why did we unlock if no jobs and not shutting down?"); - mono_os_mutex_lock (&pool->lock); - pool->threads_finished++; - mono_os_cond_signal (&pool->done_cond); - mono_os_mutex_unlock (&pool->lock); + SGEN_ASSERT (0, threadpool_shutdown, "Why did we unlock if no jobs and not shutting down?"); + mono_os_mutex_lock (&lock); + threads_finished++; + mono_os_cond_signal (&done_cond); + mono_os_mutex_unlock (&lock); return 0; } } @@ -148,50 +236,71 @@ thread_func (SgenThreadPoolData *thread_data) return (mono_native_thread_return_t)0; } +int +sgen_thread_pool_create_context (int num_threads, SgenThreadPoolThreadInitFunc init_func, SgenThreadPoolIdleJobFunc idle_func, SgenThreadPoolContinueIdleJobFunc continue_idle_func, SgenThreadPoolShouldWorkFunc should_work_func, void **thread_datas) +{ + int context_id = contexts_num; + + SGEN_ASSERT (0, contexts_num < SGEN_THREADPOOL_MAX_NUM_CONTEXTS, "Maximum sgen thread pool contexts reached"); + + pool_contexts [context_id].thread_init_func = init_func; + pool_contexts [context_id].idle_job_func = idle_func; + pool_contexts [context_id].continue_idle_job_func = continue_idle_func; + pool_contexts [context_id].should_work_func = should_work_func; + pool_contexts [context_id].thread_datas = thread_datas; + + SGEN_ASSERT (0, num_threads <= SGEN_THREADPOOL_MAX_NUM_THREADS, "Maximum sgen thread pool threads exceeded"); + + pool_contexts [context_id].num_threads = num_threads; + + sgen_pointer_queue_init (&pool_contexts [contexts_num].job_queue, 0); + + contexts_num++; + + return context_id; +} + void -sgen_thread_pool_init (SgenThreadPool *pool, int num_threads, SgenThreadPoolThreadInitFunc init_func, SgenThreadPoolIdleJobFunc idle_func, SgenThreadPoolContinueIdleJobFunc continue_idle_func, SgenThreadPoolShouldWorkFunc should_work_func_p, SgenThreadPoolData **thread_datas) +sgen_thread_pool_start (void) { int i; - SGEN_ASSERT (0, num_threads > 0, "Why are we creating a threadpool with no threads?"); - - pool->threads_num = (num_threads < MAX_NUM_THREADS) ? num_threads : MAX_NUM_THREADS; + for (i = 0; i < contexts_num; i++) { + if (threads_num < pool_contexts [i].num_threads) + threads_num = pool_contexts [i].num_threads; + } - mono_os_mutex_init (&pool->lock); - mono_os_cond_init (&pool->work_cond); - mono_os_cond_init (&pool->done_cond); + if (!threads_num) + return; - pool->thread_init_func = init_func; - pool->idle_job_func = idle_func; - pool->continue_idle_job_func = continue_idle_func; - pool->should_work_func = should_work_func_p; + mono_os_mutex_init (&lock); + mono_os_cond_init (&work_cond); + mono_os_cond_init (&done_cond); - sgen_pointer_queue_init (&pool->job_queue, 0); - pool->threads_finished = 0; - pool->threadpool_shutdown = FALSE; + threads_finished = 0; + threadpool_shutdown = FALSE; - for (i = 0; i < pool->threads_num; i++) { - thread_datas [i]->pool = pool; - mono_native_thread_create (&pool->threads [i], thread_func, thread_datas [i]); + for (i = 0; i < threads_num; i++) { + mono_native_thread_create (&threads [i], thread_func, (void*)(gsize)i); } } void -sgen_thread_pool_shutdown (SgenThreadPool *pool) +sgen_thread_pool_shutdown (void) { - if (!pool) + if (!threads_num) return; - mono_os_mutex_lock (&pool->lock); - pool->threadpool_shutdown = TRUE; - mono_os_cond_broadcast (&pool->work_cond); - while (pool->threads_finished < pool->threads_num) - mono_os_cond_wait (&pool->done_cond, &pool->lock); - mono_os_mutex_unlock (&pool->lock); + mono_os_mutex_lock (&lock); + threadpool_shutdown = TRUE; + mono_os_cond_broadcast (&work_cond); + while (threads_finished < threads_num) + mono_os_cond_wait (&done_cond, &lock); + mono_os_mutex_unlock (&lock); - mono_os_mutex_destroy (&pool->lock); - mono_os_cond_destroy (&pool->work_cond); - mono_os_cond_destroy (&pool->done_cond); + mono_os_mutex_destroy (&lock); + mono_os_cond_destroy (&work_cond); + mono_os_cond_destroy (&done_cond); } SgenThreadPoolJob* @@ -212,77 +321,74 @@ sgen_thread_pool_job_free (SgenThreadPoolJob *job) } void -sgen_thread_pool_job_enqueue (SgenThreadPool *pool, SgenThreadPoolJob *job) +sgen_thread_pool_job_enqueue (int context_id, SgenThreadPoolJob *job) { - mono_os_mutex_lock (&pool->lock); + mono_os_mutex_lock (&lock); - sgen_pointer_queue_add (&pool->job_queue, job); - mono_os_cond_signal (&pool->work_cond); + sgen_pointer_queue_add (&pool_contexts [context_id].job_queue, job); + mono_os_cond_broadcast (&work_cond); - mono_os_mutex_unlock (&pool->lock); + mono_os_mutex_unlock (&lock); } void -sgen_thread_pool_job_wait (SgenThreadPool *pool, SgenThreadPoolJob *job) +sgen_thread_pool_job_wait (int context_id, SgenThreadPoolJob *job) { SGEN_ASSERT (0, job, "Where's the job?"); - mono_os_mutex_lock (&pool->lock); + mono_os_mutex_lock (&lock); - while (find_job_in_queue (pool, job) >= 0) - mono_os_cond_wait (&pool->done_cond, &pool->lock); + while (find_job_in_queue (&pool_contexts [context_id], job) >= 0) + mono_os_cond_wait (&done_cond, &lock); - mono_os_mutex_unlock (&pool->lock); + mono_os_mutex_unlock (&lock); } void -sgen_thread_pool_idle_signal (SgenThreadPool *pool) +sgen_thread_pool_idle_signal (int context_id) { - SGEN_ASSERT (0, pool->idle_job_func, "Why are we signaling idle without an idle function?"); + SGEN_ASSERT (0, pool_contexts [context_id].idle_job_func, "Why are we signaling idle without an idle function?"); - mono_os_mutex_lock (&pool->lock); + mono_os_mutex_lock (&lock); - if (pool->continue_idle_job_func (NULL)) - mono_os_cond_broadcast (&pool->work_cond); + if (pool_contexts [context_id].continue_idle_job_func (NULL, context_id)) + mono_os_cond_broadcast (&work_cond); - mono_os_mutex_unlock (&pool->lock); + mono_os_mutex_unlock (&lock); } void -sgen_thread_pool_idle_wait (SgenThreadPool *pool) +sgen_thread_pool_idle_wait (int context_id) { - SGEN_ASSERT (0, pool->idle_job_func, "Why are we waiting for idle without an idle function?"); + SGEN_ASSERT (0, pool_contexts [context_id].idle_job_func, "Why are we waiting for idle without an idle function?"); - mono_os_mutex_lock (&pool->lock); + mono_os_mutex_lock (&lock); - while (pool->continue_idle_job_func (NULL)) - mono_os_cond_wait (&pool->done_cond, &pool->lock); + while (pool_contexts [context_id].continue_idle_job_func (NULL, context_id)) + mono_os_cond_wait (&done_cond, &lock); - mono_os_mutex_unlock (&pool->lock); + mono_os_mutex_unlock (&lock); } void -sgen_thread_pool_wait_for_all_jobs (SgenThreadPool *pool) +sgen_thread_pool_wait_for_all_jobs (int context_id) { - mono_os_mutex_lock (&pool->lock); + mono_os_mutex_lock (&lock); - while (!sgen_pointer_queue_is_empty (&pool->job_queue)) - mono_os_cond_wait (&pool->done_cond, &pool->lock); + while (!sgen_pointer_queue_is_empty (&pool_contexts [context_id].job_queue)) + mono_os_cond_wait (&done_cond, &lock); - mono_os_mutex_unlock (&pool->lock); + mono_os_mutex_unlock (&lock); } /* Return 0 if is not a thread pool thread or the thread number otherwise */ int -sgen_thread_pool_is_thread_pool_thread (SgenThreadPool *pool, MonoNativeThreadId some_thread) +sgen_thread_pool_is_thread_pool_thread (MonoNativeThreadId some_thread) { int i; - if (!pool) - return 0; - - for (i = 0; i < pool->threads_num; i++) { - if (some_thread == pool->threads [i]) + for (i = 0; i < threads_num; i++) { + if (some_thread == threads [i]) return i + 1; }