X-Git-Url: http://wien.tomnetworks.com/gitweb/?a=blobdiff_plain;f=mono%2Fsgen%2Fsgen-thread-pool.c;h=a7abfad24472822f85e7f51632aa998cb913f05c;hb=393db66fa0a3f21b51e030f68107f8838a36fc98;hp=20c5e17d3d7464fc90ac156b0efdaeb3d45b8b6a;hpb=3ab1a56f73cac926676e24b58d07bf0c4e75d3eb;p=mono.git diff --git a/mono/sgen/sgen-thread-pool.c b/mono/sgen/sgen-thread-pool.c index 20c5e17d3d7..a7abfad2447 100644 --- a/mono/sgen/sgen-thread-pool.c +++ b/mono/sgen/sgen-thread-pool.c @@ -1,5 +1,6 @@ -/* - * sgen-thread-pool.c: Threadpool for all concurrent GC work. +/** + * \file + * Threadpool for all concurrent GC work. * * Copyright (C) 2015 Xamarin Inc * @@ -11,31 +12,7 @@ #include "mono/sgen/sgen-gc.h" #include "mono/sgen/sgen-thread-pool.h" -#include "mono/sgen/sgen-pointer-queue.h" #include "mono/utils/mono-os-mutex.h" -#ifndef SGEN_WITHOUT_MONO -#include "mono/utils/mono-threads.h" -#endif - -#define MAX_NUM_THREADS 8 - -static mono_mutex_t lock; -static mono_cond_t work_cond; -static mono_cond_t done_cond; - -static int threads_num = 0; -static MonoNativeThreadId threads [MAX_NUM_THREADS]; - -/* Only accessed with the lock held. */ -static SgenPointerQueue job_queue; - -static SgenThreadPoolThreadInitFunc thread_init_func; -static SgenThreadPoolIdleJobFunc idle_job_func; -static SgenThreadPoolContinueIdleJobFunc continue_idle_job_func; -static SgenThreadPoolShouldWorkFunc should_work_func; - -static volatile gboolean threadpool_shutdown; -static volatile int threads_finished = 0; enum { STATE_WAITING, @@ -45,10 +22,10 @@ enum { /* Assumes that the lock is held. */ static SgenThreadPoolJob* -get_job_and_set_in_progress (void) +get_job_and_set_in_progress (SgenThreadPool *pool) { - for (size_t i = 0; i < job_queue.next_slot; ++i) { - SgenThreadPoolJob *job = (SgenThreadPoolJob *)job_queue.data [i]; + for (size_t i = 0; i < pool->job_queue.next_slot; ++i) { + SgenThreadPoolJob *job = (SgenThreadPoolJob *)pool->job_queue.data [i]; if (job->state == STATE_WAITING) { job->state = STATE_IN_PROGRESS; return job; @@ -59,10 +36,10 @@ get_job_and_set_in_progress (void) /* Assumes that the lock is held. */ static ssize_t -find_job_in_queue (SgenThreadPoolJob *job) +find_job_in_queue (SgenThreadPool *pool, SgenThreadPoolJob *job) { - for (ssize_t i = 0; i < job_queue.next_slot; ++i) { - if (job_queue.data [i] == job) + for (ssize_t i = 0; i < pool->job_queue.next_slot; ++i) { + if (pool->job_queue.data [i] == job) return i; } return -1; @@ -70,45 +47,47 @@ find_job_in_queue (SgenThreadPoolJob *job) /* Assumes that the lock is held. */ static void -remove_job (SgenThreadPoolJob *job) +remove_job (SgenThreadPool *pool, SgenThreadPoolJob *job) { ssize_t index; SGEN_ASSERT (0, job->state == STATE_DONE, "Why are we removing a job that's not done?"); - index = find_job_in_queue (job); + index = find_job_in_queue (pool, job); SGEN_ASSERT (0, index >= 0, "Why is the job we're trying to remove not in the queue?"); - job_queue.data [index] = NULL; - sgen_pointer_queue_remove_nulls (&job_queue); + pool->job_queue.data [index] = NULL; + sgen_pointer_queue_remove_nulls (&pool->job_queue); sgen_thread_pool_job_free (job); } static gboolean -continue_idle_job (void *thread_data) +continue_idle_job (SgenThreadPool *pool, void *thread_data) { - if (!continue_idle_job_func) + if (!pool->continue_idle_job_func) return FALSE; - return continue_idle_job_func (thread_data); + return pool->continue_idle_job_func (thread_data); } static gboolean -should_work (void *thread_data) +should_work (SgenThreadPool *pool, void *thread_data) { - if (!should_work_func) + if (!pool->should_work_func) return TRUE; - return should_work_func (thread_data); + return pool->should_work_func (thread_data); } static mono_native_thread_return_t -thread_func (void *thread_data) +thread_func (SgenThreadPoolData *thread_data) { - thread_init_func (thread_data); + SgenThreadPool *pool = thread_data->pool; + + pool->thread_init_func (thread_data); - mono_os_mutex_lock (&lock); + mono_os_mutex_lock (&pool->lock); for (;;) { gboolean do_idle; SgenThreadPoolJob *job; - if (!should_work (thread_data)) { - mono_os_cond_wait (&work_cond, &lock); + if (!should_work (pool, thread_data) && !pool->threadpool_shutdown) { + mono_os_cond_wait (&pool->work_cond, &pool->lock); continue; } /* @@ -117,51 +96,51 @@ thread_func (void *thread_data) * main thread might then set continue idle and signal us before we can take * the lock, and we'd lose the signal. */ - do_idle = continue_idle_job (thread_data); - job = get_job_and_set_in_progress (); + do_idle = continue_idle_job (pool, thread_data); + job = get_job_and_set_in_progress (pool); - if (!job && !do_idle && !threadpool_shutdown) { + if (!job && !do_idle && !pool->threadpool_shutdown) { /* * pthread_cond_wait() can return successfully despite the condition * not being signalled, so we have to run this in a loop until we * really have work to do. */ - mono_os_cond_wait (&work_cond, &lock); + mono_os_cond_wait (&pool->work_cond, &pool->lock); continue; } - mono_os_mutex_unlock (&lock); + mono_os_mutex_unlock (&pool->lock); if (job) { job->func (thread_data, job); - mono_os_mutex_lock (&lock); + mono_os_mutex_lock (&pool->lock); SGEN_ASSERT (0, job->state == STATE_IN_PROGRESS, "The job should still be in progress."); job->state = STATE_DONE; - remove_job (job); + remove_job (pool, job); /* * Only the main GC thread will ever wait on the done condition, so we don't * have to broadcast. */ - mono_os_cond_signal (&done_cond); + mono_os_cond_signal (&pool->done_cond); } else if (do_idle) { - SGEN_ASSERT (0, idle_job_func, "Why do we have idle work when there's no idle job function?"); + SGEN_ASSERT (0, pool->idle_job_func, "Why do we have idle work when there's no idle job function?"); do { - idle_job_func (thread_data); - do_idle = continue_idle_job (thread_data); - } while (do_idle && !job_queue.next_slot); + pool->idle_job_func (thread_data); + do_idle = continue_idle_job (pool, thread_data); + } while (do_idle && !pool->job_queue.next_slot); - mono_os_mutex_lock (&lock); + mono_os_mutex_lock (&pool->lock); if (!do_idle) - mono_os_cond_signal (&done_cond); + mono_os_cond_signal (&pool->done_cond); } else { - SGEN_ASSERT (0, threadpool_shutdown, "Why did we unlock if no jobs and not shutting down?"); - mono_os_mutex_lock (&lock); - threads_finished++; - mono_os_cond_signal (&done_cond); - mono_os_mutex_unlock (&lock); + SGEN_ASSERT (0, pool->threadpool_shutdown, "Why did we unlock if no jobs and not shutting down?"); + mono_os_mutex_lock (&pool->lock); + pool->threads_finished++; + mono_os_cond_signal (&pool->done_cond); + mono_os_mutex_unlock (&pool->lock); return 0; } } @@ -170,41 +149,49 @@ thread_func (void *thread_data) } void -sgen_thread_pool_init (int num_threads, SgenThreadPoolThreadInitFunc init_func, SgenThreadPoolIdleJobFunc idle_func, SgenThreadPoolContinueIdleJobFunc continue_idle_func, SgenThreadPoolShouldWorkFunc should_work_func_p, void **thread_datas) +sgen_thread_pool_init (SgenThreadPool *pool, int num_threads, SgenThreadPoolThreadInitFunc init_func, SgenThreadPoolIdleJobFunc idle_func, SgenThreadPoolContinueIdleJobFunc continue_idle_func, SgenThreadPoolShouldWorkFunc should_work_func_p, SgenThreadPoolData **thread_datas) { int i; - threads_num = (num_threads < MAX_NUM_THREADS) ? num_threads : MAX_NUM_THREADS; + SGEN_ASSERT (0, num_threads > 0, "Why are we creating a threadpool with no threads?"); + + pool->threads_num = (num_threads < MAX_NUM_THREADS) ? num_threads : MAX_NUM_THREADS; - mono_os_mutex_init (&lock); - mono_os_cond_init (&work_cond); - mono_os_cond_init (&done_cond); + mono_os_mutex_init (&pool->lock); + mono_os_cond_init (&pool->work_cond); + mono_os_cond_init (&pool->done_cond); - thread_init_func = init_func; - idle_job_func = idle_func; - continue_idle_job_func = continue_idle_func; - should_work_func = should_work_func_p; + pool->thread_init_func = init_func; + pool->idle_job_func = idle_func; + pool->continue_idle_job_func = continue_idle_func; + pool->should_work_func = should_work_func_p; - for (i = 0; i < threads_num; i++) - mono_native_thread_create (&threads [i], thread_func, thread_datas ? thread_datas [i] : NULL); + sgen_pointer_queue_init (&pool->job_queue, 0); + pool->threads_finished = 0; + pool->threadpool_shutdown = FALSE; + + for (i = 0; i < pool->threads_num; i++) { + thread_datas [i]->pool = pool; + mono_native_thread_create (&pool->threads [i], thread_func, thread_datas [i]); + } } void -sgen_thread_pool_shutdown (void) +sgen_thread_pool_shutdown (SgenThreadPool *pool) { - if (!threads_num) + if (!pool) return; - mono_os_mutex_lock (&lock); - threadpool_shutdown = TRUE; - mono_os_cond_broadcast (&work_cond); - while (threads_finished < threads_num) - mono_os_cond_wait (&done_cond, &lock); - mono_os_mutex_unlock (&lock); + mono_os_mutex_lock (&pool->lock); + pool->threadpool_shutdown = TRUE; + mono_os_cond_broadcast (&pool->work_cond); + while (pool->threads_finished < pool->threads_num) + mono_os_cond_wait (&pool->done_cond, &pool->lock); + mono_os_mutex_unlock (&pool->lock); - mono_os_mutex_destroy (&lock); - mono_os_cond_destroy (&work_cond); - mono_os_cond_destroy (&done_cond); + mono_os_mutex_destroy (&pool->lock); + mono_os_cond_destroy (&pool->work_cond); + mono_os_cond_destroy (&pool->done_cond); } SgenThreadPoolJob* @@ -225,74 +212,77 @@ sgen_thread_pool_job_free (SgenThreadPoolJob *job) } void -sgen_thread_pool_job_enqueue (SgenThreadPoolJob *job) +sgen_thread_pool_job_enqueue (SgenThreadPool *pool, SgenThreadPoolJob *job) { - mono_os_mutex_lock (&lock); + mono_os_mutex_lock (&pool->lock); - sgen_pointer_queue_add (&job_queue, job); - mono_os_cond_signal (&work_cond); + sgen_pointer_queue_add (&pool->job_queue, job); + mono_os_cond_signal (&pool->work_cond); - mono_os_mutex_unlock (&lock); + mono_os_mutex_unlock (&pool->lock); } void -sgen_thread_pool_job_wait (SgenThreadPoolJob *job) +sgen_thread_pool_job_wait (SgenThreadPool *pool, SgenThreadPoolJob *job) { SGEN_ASSERT (0, job, "Where's the job?"); - mono_os_mutex_lock (&lock); + mono_os_mutex_lock (&pool->lock); - while (find_job_in_queue (job) >= 0) - mono_os_cond_wait (&done_cond, &lock); + while (find_job_in_queue (pool, job) >= 0) + mono_os_cond_wait (&pool->done_cond, &pool->lock); - mono_os_mutex_unlock (&lock); + mono_os_mutex_unlock (&pool->lock); } void -sgen_thread_pool_idle_signal (void) +sgen_thread_pool_idle_signal (SgenThreadPool *pool) { - SGEN_ASSERT (0, idle_job_func, "Why are we signaling idle without an idle function?"); + SGEN_ASSERT (0, pool->idle_job_func, "Why are we signaling idle without an idle function?"); - mono_os_mutex_lock (&lock); + mono_os_mutex_lock (&pool->lock); - if (continue_idle_job_func (NULL)) - mono_os_cond_broadcast (&work_cond); + if (pool->continue_idle_job_func (NULL)) + mono_os_cond_broadcast (&pool->work_cond); - mono_os_mutex_unlock (&lock); + mono_os_mutex_unlock (&pool->lock); } void -sgen_thread_pool_idle_wait (void) +sgen_thread_pool_idle_wait (SgenThreadPool *pool) { - SGEN_ASSERT (0, idle_job_func, "Why are we waiting for idle without an idle function?"); + SGEN_ASSERT (0, pool->idle_job_func, "Why are we waiting for idle without an idle function?"); - mono_os_mutex_lock (&lock); + mono_os_mutex_lock (&pool->lock); - while (continue_idle_job_func (NULL)) - mono_os_cond_wait (&done_cond, &lock); + while (pool->continue_idle_job_func (NULL)) + mono_os_cond_wait (&pool->done_cond, &pool->lock); - mono_os_mutex_unlock (&lock); + mono_os_mutex_unlock (&pool->lock); } void -sgen_thread_pool_wait_for_all_jobs (void) +sgen_thread_pool_wait_for_all_jobs (SgenThreadPool *pool) { - mono_os_mutex_lock (&lock); + mono_os_mutex_lock (&pool->lock); - while (!sgen_pointer_queue_is_empty (&job_queue)) - mono_os_cond_wait (&done_cond, &lock); + while (!sgen_pointer_queue_is_empty (&pool->job_queue)) + mono_os_cond_wait (&pool->done_cond, &pool->lock); - mono_os_mutex_unlock (&lock); + mono_os_mutex_unlock (&pool->lock); } /* Return 0 if is not a thread pool thread or the thread number otherwise */ int -sgen_thread_pool_is_thread_pool_thread (MonoNativeThreadId some_thread) +sgen_thread_pool_is_thread_pool_thread (SgenThreadPool *pool, MonoNativeThreadId some_thread) { int i; - for (i = 0; i < threads_num; i++) { - if (some_thread == threads [i]) + if (!pool) + return 0; + + for (i = 0; i < pool->threads_num; i++) { + if (some_thread == pool->threads [i]) return i + 1; }