-/*
- * sgen-thread-pool.c: Threadpool for all concurrent GC work.
+/**
+ * \file
+ * Threadpool for all concurrent GC work.
*
* Copyright (C) 2015 Xamarin Inc
*
#include "mono/sgen/sgen-gc.h"
#include "mono/sgen/sgen-thread-pool.h"
-#include "mono/sgen/sgen-pointer-queue.h"
#include "mono/utils/mono-os-mutex.h"
-#ifndef SGEN_WITHOUT_MONO
-#include "mono/utils/mono-threads.h"
-#endif
-
-static mono_mutex_t lock;
-static mono_cond_t work_cond;
-static mono_cond_t done_cond;
-
-static MonoNativeThreadId thread;
-
-/* Only accessed with the lock held. */
-static SgenPointerQueue job_queue;
-
-static SgenThreadPoolThreadInitFunc thread_init_func;
-static SgenThreadPoolIdleJobFunc idle_job_func;
-static SgenThreadPoolContinueIdleJobFunc continue_idle_job_func;
-
-static volatile gboolean threadpool_shutdown;
-static volatile gboolean thread_finished;
enum {
STATE_WAITING,
/* Assumes that the lock is held. */
static SgenThreadPoolJob*
-get_job_and_set_in_progress (void)
+get_job_and_set_in_progress (SgenThreadPool *pool)
{
- for (size_t i = 0; i < job_queue.next_slot; ++i) {
- SgenThreadPoolJob *job = (SgenThreadPoolJob *)job_queue.data [i];
+ for (size_t i = 0; i < pool->job_queue.next_slot; ++i) {
+ SgenThreadPoolJob *job = (SgenThreadPoolJob *)pool->job_queue.data [i];
if (job->state == STATE_WAITING) {
job->state = STATE_IN_PROGRESS;
return job;
/* Assumes that the lock is held. */
static ssize_t
-find_job_in_queue (SgenThreadPoolJob *job)
+find_job_in_queue (SgenThreadPool *pool, SgenThreadPoolJob *job)
{
- for (ssize_t i = 0; i < job_queue.next_slot; ++i) {
- if (job_queue.data [i] == job)
+ for (ssize_t i = 0; i < pool->job_queue.next_slot; ++i) {
+ if (pool->job_queue.data [i] == job)
return i;
}
return -1;
/* Assumes that the lock is held. */
static void
-remove_job (SgenThreadPoolJob *job)
+remove_job (SgenThreadPool *pool, SgenThreadPoolJob *job)
{
ssize_t index;
SGEN_ASSERT (0, job->state == STATE_DONE, "Why are we removing a job that's not done?");
- index = find_job_in_queue (job);
+ index = find_job_in_queue (pool, job);
SGEN_ASSERT (0, index >= 0, "Why is the job we're trying to remove not in the queue?");
- job_queue.data [index] = NULL;
- sgen_pointer_queue_remove_nulls (&job_queue);
+ pool->job_queue.data [index] = NULL;
+ sgen_pointer_queue_remove_nulls (&pool->job_queue);
sgen_thread_pool_job_free (job);
}
static gboolean
-continue_idle_job (void)
+continue_idle_job (SgenThreadPool *pool, void *thread_data)
{
- if (!continue_idle_job_func)
+ if (!pool->continue_idle_job_func)
return FALSE;
- return continue_idle_job_func ();
+ return pool->continue_idle_job_func (thread_data);
+}
+
+static gboolean
+should_work (SgenThreadPool *pool, void *thread_data)
+{
+ if (!pool->should_work_func)
+ return TRUE;
+ return pool->should_work_func (thread_data);
}
static mono_native_thread_return_t
-thread_func (void *thread_data)
+thread_func (SgenThreadPoolData *thread_data)
{
- thread_init_func (thread_data);
+ SgenThreadPool *pool = thread_data->pool;
+
+ pool->thread_init_func (thread_data);
- mono_os_mutex_lock (&lock);
+ mono_os_mutex_lock (&pool->lock);
for (;;) {
+ gboolean do_idle;
+ SgenThreadPoolJob *job;
+
+ if (!should_work (pool, thread_data) && !pool->threadpool_shutdown) {
+ mono_os_cond_wait (&pool->work_cond, &pool->lock);
+ continue;
+ }
/*
* It's important that we check the continue idle flag with the lock held.
* Suppose we didn't check with the lock held, and the result is FALSE. The
* main thread might then set continue idle and signal us before we can take
* the lock, and we'd lose the signal.
*/
- gboolean do_idle = continue_idle_job ();
- SgenThreadPoolJob *job = get_job_and_set_in_progress ();
+ do_idle = continue_idle_job (pool, thread_data);
+ job = get_job_and_set_in_progress (pool);
- if (!job && !do_idle && !threadpool_shutdown) {
+ if (!job && !do_idle && !pool->threadpool_shutdown) {
/*
* pthread_cond_wait() can return successfully despite the condition
* not being signalled, so we have to run this in a loop until we
* really have work to do.
*/
- mono_os_cond_wait (&work_cond, &lock);
+ mono_os_cond_wait (&pool->work_cond, &pool->lock);
continue;
}
- mono_os_mutex_unlock (&lock);
+ mono_os_mutex_unlock (&pool->lock);
if (job) {
job->func (thread_data, job);
- mono_os_mutex_lock (&lock);
+ mono_os_mutex_lock (&pool->lock);
SGEN_ASSERT (0, job->state == STATE_IN_PROGRESS, "The job should still be in progress.");
job->state = STATE_DONE;
- remove_job (job);
+ remove_job (pool, job);
/*
* Only the main GC thread will ever wait on the done condition, so we don't
* have to broadcast.
*/
- mono_os_cond_signal (&done_cond);
+ mono_os_cond_signal (&pool->done_cond);
} else if (do_idle) {
- SGEN_ASSERT (0, idle_job_func, "Why do we have idle work when there's no idle job function?");
+ SGEN_ASSERT (0, pool->idle_job_func, "Why do we have idle work when there's no idle job function?");
do {
- idle_job_func (thread_data);
- do_idle = continue_idle_job ();
- } while (do_idle && !job_queue.next_slot);
+ pool->idle_job_func (thread_data);
+ do_idle = continue_idle_job (pool, thread_data);
+ } while (do_idle && !pool->job_queue.next_slot);
- mono_os_mutex_lock (&lock);
+ mono_os_mutex_lock (&pool->lock);
if (!do_idle)
- mono_os_cond_signal (&done_cond);
+ mono_os_cond_signal (&pool->done_cond);
} else {
- SGEN_ASSERT (0, threadpool_shutdown, "Why did we unlock if no jobs and not shutting down?");
- mono_os_mutex_lock (&lock);
- thread_finished = TRUE;
- mono_os_cond_signal (&done_cond);
- mono_os_mutex_unlock (&lock);
+ SGEN_ASSERT (0, pool->threadpool_shutdown, "Why did we unlock if no jobs and not shutting down?");
+ mono_os_mutex_lock (&pool->lock);
+ pool->threads_finished++;
+ mono_os_cond_signal (&pool->done_cond);
+ mono_os_mutex_unlock (&pool->lock);
return 0;
}
}
}
void
-sgen_thread_pool_init (int num_threads, SgenThreadPoolThreadInitFunc init_func, SgenThreadPoolIdleJobFunc idle_func, SgenThreadPoolContinueIdleJobFunc continue_idle_func, void **thread_datas)
+sgen_thread_pool_init (SgenThreadPool *pool, int num_threads, SgenThreadPoolThreadInitFunc init_func, SgenThreadPoolIdleJobFunc idle_func, SgenThreadPoolContinueIdleJobFunc continue_idle_func, SgenThreadPoolShouldWorkFunc should_work_func_p, SgenThreadPoolData **thread_datas)
{
- SGEN_ASSERT (0, num_threads == 1, "We only support 1 thread pool thread for now.");
+ int i;
+
+ SGEN_ASSERT (0, num_threads > 0, "Why are we creating a threadpool with no threads?");
+
+ pool->threads_num = (num_threads < MAX_NUM_THREADS) ? num_threads : MAX_NUM_THREADS;
+
+ mono_os_mutex_init (&pool->lock);
+ mono_os_cond_init (&pool->work_cond);
+ mono_os_cond_init (&pool->done_cond);
- mono_os_mutex_init (&lock);
- mono_os_cond_init (&work_cond);
- mono_os_cond_init (&done_cond);
+ pool->thread_init_func = init_func;
+ pool->idle_job_func = idle_func;
+ pool->continue_idle_job_func = continue_idle_func;
+ pool->should_work_func = should_work_func_p;
- thread_init_func = init_func;
- idle_job_func = idle_func;
- continue_idle_job_func = continue_idle_func;
+ sgen_pointer_queue_init (&pool->job_queue, 0);
+ pool->threads_finished = 0;
+ pool->threadpool_shutdown = FALSE;
- mono_native_thread_create (&thread, thread_func, thread_datas ? thread_datas [0] : NULL);
+ for (i = 0; i < pool->threads_num; i++) {
+ thread_datas [i]->pool = pool;
+ mono_native_thread_create (&pool->threads [i], thread_func, thread_datas [i]);
+ }
}
void
-sgen_thread_pool_shutdown (void)
+sgen_thread_pool_shutdown (SgenThreadPool *pool)
{
- if (!thread)
+ if (!pool)
return;
- mono_os_mutex_lock (&lock);
- threadpool_shutdown = TRUE;
- mono_os_cond_signal (&work_cond);
- while (!thread_finished)
- mono_os_cond_wait (&done_cond, &lock);
- mono_os_mutex_unlock (&lock);
+ mono_os_mutex_lock (&pool->lock);
+ pool->threadpool_shutdown = TRUE;
+ mono_os_cond_broadcast (&pool->work_cond);
+ while (pool->threads_finished < pool->threads_num)
+ mono_os_cond_wait (&pool->done_cond, &pool->lock);
+ mono_os_mutex_unlock (&pool->lock);
- mono_os_mutex_destroy (&lock);
- mono_os_cond_destroy (&work_cond);
- mono_os_cond_destroy (&done_cond);
+ mono_os_mutex_destroy (&pool->lock);
+ mono_os_cond_destroy (&pool->work_cond);
+ mono_os_cond_destroy (&pool->done_cond);
}
SgenThreadPoolJob*
}
void
-sgen_thread_pool_job_enqueue (SgenThreadPoolJob *job)
+sgen_thread_pool_job_enqueue (SgenThreadPool *pool, SgenThreadPoolJob *job)
{
- mono_os_mutex_lock (&lock);
+ mono_os_mutex_lock (&pool->lock);
- sgen_pointer_queue_add (&job_queue, job);
- /*
- * FIXME: We could check whether there is a job in progress. If there is, there's
- * no need to signal the condition, at least as long as we have only one thread.
- */
- mono_os_cond_signal (&work_cond);
+ sgen_pointer_queue_add (&pool->job_queue, job);
+ mono_os_cond_signal (&pool->work_cond);
- mono_os_mutex_unlock (&lock);
+ mono_os_mutex_unlock (&pool->lock);
}
void
-sgen_thread_pool_job_wait (SgenThreadPoolJob *job)
+sgen_thread_pool_job_wait (SgenThreadPool *pool, SgenThreadPoolJob *job)
{
SGEN_ASSERT (0, job, "Where's the job?");
- mono_os_mutex_lock (&lock);
+ mono_os_mutex_lock (&pool->lock);
- while (find_job_in_queue (job) >= 0)
- mono_os_cond_wait (&done_cond, &lock);
+ while (find_job_in_queue (pool, job) >= 0)
+ mono_os_cond_wait (&pool->done_cond, &pool->lock);
- mono_os_mutex_unlock (&lock);
+ mono_os_mutex_unlock (&pool->lock);
}
void
-sgen_thread_pool_idle_signal (void)
+sgen_thread_pool_idle_signal (SgenThreadPool *pool)
{
- SGEN_ASSERT (0, idle_job_func, "Why are we signaling idle without an idle function?");
+ SGEN_ASSERT (0, pool->idle_job_func, "Why are we signaling idle without an idle function?");
- mono_os_mutex_lock (&lock);
+ mono_os_mutex_lock (&pool->lock);
- if (continue_idle_job_func ())
- mono_os_cond_signal (&work_cond);
+ if (pool->continue_idle_job_func (NULL))
+ mono_os_cond_broadcast (&pool->work_cond);
- mono_os_mutex_unlock (&lock);
+ mono_os_mutex_unlock (&pool->lock);
}
void
-sgen_thread_pool_idle_wait (void)
+sgen_thread_pool_idle_wait (SgenThreadPool *pool)
{
- SGEN_ASSERT (0, idle_job_func, "Why are we waiting for idle without an idle function?");
+ SGEN_ASSERT (0, pool->idle_job_func, "Why are we waiting for idle without an idle function?");
- mono_os_mutex_lock (&lock);
+ mono_os_mutex_lock (&pool->lock);
- while (continue_idle_job_func ())
- mono_os_cond_wait (&done_cond, &lock);
+ while (pool->continue_idle_job_func (NULL))
+ mono_os_cond_wait (&pool->done_cond, &pool->lock);
- mono_os_mutex_unlock (&lock);
+ mono_os_mutex_unlock (&pool->lock);
}
void
-sgen_thread_pool_wait_for_all_jobs (void)
+sgen_thread_pool_wait_for_all_jobs (SgenThreadPool *pool)
{
- mono_os_mutex_lock (&lock);
+ mono_os_mutex_lock (&pool->lock);
- while (!sgen_pointer_queue_is_empty (&job_queue))
- mono_os_cond_wait (&done_cond, &lock);
+ while (!sgen_pointer_queue_is_empty (&pool->job_queue))
+ mono_os_cond_wait (&pool->done_cond, &pool->lock);
- mono_os_mutex_unlock (&lock);
+ mono_os_mutex_unlock (&pool->lock);
}
-gboolean
-sgen_thread_pool_is_thread_pool_thread (MonoNativeThreadId some_thread)
+/* Return 0 if is not a thread pool thread or the thread number otherwise */
+int
+sgen_thread_pool_is_thread_pool_thread (SgenThreadPool *pool, MonoNativeThreadId some_thread)
{
- return some_thread == thread;
+ int i;
+
+ if (!pool)
+ return 0;
+
+ for (i = 0; i < pool->threads_num; i++) {
+ if (some_thread == pool->threads [i])
+ return i + 1;
+ }
+
+ return 0;
}
#endif