From b388646ad012fdc0af6206ce1fad230fba5976d3 Mon Sep 17 00:00:00 2001 From: Vlad Brezae Date: Wed, 29 Mar 2017 14:17:02 +0300 Subject: [PATCH] [sgen] Split up concurrent sweep from worker logic While this was ok because workers and sweep could not run at the same time, this is no longer the case with parallel nursery collections. In the future, if many threads will be required for different types of jobs, we can consider running them on same native thread, but the jobs will still need to have appropriate contexts (the SgenThreadPool they belong to). --- mono/metadata/sgen-mono.c | 8 +- mono/metadata/sgen-stw.c | 5 +- mono/sgen/sgen-gc.c | 2 +- mono/sgen/sgen-gc.h | 5 +- mono/sgen/sgen-marksweep.c | 32 ++++- mono/sgen/sgen-memory-governor.c | 4 +- mono/sgen/sgen-protocol.c | 10 +- mono/sgen/sgen-thread-pool.c | 221 +++++++++++++++---------------- mono/sgen/sgen-thread-pool.h | 54 ++++++-- mono/sgen/sgen-workers.c | 44 +++--- mono/sgen/sgen-workers.h | 7 + 11 files changed, 231 insertions(+), 161 deletions(-) diff --git a/mono/metadata/sgen-mono.c b/mono/metadata/sgen-mono.c index f2958e964be..a67fc067263 100644 --- a/mono/metadata/sgen-mono.c +++ b/mono/metadata/sgen-mono.c @@ -17,7 +17,7 @@ #include "sgen/sgen-client.h" #include "sgen/sgen-cardtable.h" #include "sgen/sgen-pinning.h" -#include "sgen/sgen-thread-pool.h" +#include "sgen/sgen-workers.h" #include "metadata/marshal.h" #include "metadata/method-builder.h" #include "metadata/abi-details.h" @@ -2069,7 +2069,7 @@ mono_sgen_register_moved_object (void *obj, void *destination) * lock-free data structure for the queue as multiple threads will be * adding to it at the same time. */ - if (sgen_thread_pool_is_thread_pool_thread (mono_native_thread_id_get ())) { + if (sgen_workers_is_worker_thread (mono_native_thread_id_get ())) { sgen_pointer_queue_add (&moved_objects_queue, obj); sgen_pointer_queue_add (&moved_objects_queue, destination); } else { @@ -3023,7 +3023,9 @@ mono_gc_base_init (void) void mono_gc_base_cleanup (void) { - sgen_thread_pool_shutdown (); + sgen_thread_pool_shutdown (major_collector.get_sweep_pool ()); + + sgen_workers_shutdown (); // We should have consumed any outstanding moves. g_assert (sgen_pointer_queue_is_empty (&moved_objects_queue)); diff --git a/mono/metadata/sgen-stw.c b/mono/metadata/sgen-stw.c index 8c1a234616f..fc1de7c25f8 100644 --- a/mono/metadata/sgen-stw.c +++ b/mono/metadata/sgen-stw.c @@ -20,7 +20,7 @@ #include "sgen/sgen-gc.h" #include "sgen/sgen-protocol.h" #include "sgen/sgen-memory-governor.h" -#include "sgen/sgen-thread-pool.h" +#include "sgen/sgen-workers.h" #include "metadata/profiler-private.h" #include "sgen/sgen-client.h" #include "metadata/sgen-bridge-internals.h" @@ -227,7 +227,8 @@ sgen_is_thread_in_current_stw (SgenThreadInfo *info, int *reason) We can't suspend the workers that will do all the heavy lifting. FIXME Use some state bit in SgenThreadInfo for this. */ - if (sgen_thread_pool_is_thread_pool_thread (mono_thread_info_get_tid (info))) { + if (sgen_thread_pool_is_thread_pool_thread (major_collector.get_sweep_pool (), mono_thread_info_get_tid (info)) || + sgen_workers_is_worker_thread (mono_thread_info_get_tid (info))) { if (reason) *reason = 4; return FALSE; diff --git a/mono/sgen/sgen-gc.c b/mono/sgen/sgen-gc.c index ef8739400fc..91ee1a57a94 100644 --- a/mono/sgen/sgen-gc.c +++ b/mono/sgen/sgen-gc.c @@ -3408,7 +3408,7 @@ sgen_gc_init (void) if (major_collector.post_param_init) major_collector.post_param_init (&major_collector); - if (major_collector.needs_thread_pool || sgen_minor_collector.is_parallel) { + if (major_collector.is_concurrent || sgen_minor_collector.is_parallel) { int num_workers = 1; if (major_collector.is_parallel || sgen_minor_collector.is_parallel) { /* FIXME Detect the number of physical cores, instead of logical */ diff --git a/mono/sgen/sgen-gc.h b/mono/sgen/sgen-gc.h index 4ff60166cca..3ebe2e4dfad 100644 --- a/mono/sgen/sgen-gc.h +++ b/mono/sgen/sgen-gc.h @@ -35,6 +35,7 @@ typedef struct _SgenThreadInfo SgenThreadInfo; #include "mono/sgen/sgen-hash-table.h" #include "mono/sgen/sgen-protocol.h" #include "mono/sgen/gc-internal-agnostic.h" +#include "mono/sgen/sgen-thread-pool.h" /* The method used to clear the nursery */ /* Clearing at nursery collections is the safest, but has bad interactions with caches. @@ -593,7 +594,7 @@ sgen_update_reference (GCObject **p, GCObject *o, gboolean allow_null) { if (!allow_null) SGEN_ASSERT (0, o, "Cannot update a reference with a NULL pointer"); - SGEN_ASSERT (0, !sgen_thread_pool_is_thread_pool_thread (mono_native_thread_id_get ()), "Can't update a reference in the worker thread"); + SGEN_ASSERT (0, !sgen_workers_is_worker_thread (mono_native_thread_id_get ()), "Can't update a reference in the worker thread"); *p = o; } @@ -636,7 +637,6 @@ struct _SgenMajorCollector { size_t section_size; gboolean is_concurrent; gboolean is_parallel; - gboolean needs_thread_pool; gboolean supports_cardtable; gboolean sweeps_lazily; @@ -693,6 +693,7 @@ struct _SgenMajorCollector { guint8* (*get_cardtable_mod_union_for_reference) (char *object); long long (*get_and_reset_num_major_objects_marked) (void); void (*count_cards) (long long *num_total_cards, long long *num_marked_cards); + SgenThreadPool* (*get_sweep_pool) (void); void (*worker_init_cb) (gpointer worker); }; diff --git a/mono/sgen/sgen-marksweep.c b/mono/sgen/sgen-marksweep.c index 365f6478781..04ab32dab84 100644 --- a/mono/sgen/sgen-marksweep.c +++ b/mono/sgen/sgen-marksweep.c @@ -187,6 +187,9 @@ static volatile int sweep_state = SWEEP_STATE_SWEPT; static gboolean concurrent_mark; static gboolean concurrent_sweep = TRUE; +SgenThreadPool sweep_pool_inst; +SgenThreadPool *sweep_pool; + #define BLOCK_IS_TAGGED_HAS_REFERENCES(bl) SGEN_POINTER_IS_TAGGED_1 ((bl)) #define BLOCK_TAG_HAS_REFERENCES(bl) SGEN_POINTER_TAG_1 ((bl)) @@ -918,7 +921,7 @@ major_finish_sweep_checking (void) wait: job = sweep_job; if (job) - sgen_thread_pool_job_wait (job); + sgen_thread_pool_job_wait (sweep_pool, job); SGEN_ASSERT (0, !sweep_job, "Why did the sweep job not null itself?"); SGEN_ASSERT (0, sweep_state == SWEEP_STATE_SWEPT, "How is the sweep job done but we're not swept?"); } @@ -1806,7 +1809,7 @@ sweep_job_func (void *thread_data_untyped, SgenThreadPoolJob *job) */ if (concurrent_sweep && lazy_sweep) { sweep_blocks_job = sgen_thread_pool_job_alloc ("sweep_blocks", sweep_blocks_job_func, sizeof (SgenThreadPoolJob)); - sgen_thread_pool_job_enqueue (sweep_blocks_job); + sgen_thread_pool_job_enqueue (sweep_pool, sweep_blocks_job); } sweep_finish (); @@ -1855,7 +1858,7 @@ major_sweep (void) SGEN_ASSERT (0, !sweep_job, "We haven't finished the last sweep?"); if (concurrent_sweep) { sweep_job = sgen_thread_pool_job_alloc ("sweep", sweep_job_func, sizeof (SgenThreadPoolJob)); - sgen_thread_pool_job_enqueue (sweep_job); + sgen_thread_pool_job_enqueue (sweep_pool, sweep_job); } else { sweep_job_func (NULL, NULL); } @@ -2068,7 +2071,7 @@ major_start_major_collection (void) */ SgenThreadPoolJob *job = sweep_blocks_job; if (job) - sgen_thread_pool_job_wait (job); + sgen_thread_pool_job_wait (sweep_pool, job); } if (lazy_sweep && !concurrent_sweep) @@ -2108,6 +2111,12 @@ major_finish_major_collection (ScannedObjectCounts *counts) #endif } +static SgenThreadPool* +major_get_sweep_pool (void) +{ + return sweep_pool; +} + static int compare_pointers (const void *va, const void *vb) { char *a = *(char**)va, *b = *(char**)vb; @@ -2714,7 +2723,6 @@ static void post_param_init (SgenMajorCollector *collector) { collector->sweeps_lazily = lazy_sweep; - collector->needs_thread_pool = concurrent_mark || concurrent_sweep; } /* We are guaranteed to be called by the worker in question */ @@ -2733,6 +2741,12 @@ sgen_worker_init_callback (gpointer worker_untyped) mono_native_tls_set_value (worker_block_free_list_key, worker_free_blocks); } +static void +thread_pool_init_func (void *data_untyped) +{ + sgen_client_thread_register_worker (); +} + static void sgen_marksweep_init_internal (SgenMajorCollector *collector, gboolean is_concurrent, gboolean is_parallel) { @@ -2788,7 +2802,6 @@ sgen_marksweep_init_internal (SgenMajorCollector *collector, gboolean is_concurr concurrent_mark = is_concurrent; collector->is_concurrent = is_concurrent; collector->is_parallel = is_parallel; - collector->needs_thread_pool = is_concurrent || concurrent_sweep; collector->get_and_reset_num_major_objects_marked = major_get_and_reset_num_major_objects_marked; collector->supports_cardtable = TRUE; @@ -2834,6 +2847,7 @@ sgen_marksweep_init_internal (SgenMajorCollector *collector, gboolean is_concurr collector->is_valid_object = major_is_valid_object; collector->describe_pointer = major_describe_pointer; collector->count_cards = major_count_cards; + collector->get_sweep_pool = major_get_sweep_pool; collector->major_ops_serial.copy_or_mark_object = major_copy_or_mark_object_canonical; collector->major_ops_serial.scan_object = major_scan_object_with_evacuation; @@ -2893,6 +2907,12 @@ sgen_marksweep_init_internal (SgenMajorCollector *collector, gboolean is_concurr /*cardtable requires major pages to be 8 cards aligned*/ g_assert ((MS_BLOCK_SIZE % (8 * CARD_SIZE_IN_BYTES)) == 0); + + if (concurrent_sweep) { + SgenThreadPool **thread_datas = &sweep_pool; + sweep_pool = &sweep_pool_inst; + sgen_thread_pool_init (sweep_pool, 1, thread_pool_init_func, NULL, NULL, NULL, (SgenThreadPoolData**)&thread_datas); + } } void diff --git a/mono/sgen/sgen-memory-governor.c b/mono/sgen/sgen-memory-governor.c index 8dae5a062fd..3bf90115b85 100644 --- a/mono/sgen/sgen-memory-governor.c +++ b/mono/sgen/sgen-memory-governor.c @@ -20,7 +20,7 @@ #include "mono/sgen/sgen-gc.h" #include "mono/sgen/sgen-memory-governor.h" -#include "mono/sgen/sgen-thread-pool.h" +#include "mono/sgen/sgen-workers.h" #include "mono/sgen/sgen-client.h" #define MIN_MINOR_COLLECTION_ALLOWANCE ((mword)(DEFAULT_NURSERY_SIZE * default_allowance_nursery_size_ratio)) @@ -459,7 +459,7 @@ gboolean sgen_memgov_try_alloc_space (mword size, int space) { if (sgen_memgov_available_free_space () < size) { - SGEN_ASSERT (4, !sgen_thread_pool_is_thread_pool_thread (mono_native_thread_id_get ()), "Memory shouldn't run out in worker thread"); + SGEN_ASSERT (4, !sgen_workers_is_worker_thread (mono_native_thread_id_get ()), "Memory shouldn't run out in worker thread"); return FALSE; } diff --git a/mono/sgen/sgen-protocol.c b/mono/sgen/sgen-protocol.c index 419e90a482a..f86b05e51c9 100644 --- a/mono/sgen/sgen-protocol.c +++ b/mono/sgen/sgen-protocol.c @@ -16,7 +16,7 @@ #include "sgen-gc.h" #include "sgen-protocol.h" #include "sgen-memory-governor.h" -#include "sgen-thread-pool.h" +#include "sgen-workers.h" #include "sgen-client.h" #include "mono/utils/mono-membar.h" #include "mono/utils/mono-proclib.h" @@ -365,11 +365,17 @@ protocol_entry (unsigned char type, gpointer data, int size) buffer->buffer [index++] = type; /* We should never change the header format */ if (include_worker_index) { + int worker_index; + MonoNativeThreadId tid = mono_native_thread_id_get (); /* * If the thread is not a worker thread we insert 0, which is interpreted * as gc thread. Worker indexes are 1 based. */ - buffer->buffer [index++] = (unsigned char) sgen_thread_pool_is_thread_pool_thread (mono_native_thread_id_get ()); + worker_index = sgen_workers_is_worker_thread (tid); + if (!worker_index) + worker_index = sgen_thread_pool_is_thread_pool_thread (major_collector.get_sweep_pool (), tid); + /* FIXME Consider using different index bases for different thread pools */ + buffer->buffer [index++] = (unsigned char) worker_index; } memcpy (buffer->buffer + index, data, size); index += size; diff --git a/mono/sgen/sgen-thread-pool.c b/mono/sgen/sgen-thread-pool.c index f2aef3ae023..a7abfad2447 100644 --- a/mono/sgen/sgen-thread-pool.c +++ b/mono/sgen/sgen-thread-pool.c @@ -12,31 +12,7 @@ #include "mono/sgen/sgen-gc.h" #include "mono/sgen/sgen-thread-pool.h" -#include "mono/sgen/sgen-pointer-queue.h" #include "mono/utils/mono-os-mutex.h" -#ifndef SGEN_WITHOUT_MONO -#include "mono/utils/mono-threads.h" -#endif - -#define MAX_NUM_THREADS 8 - -static mono_mutex_t lock; -static mono_cond_t work_cond; -static mono_cond_t done_cond; - -static int threads_num = 0; -static MonoNativeThreadId threads [MAX_NUM_THREADS]; - -/* Only accessed with the lock held. */ -static SgenPointerQueue job_queue; - -static SgenThreadPoolThreadInitFunc thread_init_func; -static SgenThreadPoolIdleJobFunc idle_job_func; -static SgenThreadPoolContinueIdleJobFunc continue_idle_job_func; -static SgenThreadPoolShouldWorkFunc should_work_func; - -static volatile gboolean threadpool_shutdown; -static volatile int threads_finished = 0; enum { STATE_WAITING, @@ -46,10 +22,10 @@ enum { /* Assumes that the lock is held. */ static SgenThreadPoolJob* -get_job_and_set_in_progress (void) +get_job_and_set_in_progress (SgenThreadPool *pool) { - for (size_t i = 0; i < job_queue.next_slot; ++i) { - SgenThreadPoolJob *job = (SgenThreadPoolJob *)job_queue.data [i]; + for (size_t i = 0; i < pool->job_queue.next_slot; ++i) { + SgenThreadPoolJob *job = (SgenThreadPoolJob *)pool->job_queue.data [i]; if (job->state == STATE_WAITING) { job->state = STATE_IN_PROGRESS; return job; @@ -60,10 +36,10 @@ get_job_and_set_in_progress (void) /* Assumes that the lock is held. */ static ssize_t -find_job_in_queue (SgenThreadPoolJob *job) +find_job_in_queue (SgenThreadPool *pool, SgenThreadPoolJob *job) { - for (ssize_t i = 0; i < job_queue.next_slot; ++i) { - if (job_queue.data [i] == job) + for (ssize_t i = 0; i < pool->job_queue.next_slot; ++i) { + if (pool->job_queue.data [i] == job) return i; } return -1; @@ -71,45 +47,47 @@ find_job_in_queue (SgenThreadPoolJob *job) /* Assumes that the lock is held. */ static void -remove_job (SgenThreadPoolJob *job) +remove_job (SgenThreadPool *pool, SgenThreadPoolJob *job) { ssize_t index; SGEN_ASSERT (0, job->state == STATE_DONE, "Why are we removing a job that's not done?"); - index = find_job_in_queue (job); + index = find_job_in_queue (pool, job); SGEN_ASSERT (0, index >= 0, "Why is the job we're trying to remove not in the queue?"); - job_queue.data [index] = NULL; - sgen_pointer_queue_remove_nulls (&job_queue); + pool->job_queue.data [index] = NULL; + sgen_pointer_queue_remove_nulls (&pool->job_queue); sgen_thread_pool_job_free (job); } static gboolean -continue_idle_job (void *thread_data) +continue_idle_job (SgenThreadPool *pool, void *thread_data) { - if (!continue_idle_job_func) + if (!pool->continue_idle_job_func) return FALSE; - return continue_idle_job_func (thread_data); + return pool->continue_idle_job_func (thread_data); } static gboolean -should_work (void *thread_data) +should_work (SgenThreadPool *pool, void *thread_data) { - if (!should_work_func) + if (!pool->should_work_func) return TRUE; - return should_work_func (thread_data); + return pool->should_work_func (thread_data); } static mono_native_thread_return_t -thread_func (void *thread_data) +thread_func (SgenThreadPoolData *thread_data) { - thread_init_func (thread_data); + SgenThreadPool *pool = thread_data->pool; + + pool->thread_init_func (thread_data); - mono_os_mutex_lock (&lock); + mono_os_mutex_lock (&pool->lock); for (;;) { gboolean do_idle; SgenThreadPoolJob *job; - if (!should_work (thread_data) && !threadpool_shutdown) { - mono_os_cond_wait (&work_cond, &lock); + if (!should_work (pool, thread_data) && !pool->threadpool_shutdown) { + mono_os_cond_wait (&pool->work_cond, &pool->lock); continue; } /* @@ -118,51 +96,51 @@ thread_func (void *thread_data) * main thread might then set continue idle and signal us before we can take * the lock, and we'd lose the signal. */ - do_idle = continue_idle_job (thread_data); - job = get_job_and_set_in_progress (); + do_idle = continue_idle_job (pool, thread_data); + job = get_job_and_set_in_progress (pool); - if (!job && !do_idle && !threadpool_shutdown) { + if (!job && !do_idle && !pool->threadpool_shutdown) { /* * pthread_cond_wait() can return successfully despite the condition * not being signalled, so we have to run this in a loop until we * really have work to do. */ - mono_os_cond_wait (&work_cond, &lock); + mono_os_cond_wait (&pool->work_cond, &pool->lock); continue; } - mono_os_mutex_unlock (&lock); + mono_os_mutex_unlock (&pool->lock); if (job) { job->func (thread_data, job); - mono_os_mutex_lock (&lock); + mono_os_mutex_lock (&pool->lock); SGEN_ASSERT (0, job->state == STATE_IN_PROGRESS, "The job should still be in progress."); job->state = STATE_DONE; - remove_job (job); + remove_job (pool, job); /* * Only the main GC thread will ever wait on the done condition, so we don't * have to broadcast. */ - mono_os_cond_signal (&done_cond); + mono_os_cond_signal (&pool->done_cond); } else if (do_idle) { - SGEN_ASSERT (0, idle_job_func, "Why do we have idle work when there's no idle job function?"); + SGEN_ASSERT (0, pool->idle_job_func, "Why do we have idle work when there's no idle job function?"); do { - idle_job_func (thread_data); - do_idle = continue_idle_job (thread_data); - } while (do_idle && !job_queue.next_slot); + pool->idle_job_func (thread_data); + do_idle = continue_idle_job (pool, thread_data); + } while (do_idle && !pool->job_queue.next_slot); - mono_os_mutex_lock (&lock); + mono_os_mutex_lock (&pool->lock); if (!do_idle) - mono_os_cond_signal (&done_cond); + mono_os_cond_signal (&pool->done_cond); } else { - SGEN_ASSERT (0, threadpool_shutdown, "Why did we unlock if no jobs and not shutting down?"); - mono_os_mutex_lock (&lock); - threads_finished++; - mono_os_cond_signal (&done_cond); - mono_os_mutex_unlock (&lock); + SGEN_ASSERT (0, pool->threadpool_shutdown, "Why did we unlock if no jobs and not shutting down?"); + mono_os_mutex_lock (&pool->lock); + pool->threads_finished++; + mono_os_cond_signal (&pool->done_cond); + mono_os_mutex_unlock (&pool->lock); return 0; } } @@ -171,41 +149,49 @@ thread_func (void *thread_data) } void -sgen_thread_pool_init (int num_threads, SgenThreadPoolThreadInitFunc init_func, SgenThreadPoolIdleJobFunc idle_func, SgenThreadPoolContinueIdleJobFunc continue_idle_func, SgenThreadPoolShouldWorkFunc should_work_func_p, void **thread_datas) +sgen_thread_pool_init (SgenThreadPool *pool, int num_threads, SgenThreadPoolThreadInitFunc init_func, SgenThreadPoolIdleJobFunc idle_func, SgenThreadPoolContinueIdleJobFunc continue_idle_func, SgenThreadPoolShouldWorkFunc should_work_func_p, SgenThreadPoolData **thread_datas) { int i; - threads_num = (num_threads < MAX_NUM_THREADS) ? num_threads : MAX_NUM_THREADS; + SGEN_ASSERT (0, num_threads > 0, "Why are we creating a threadpool with no threads?"); + + pool->threads_num = (num_threads < MAX_NUM_THREADS) ? num_threads : MAX_NUM_THREADS; - mono_os_mutex_init (&lock); - mono_os_cond_init (&work_cond); - mono_os_cond_init (&done_cond); + mono_os_mutex_init (&pool->lock); + mono_os_cond_init (&pool->work_cond); + mono_os_cond_init (&pool->done_cond); - thread_init_func = init_func; - idle_job_func = idle_func; - continue_idle_job_func = continue_idle_func; - should_work_func = should_work_func_p; + pool->thread_init_func = init_func; + pool->idle_job_func = idle_func; + pool->continue_idle_job_func = continue_idle_func; + pool->should_work_func = should_work_func_p; - for (i = 0; i < threads_num; i++) - mono_native_thread_create (&threads [i], thread_func, thread_datas ? thread_datas [i] : NULL); + sgen_pointer_queue_init (&pool->job_queue, 0); + pool->threads_finished = 0; + pool->threadpool_shutdown = FALSE; + + for (i = 0; i < pool->threads_num; i++) { + thread_datas [i]->pool = pool; + mono_native_thread_create (&pool->threads [i], thread_func, thread_datas [i]); + } } void -sgen_thread_pool_shutdown (void) +sgen_thread_pool_shutdown (SgenThreadPool *pool) { - if (!threads_num) + if (!pool) return; - mono_os_mutex_lock (&lock); - threadpool_shutdown = TRUE; - mono_os_cond_broadcast (&work_cond); - while (threads_finished < threads_num) - mono_os_cond_wait (&done_cond, &lock); - mono_os_mutex_unlock (&lock); + mono_os_mutex_lock (&pool->lock); + pool->threadpool_shutdown = TRUE; + mono_os_cond_broadcast (&pool->work_cond); + while (pool->threads_finished < pool->threads_num) + mono_os_cond_wait (&pool->done_cond, &pool->lock); + mono_os_mutex_unlock (&pool->lock); - mono_os_mutex_destroy (&lock); - mono_os_cond_destroy (&work_cond); - mono_os_cond_destroy (&done_cond); + mono_os_mutex_destroy (&pool->lock); + mono_os_cond_destroy (&pool->work_cond); + mono_os_cond_destroy (&pool->done_cond); } SgenThreadPoolJob* @@ -226,74 +212,77 @@ sgen_thread_pool_job_free (SgenThreadPoolJob *job) } void -sgen_thread_pool_job_enqueue (SgenThreadPoolJob *job) +sgen_thread_pool_job_enqueue (SgenThreadPool *pool, SgenThreadPoolJob *job) { - mono_os_mutex_lock (&lock); + mono_os_mutex_lock (&pool->lock); - sgen_pointer_queue_add (&job_queue, job); - mono_os_cond_signal (&work_cond); + sgen_pointer_queue_add (&pool->job_queue, job); + mono_os_cond_signal (&pool->work_cond); - mono_os_mutex_unlock (&lock); + mono_os_mutex_unlock (&pool->lock); } void -sgen_thread_pool_job_wait (SgenThreadPoolJob *job) +sgen_thread_pool_job_wait (SgenThreadPool *pool, SgenThreadPoolJob *job) { SGEN_ASSERT (0, job, "Where's the job?"); - mono_os_mutex_lock (&lock); + mono_os_mutex_lock (&pool->lock); - while (find_job_in_queue (job) >= 0) - mono_os_cond_wait (&done_cond, &lock); + while (find_job_in_queue (pool, job) >= 0) + mono_os_cond_wait (&pool->done_cond, &pool->lock); - mono_os_mutex_unlock (&lock); + mono_os_mutex_unlock (&pool->lock); } void -sgen_thread_pool_idle_signal (void) +sgen_thread_pool_idle_signal (SgenThreadPool *pool) { - SGEN_ASSERT (0, idle_job_func, "Why are we signaling idle without an idle function?"); + SGEN_ASSERT (0, pool->idle_job_func, "Why are we signaling idle without an idle function?"); - mono_os_mutex_lock (&lock); + mono_os_mutex_lock (&pool->lock); - if (continue_idle_job_func (NULL)) - mono_os_cond_broadcast (&work_cond); + if (pool->continue_idle_job_func (NULL)) + mono_os_cond_broadcast (&pool->work_cond); - mono_os_mutex_unlock (&lock); + mono_os_mutex_unlock (&pool->lock); } void -sgen_thread_pool_idle_wait (void) +sgen_thread_pool_idle_wait (SgenThreadPool *pool) { - SGEN_ASSERT (0, idle_job_func, "Why are we waiting for idle without an idle function?"); + SGEN_ASSERT (0, pool->idle_job_func, "Why are we waiting for idle without an idle function?"); - mono_os_mutex_lock (&lock); + mono_os_mutex_lock (&pool->lock); - while (continue_idle_job_func (NULL)) - mono_os_cond_wait (&done_cond, &lock); + while (pool->continue_idle_job_func (NULL)) + mono_os_cond_wait (&pool->done_cond, &pool->lock); - mono_os_mutex_unlock (&lock); + mono_os_mutex_unlock (&pool->lock); } void -sgen_thread_pool_wait_for_all_jobs (void) +sgen_thread_pool_wait_for_all_jobs (SgenThreadPool *pool) { - mono_os_mutex_lock (&lock); + mono_os_mutex_lock (&pool->lock); - while (!sgen_pointer_queue_is_empty (&job_queue)) - mono_os_cond_wait (&done_cond, &lock); + while (!sgen_pointer_queue_is_empty (&pool->job_queue)) + mono_os_cond_wait (&pool->done_cond, &pool->lock); - mono_os_mutex_unlock (&lock); + mono_os_mutex_unlock (&pool->lock); } /* Return 0 if is not a thread pool thread or the thread number otherwise */ int -sgen_thread_pool_is_thread_pool_thread (MonoNativeThreadId some_thread) +sgen_thread_pool_is_thread_pool_thread (SgenThreadPool *pool, MonoNativeThreadId some_thread) { int i; - for (i = 0; i < threads_num; i++) { - if (some_thread == threads [i]) + if (!pool) + return 0; + + for (i = 0; i < pool->threads_num; i++) { + if (some_thread == pool->threads [i]) return i + 1; } diff --git a/mono/sgen/sgen-thread-pool.h b/mono/sgen/sgen-thread-pool.h index ce4bb11e321..1b48e4433f0 100644 --- a/mono/sgen/sgen-thread-pool.h +++ b/mono/sgen/sgen-thread-pool.h @@ -10,9 +10,18 @@ #ifndef __MONO_SGEN_THREAD_POOL_H__ #define __MONO_SGEN_THREAD_POOL_H__ +#include "mono/sgen/sgen-pointer-queue.h" +#include "mono/utils/mono-threads.h" + typedef struct _SgenThreadPoolJob SgenThreadPoolJob; +typedef struct _SgenThreadPool SgenThreadPool; +typedef struct _SgenThreadPoolData SgenThreadPoolData; typedef void (*SgenThreadPoolJobFunc) (void *thread_data, SgenThreadPoolJob *job); +typedef void (*SgenThreadPoolThreadInitFunc) (void*); +typedef void (*SgenThreadPoolIdleJobFunc) (void*); +typedef gboolean (*SgenThreadPoolContinueIdleJobFunc) (void*); +typedef gboolean (*SgenThreadPoolShouldWorkFunc) (void*); struct _SgenThreadPoolJob { const char *name; @@ -21,28 +30,49 @@ struct _SgenThreadPoolJob { volatile gint32 state; }; -typedef void (*SgenThreadPoolThreadInitFunc) (void*); -typedef void (*SgenThreadPoolIdleJobFunc) (void*); -typedef gboolean (*SgenThreadPoolContinueIdleJobFunc) (void*); -typedef gboolean (*SgenThreadPoolShouldWorkFunc) (void*); +#define MAX_NUM_THREADS 8 + +struct _SgenThreadPool { + mono_mutex_t lock; + mono_cond_t work_cond; + mono_cond_t done_cond; + + int threads_num; + MonoNativeThreadId threads [MAX_NUM_THREADS]; + + /* Only accessed with the lock held. */ + SgenPointerQueue job_queue; + + SgenThreadPoolThreadInitFunc thread_init_func; + SgenThreadPoolIdleJobFunc idle_job_func; + SgenThreadPoolContinueIdleJobFunc continue_idle_job_func; + SgenThreadPoolShouldWorkFunc should_work_func; + + volatile gboolean threadpool_shutdown; + volatile int threads_finished; +}; + +struct _SgenThreadPoolData { + SgenThreadPool *pool; +}; -void sgen_thread_pool_init (int num_threads, SgenThreadPoolThreadInitFunc init_func, SgenThreadPoolIdleJobFunc idle_func, SgenThreadPoolContinueIdleJobFunc continue_idle_func, SgenThreadPoolShouldWorkFunc should_work_func, void **thread_datas); +void sgen_thread_pool_init (SgenThreadPool *pool, int num_threads, SgenThreadPoolThreadInitFunc init_func, SgenThreadPoolIdleJobFunc idle_func, SgenThreadPoolContinueIdleJobFunc continue_idle_func, SgenThreadPoolShouldWorkFunc should_work_func, SgenThreadPoolData **thread_datas); -void sgen_thread_pool_shutdown (void); +void sgen_thread_pool_shutdown (SgenThreadPool *pool); SgenThreadPoolJob* sgen_thread_pool_job_alloc (const char *name, SgenThreadPoolJobFunc func, size_t size); /* This only needs to be called on jobs that are not enqueued. */ void sgen_thread_pool_job_free (SgenThreadPoolJob *job); -void sgen_thread_pool_job_enqueue (SgenThreadPoolJob *job); +void sgen_thread_pool_job_enqueue (SgenThreadPool *pool, SgenThreadPoolJob *job); /* This must only be called after the job has been enqueued. */ -void sgen_thread_pool_job_wait (SgenThreadPoolJob *job); +void sgen_thread_pool_job_wait (SgenThreadPool *pool, SgenThreadPoolJob *job); -void sgen_thread_pool_idle_signal (void); -void sgen_thread_pool_idle_wait (void); +void sgen_thread_pool_idle_signal (SgenThreadPool *pool); +void sgen_thread_pool_idle_wait (SgenThreadPool *pool); -void sgen_thread_pool_wait_for_all_jobs (void); +void sgen_thread_pool_wait_for_all_jobs (SgenThreadPool *pool); -int sgen_thread_pool_is_thread_pool_thread (MonoNativeThreadId thread); +int sgen_thread_pool_is_thread_pool_thread (SgenThreadPool *pool, MonoNativeThreadId thread); #endif diff --git a/mono/sgen/sgen-workers.c b/mono/sgen/sgen-workers.c index d5343db50d1..47fb00d9e1f 100644 --- a/mono/sgen/sgen-workers.c +++ b/mono/sgen/sgen-workers.c @@ -26,6 +26,9 @@ static volatile gboolean forced_stop; static WorkerData *workers_data; static SgenWorkerCallback worker_init_cb; +static SgenThreadPool pool_inst; +static SgenThreadPool *pool; /* null if we're not using workers */ + /* * When using multiple workers, we need to have the last worker * enqueue the preclean jobs (if there are any). This lock ensures @@ -84,7 +87,7 @@ set_state (WorkerData *data, State old_state, State new_state) else if (new_state == STATE_WORKING) SGEN_ASSERT (0, old_state == STATE_WORK_ENQUEUED, "We can only transition to WORKING from WORK ENQUEUED"); if (new_state == STATE_NOT_WORKING || new_state == STATE_WORKING) - SGEN_ASSERT (6, sgen_thread_pool_is_thread_pool_thread (mono_native_thread_id_get ()), "Only the worker thread is allowed to transition to NOT_WORKING or WORKING"); + SGEN_ASSERT (6, sgen_thread_pool_is_thread_pool_thread (pool, mono_native_thread_id_get ()), "Only the worker thread is allowed to transition to NOT_WORKING or WORKING"); return InterlockedCompareExchange (&data->state, new_state, old_state) == old_state; } @@ -128,7 +131,7 @@ sgen_workers_ensure_awake (void) } if (need_signal) - sgen_thread_pool_idle_signal (); + sgen_thread_pool_idle_signal (pool); } static void @@ -200,7 +203,7 @@ sgen_workers_enqueue_job (SgenThreadPoolJob *job, gboolean enqueue) return; } - sgen_thread_pool_job_enqueue (job); + sgen_thread_pool_job_enqueue (pool, job); } static gboolean @@ -371,12 +374,7 @@ void sgen_workers_init (int num_workers, SgenWorkerCallback callback) { int i; - void **workers_data_ptrs = (void **)alloca(num_workers * sizeof(void *)); - - if (!sgen_get_major_collector ()->is_concurrent && !sgen_get_minor_collector ()->is_parallel) { - sgen_thread_pool_init (num_workers, thread_pool_init_func, NULL, NULL, NULL, NULL); - return; - } + WorkerData **workers_data_ptrs = (WorkerData**)alloca(num_workers * sizeof(WorkerData*)); mono_os_mutex_init (&finished_lock); //g_print ("initing %d workers\n", num_workers); @@ -390,15 +388,23 @@ sgen_workers_init (int num_workers, SgenWorkerCallback callback) init_distribute_gray_queue (); for (i = 0; i < num_workers; ++i) - workers_data_ptrs [i] = (void *) &workers_data [i]; + workers_data_ptrs [i] = &workers_data [i]; worker_init_cb = callback; - sgen_thread_pool_init (num_workers, thread_pool_init_func, marker_idle_func, continue_idle_func, should_work_func, workers_data_ptrs); + pool = &pool_inst; + sgen_thread_pool_init (pool, num_workers, thread_pool_init_func, marker_idle_func, continue_idle_func, should_work_func, (SgenThreadPoolData**)workers_data_ptrs); mono_counters_register ("# workers finished", MONO_COUNTER_GC | MONO_COUNTER_ULONG, &stat_workers_num_finished); } +void +sgen_workers_shutdown (void) +{ + if (pool) + sgen_thread_pool_shutdown (pool); +} + void sgen_workers_stop_all_workers (void) { @@ -406,8 +412,8 @@ sgen_workers_stop_all_workers (void) mono_memory_write_barrier (); forced_stop = TRUE; - sgen_thread_pool_wait_for_all_jobs (); - sgen_thread_pool_idle_wait (); + sgen_thread_pool_wait_for_all_jobs (pool); + sgen_thread_pool_idle_wait (pool); SGEN_ASSERT (0, sgen_workers_all_done (), "Can only signal enqueue work when in no work state"); } @@ -446,8 +452,8 @@ sgen_workers_join (void) { int i; - sgen_thread_pool_wait_for_all_jobs (); - sgen_thread_pool_idle_wait (); + sgen_thread_pool_wait_for_all_jobs (pool); + sgen_thread_pool_idle_wait (pool); SGEN_ASSERT (0, sgen_workers_all_done (), "Can only signal enqueue work when in no work state"); /* At this point all the workers have stopped. */ @@ -546,4 +552,12 @@ sgen_workers_foreach (SgenWorkerCallback callback) callback (&workers_data [i]); } +gboolean +sgen_workers_is_worker_thread (MonoNativeThreadId id) +{ + if (!pool) + return FALSE; + return sgen_thread_pool_is_thread_pool_thread (pool, id); +} + #endif diff --git a/mono/sgen/sgen-workers.h b/mono/sgen/sgen-workers.h index e2f030dc159..78dea19867d 100644 --- a/mono/sgen/sgen-workers.h +++ b/mono/sgen/sgen-workers.h @@ -15,6 +15,11 @@ typedef struct _WorkerData WorkerData; struct _WorkerData { + /* + * Threadpool threads receive as their starting argument a WorkerData. + * tp_data is meant for use inside the sgen thread pool and must be first. + */ + SgenThreadPoolData tp_data; gint32 state; SgenGrayQueue private_gray_queue; /* only read/written by worker thread */ /* @@ -30,6 +35,7 @@ typedef void (*SgenWorkersFinishCallback) (void); typedef void (*SgenWorkerCallback) (WorkerData *data); void sgen_workers_init (int num_workers, SgenWorkerCallback callback); +void sgen_workers_shutdown (void); void sgen_workers_stop_all_workers (void); void sgen_workers_set_num_active_workers (int num_workers); void sgen_workers_start_all_workers (SgenObjectOperations *object_ops_nopar, SgenObjectOperations *object_ops_par, SgenWorkersFinishCallback finish_job); @@ -46,5 +52,6 @@ void sgen_workers_take_from_queue (SgenGrayQueue *queue); SgenObjectOperations* sgen_workers_get_idle_func_object_ops (void); int sgen_workers_get_job_split_count (void); void sgen_workers_foreach (SgenWorkerCallback callback); +gboolean sgen_workers_is_worker_thread (MonoNativeThreadId id); #endif -- 2.25.1