From 83dfb12187ae97c436498c5d67f0c4db907ec66d Mon Sep 17 00:00:00 2001 From: Mark Probst Date: Tue, 17 Feb 2015 17:11:35 -0800 Subject: [PATCH] [sgen] Workers use thread pool. So we only have one worker thread that's used both for marking and, later, for sweeping, too. --- mono/metadata/sgen-gc.c | 229 +++++++---------- mono/metadata/sgen-gc.h | 10 +- mono/metadata/sgen-internal.c | 3 +- mono/metadata/sgen-marksweep.c | 34 +-- mono/metadata/sgen-memory-governor.c | 3 +- mono/metadata/sgen-os-mach.c | 3 +- mono/metadata/sgen-protocol.c | 3 +- mono/metadata/sgen-stw.c | 3 +- mono/metadata/sgen-thread-pool.c | 151 ++++++++--- mono/metadata/sgen-thread-pool.h | 20 +- mono/metadata/sgen-workers.c | 359 ++++++--------------------- mono/metadata/sgen-workers.h | 18 +- 12 files changed, 340 insertions(+), 496 deletions(-) diff --git a/mono/metadata/sgen-gc.c b/mono/metadata/sgen-gc.c index 61499a3f343..983962db85b 100644 --- a/mono/metadata/sgen-gc.c +++ b/mono/metadata/sgen-gc.c @@ -328,19 +328,15 @@ static guint64 time_minor_pre_collection_fragment_clear = 0; static guint64 time_minor_pinning = 0; static guint64 time_minor_scan_remsets = 0; static guint64 time_minor_scan_pinned = 0; -static guint64 time_minor_scan_registered_roots = 0; -static guint64 time_minor_scan_thread_data = 0; +static guint64 time_minor_scan_roots = 0; static guint64 time_minor_finish_gray_stack = 0; static guint64 time_minor_fragment_creation = 0; static guint64 time_major_pre_collection_fragment_clear = 0; static guint64 time_major_pinning = 0; static guint64 time_major_scan_pinned = 0; -static guint64 time_major_scan_registered_roots = 0; -static guint64 time_major_scan_thread_data = 0; -static guint64 time_major_scan_alloc_pinned = 0; -static guint64 time_major_scan_finalized = 0; -static guint64 time_major_scan_big_objects = 0; +static guint64 time_major_scan_roots = 0; +static guint64 time_major_scan_mod_union = 0; static guint64 time_major_finish_gray_stack = 0; static guint64 time_major_free_bigobjs = 0; static guint64 time_major_los_sweep = 0; @@ -593,12 +589,7 @@ gray_queue_redirect (SgenGrayQueue *queue) if (wake) { g_assert (concurrent_collection_in_progress); - if (sgen_workers_have_started ()) { - sgen_workers_ensure_awake (); - } else { - if (concurrent_collection_in_progress) - g_assert (current_collection_generation == -1); - } + sgen_workers_ensure_awake (); } } @@ -1904,19 +1895,14 @@ init_stats (void) mono_counters_register ("Minor pinning", MONO_COUNTER_GC | MONO_COUNTER_ULONG | MONO_COUNTER_TIME, &time_minor_pinning); mono_counters_register ("Minor scan remembered set", MONO_COUNTER_GC | MONO_COUNTER_ULONG | MONO_COUNTER_TIME, &time_minor_scan_remsets); mono_counters_register ("Minor scan pinned", MONO_COUNTER_GC | MONO_COUNTER_ULONG | MONO_COUNTER_TIME, &time_minor_scan_pinned); - mono_counters_register ("Minor scan registered roots", MONO_COUNTER_GC | MONO_COUNTER_ULONG | MONO_COUNTER_TIME, &time_minor_scan_registered_roots); - mono_counters_register ("Minor scan thread data", MONO_COUNTER_GC | MONO_COUNTER_ULONG | MONO_COUNTER_TIME, &time_minor_scan_thread_data); - mono_counters_register ("Minor finish gray stack", MONO_COUNTER_GC | MONO_COUNTER_ULONG | MONO_COUNTER_TIME, &time_minor_finish_gray_stack); + mono_counters_register ("Minor scan roots", MONO_COUNTER_GC | MONO_COUNTER_ULONG | MONO_COUNTER_TIME, &time_minor_scan_roots); mono_counters_register ("Minor fragment creation", MONO_COUNTER_GC | MONO_COUNTER_ULONG | MONO_COUNTER_TIME, &time_minor_fragment_creation); mono_counters_register ("Major fragment clear", MONO_COUNTER_GC | MONO_COUNTER_ULONG | MONO_COUNTER_TIME, &time_major_pre_collection_fragment_clear); mono_counters_register ("Major pinning", MONO_COUNTER_GC | MONO_COUNTER_ULONG | MONO_COUNTER_TIME, &time_major_pinning); mono_counters_register ("Major scan pinned", MONO_COUNTER_GC | MONO_COUNTER_ULONG | MONO_COUNTER_TIME, &time_major_scan_pinned); - mono_counters_register ("Major scan registered roots", MONO_COUNTER_GC | MONO_COUNTER_ULONG | MONO_COUNTER_TIME, &time_major_scan_registered_roots); - mono_counters_register ("Major scan thread data", MONO_COUNTER_GC | MONO_COUNTER_ULONG | MONO_COUNTER_TIME, &time_major_scan_thread_data); - mono_counters_register ("Major scan alloc_pinned", MONO_COUNTER_GC | MONO_COUNTER_ULONG | MONO_COUNTER_TIME, &time_major_scan_alloc_pinned); - mono_counters_register ("Major scan finalized", MONO_COUNTER_GC | MONO_COUNTER_ULONG | MONO_COUNTER_TIME, &time_major_scan_finalized); - mono_counters_register ("Major scan big objects", MONO_COUNTER_GC | MONO_COUNTER_ULONG | MONO_COUNTER_TIME, &time_major_scan_big_objects); + mono_counters_register ("Major scan roots", MONO_COUNTER_GC | MONO_COUNTER_ULONG | MONO_COUNTER_TIME, &time_major_scan_roots); + mono_counters_register ("Major scan mod union", MONO_COUNTER_GC | MONO_COUNTER_ULONG | MONO_COUNTER_TIME, &time_major_scan_mod_union); mono_counters_register ("Major finish gray stack", MONO_COUNTER_GC | MONO_COUNTER_ULONG | MONO_COUNTER_TIME, &time_major_finish_gray_stack); mono_counters_register ("Major free big objects", MONO_COUNTER_GC | MONO_COUNTER_ULONG | MONO_COUNTER_TIME, &time_major_free_bigobjs); mono_counters_register ("Major LOS sweep", MONO_COUNTER_GC | MONO_COUNTER_ULONG | MONO_COUNTER_TIME, &time_major_los_sweep); @@ -1994,66 +1980,75 @@ sgen_concurrent_collection_in_progress (void) } static void -job_remembered_set_scan (WorkerData *worker_data, void *dummy) +job_remembered_set_scan (void *worker_data_untyped, SgenThreadPoolJob *job) { + WorkerData *worker_data = worker_data_untyped; remset.scan_remsets (sgen_workers_get_job_gray_queue (worker_data)); } -typedef struct -{ +typedef struct { + SgenThreadPoolJob job; CopyOrMarkObjectFunc copy_or_mark_func; ScanObjectFunc scan_func; char *heap_start; char *heap_end; int root_type; -} ScanFromRegisteredRootsJobData; +} ScanFromRegisteredRootsJob; static void -job_scan_from_registered_roots (WorkerData *worker_data, void *job_data_untyped) +job_scan_from_registered_roots (void *worker_data_untyped, SgenThreadPoolJob *job) { - ScanFromRegisteredRootsJobData *job_data = job_data_untyped; + WorkerData *worker_data = worker_data_untyped; + ScanFromRegisteredRootsJob *job_data = (ScanFromRegisteredRootsJob*)job; ScanCopyContext ctx = { job_data->scan_func, job_data->copy_or_mark_func, sgen_workers_get_job_gray_queue (worker_data) }; scan_from_registered_roots (job_data->heap_start, job_data->heap_end, job_data->root_type, ctx); - sgen_free_internal_dynamic (job_data, sizeof (ScanFromRegisteredRootsJobData), INTERNAL_MEM_WORKER_JOB_DATA); } -typedef struct -{ +typedef struct { + SgenThreadPoolJob job; char *heap_start; char *heap_end; -} ScanThreadDataJobData; +} ScanThreadDataJob; static void -job_scan_thread_data (WorkerData *worker_data, void *job_data_untyped) +job_scan_thread_data (void *worker_data_untyped, SgenThreadPoolJob *job) { - ScanThreadDataJobData *job_data = job_data_untyped; + WorkerData *worker_data = worker_data_untyped; + ScanThreadDataJob *job_data = (ScanThreadDataJob*)job; scan_thread_data (job_data->heap_start, job_data->heap_end, TRUE, sgen_workers_get_job_gray_queue (worker_data)); - sgen_free_internal_dynamic (job_data, sizeof (ScanThreadDataJobData), INTERNAL_MEM_WORKER_JOB_DATA); } +typedef struct { + SgenThreadPoolJob job; + FinalizeReadyEntry *list; +} ScanFinalizerEntriesJob; + static void -job_scan_finalizer_entries (WorkerData *worker_data, void *job_data_untyped) +job_scan_finalizer_entries (void *worker_data_untyped, SgenThreadPoolJob *job) { - FinalizeReadyEntry *list = job_data_untyped; + WorkerData *worker_data = worker_data_untyped; + ScanFinalizerEntriesJob *job_data = (ScanFinalizerEntriesJob*)job; ScanCopyContext ctx = { NULL, current_object_ops.copy_or_mark_object, sgen_workers_get_job_gray_queue (worker_data) }; - scan_finalizer_entries (list, ctx); + scan_finalizer_entries (job_data->list, ctx); } static void -job_scan_major_mod_union_cardtable (WorkerData *worker_data, void *job_data_untyped) +job_scan_major_mod_union_cardtable (void *worker_data_untyped, SgenThreadPoolJob *job) { + WorkerData *worker_data = worker_data_untyped; g_assert (concurrent_collection_in_progress); major_collector.scan_card_table (TRUE, sgen_workers_get_job_gray_queue (worker_data)); } static void -job_scan_los_mod_union_cardtable (WorkerData *worker_data, void *job_data_untyped) +job_scan_los_mod_union_cardtable (void *worker_data_untyped, SgenThreadPoolJob *job) { + WorkerData *worker_data = worker_data_untyped; g_assert (concurrent_collection_in_progress); sgen_los_scan_card_table (TRUE, sgen_workers_get_job_gray_queue (worker_data)); } @@ -2157,6 +2152,49 @@ init_gray_queue (void) sgen_gray_object_queue_init (&gray_queue, NULL); } +static void +enqueue_scan_from_roots_jobs (char *heap_start, char *heap_end) +{ + ScanFromRegisteredRootsJob *scrrj; + ScanThreadDataJob *stdj; + ScanFinalizerEntriesJob *sfej; + + /* registered roots, this includes static fields */ + + scrrj = (ScanFromRegisteredRootsJob*)sgen_thread_pool_job_alloc ("scan from registered roots normal", job_scan_from_registered_roots, sizeof (ScanFromRegisteredRootsJob)); + scrrj->copy_or_mark_func = current_object_ops.copy_or_mark_object; + scrrj->scan_func = current_object_ops.scan_object; + scrrj->heap_start = heap_start; + scrrj->heap_end = heap_end; + scrrj->root_type = ROOT_TYPE_NORMAL; + sgen_workers_enqueue_job (&scrrj->job); + + scrrj = (ScanFromRegisteredRootsJob*)sgen_thread_pool_job_alloc ("scan from registered roots wbarrier", job_scan_from_registered_roots, sizeof (ScanFromRegisteredRootsJob)); + scrrj->copy_or_mark_func = current_object_ops.copy_or_mark_object; + scrrj->scan_func = current_object_ops.scan_object; + scrrj->heap_start = heap_start; + scrrj->heap_end = heap_end; + scrrj->root_type = ROOT_TYPE_WBARRIER; + sgen_workers_enqueue_job (&scrrj->job); + + /* Threads */ + + stdj = (ScanThreadDataJob*)sgen_thread_pool_job_alloc ("scan thread data", job_scan_thread_data, sizeof (ScanThreadDataJob)); + stdj->heap_start = heap_start; + stdj->heap_end = heap_end; + sgen_workers_enqueue_job (&stdj->job); + + /* Scan the list of objects ready for finalization. */ + + sfej = (ScanFinalizerEntriesJob*)sgen_thread_pool_job_alloc ("scan finalizer entries", job_scan_finalizer_entries, sizeof (ScanFinalizerEntriesJob)); + sfej->list = fin_ready_list; + sgen_workers_enqueue_job (&sfej->job); + + sfej = (ScanFinalizerEntriesJob*)sgen_thread_pool_job_alloc ("scan critical finalizer entries", job_scan_finalizer_entries, sizeof (ScanFinalizerEntriesJob)); + sfej->list = critical_fin_list; + sgen_workers_enqueue_job (&sfej->job); +} + /* * Perform a nursery collection. * @@ -2168,8 +2206,6 @@ collect_nursery (SgenGrayQueue *unpin_queue, gboolean finish_up_concurrent_mark) gboolean needs_major; size_t max_garbage_amount; char *nursery_next; - ScanFromRegisteredRootsJobData *scrrjd_normal, *scrrjd_wbarrier; - ScanThreadDataJobData *stdjd; mword fragment_total; ScanCopyContext ctx; TV_DECLARE (atv); @@ -2193,6 +2229,8 @@ collect_nursery (SgenGrayQueue *unpin_queue, gboolean finish_up_concurrent_mark) current_collection_generation = GENERATION_NURSERY; current_object_ops = sgen_minor_collector.serial_ops; + SGEN_ASSERT (0, !sgen_collection_is_concurrent (), "Why is the nursery collection concurrent?"); + reset_pinned_from_failed_allocation (); check_scan_starts (); @@ -2269,7 +2307,7 @@ collect_nursery (SgenGrayQueue *unpin_queue, gboolean finish_up_concurrent_mark) * as part of which we scan the card table. Then, later, we scan the mod union * cardtable. We should only have to do one. */ - sgen_workers_enqueue_job ("scan remset", job_remembered_set_scan, NULL); + sgen_workers_enqueue_job (sgen_thread_pool_job_alloc ("scan remset", job_remembered_set_scan, sizeof (SgenThreadPoolJob))); /* we don't have complete write barrier yet, so we scan all the old generation sections */ TV_GETTIME (btv); @@ -2293,46 +2331,13 @@ collect_nursery (SgenGrayQueue *unpin_queue, gboolean finish_up_concurrent_mark) MONO_GC_CHECKPOINT_5 (GENERATION_NURSERY); - /* registered roots, this includes static fields */ - scrrjd_normal = sgen_alloc_internal_dynamic (sizeof (ScanFromRegisteredRootsJobData), INTERNAL_MEM_WORKER_JOB_DATA, TRUE); - scrrjd_normal->copy_or_mark_func = current_object_ops.copy_or_mark_object; - scrrjd_normal->scan_func = current_object_ops.scan_object; - scrrjd_normal->heap_start = sgen_get_nursery_start (); - scrrjd_normal->heap_end = nursery_next; - scrrjd_normal->root_type = ROOT_TYPE_NORMAL; - sgen_workers_enqueue_job ("scan from registered roots normal", job_scan_from_registered_roots, scrrjd_normal); - - scrrjd_wbarrier = sgen_alloc_internal_dynamic (sizeof (ScanFromRegisteredRootsJobData), INTERNAL_MEM_WORKER_JOB_DATA, TRUE); - scrrjd_wbarrier->copy_or_mark_func = current_object_ops.copy_or_mark_object; - scrrjd_wbarrier->scan_func = current_object_ops.scan_object; - scrrjd_wbarrier->heap_start = sgen_get_nursery_start (); - scrrjd_wbarrier->heap_end = nursery_next; - scrrjd_wbarrier->root_type = ROOT_TYPE_WBARRIER; - sgen_workers_enqueue_job ("scan from registered roots wbarrier", job_scan_from_registered_roots, scrrjd_wbarrier); + enqueue_scan_from_roots_jobs (sgen_get_nursery_start (), nursery_next); TV_GETTIME (btv); - time_minor_scan_registered_roots += TV_ELAPSED (atv, btv); + time_minor_scan_roots += TV_ELAPSED (atv, btv); MONO_GC_CHECKPOINT_6 (GENERATION_NURSERY); - - /* thread data */ - stdjd = sgen_alloc_internal_dynamic (sizeof (ScanThreadDataJobData), INTERNAL_MEM_WORKER_JOB_DATA, TRUE); - stdjd->heap_start = sgen_get_nursery_start (); - stdjd->heap_end = nursery_next; - sgen_workers_enqueue_job ("scan thread data", job_scan_thread_data, stdjd); - - TV_GETTIME (atv); - time_minor_scan_thread_data += TV_ELAPSED (btv, atv); - btv = atv; - MONO_GC_CHECKPOINT_7 (GENERATION_NURSERY); - - g_assert (!sgen_collection_is_concurrent ()); - - /* Scan the list of objects ready for finalization. If */ - sgen_workers_enqueue_job ("scan finalizer entries", job_scan_finalizer_entries, fin_ready_list); - sgen_workers_enqueue_job ("scan criticial finalizer entries", job_scan_finalizer_entries, critical_fin_list); - MONO_GC_CHECKPOINT_8 (GENERATION_NURSERY); finish_gray_stack (GENERATION_NURSERY, &gray_queue); @@ -2447,8 +2452,6 @@ major_copy_or_mark_from_roots (size_t *old_next_pin_slot, gboolean start_concurr char *heap_end = (char*)-1; gboolean profile_roots = mono_profiler_get_events () & MONO_PROFILE_GC_ROOTS; GCRootReport root_report = { 0 }; - ScanFromRegisteredRootsJobData *scrrjd_normal, *scrrjd_wbarrier; - ScanThreadDataJobData *stdjd; ScanCopyContext ctx; if (concurrent_collection_in_progress) { @@ -2632,59 +2635,24 @@ major_copy_or_mark_from_roots (size_t *old_next_pin_slot, gboolean start_concurr TV_GETTIME (atv); time_major_scan_pinned += TV_ELAPSED (btv, atv); - /* registered roots, this includes static fields */ - scrrjd_normal = sgen_alloc_internal_dynamic (sizeof (ScanFromRegisteredRootsJobData), INTERNAL_MEM_WORKER_JOB_DATA, TRUE); - scrrjd_normal->copy_or_mark_func = current_object_ops.copy_or_mark_object; - scrrjd_normal->scan_func = current_object_ops.scan_object; - scrrjd_normal->heap_start = heap_start; - scrrjd_normal->heap_end = heap_end; - scrrjd_normal->root_type = ROOT_TYPE_NORMAL; - sgen_workers_enqueue_job ("scan from registered roots normal", job_scan_from_registered_roots, scrrjd_normal); - - scrrjd_wbarrier = sgen_alloc_internal_dynamic (sizeof (ScanFromRegisteredRootsJobData), INTERNAL_MEM_WORKER_JOB_DATA, TRUE); - scrrjd_wbarrier->copy_or_mark_func = current_object_ops.copy_or_mark_object; - scrrjd_wbarrier->scan_func = current_object_ops.scan_object; - scrrjd_wbarrier->heap_start = heap_start; - scrrjd_wbarrier->heap_end = heap_end; - scrrjd_wbarrier->root_type = ROOT_TYPE_WBARRIER; - sgen_workers_enqueue_job ("scan from registered roots wbarrier", job_scan_from_registered_roots, scrrjd_wbarrier); - - TV_GETTIME (btv); - time_major_scan_registered_roots += TV_ELAPSED (atv, btv); - - /* Threads */ - stdjd = sgen_alloc_internal_dynamic (sizeof (ScanThreadDataJobData), INTERNAL_MEM_WORKER_JOB_DATA, TRUE); - stdjd->heap_start = heap_start; - stdjd->heap_end = heap_end; - sgen_workers_enqueue_job ("scan thread data", job_scan_thread_data, stdjd); - - TV_GETTIME (atv); - time_major_scan_thread_data += TV_ELAPSED (btv, atv); - - TV_GETTIME (btv); - time_major_scan_alloc_pinned += TV_ELAPSED (atv, btv); - if (mono_profiler_get_events () & MONO_PROFILE_GC_ROOTS) report_finalizer_roots (); - /* scan the list of objects ready for finalization */ - sgen_workers_enqueue_job ("scan finalizer entries", job_scan_finalizer_entries, fin_ready_list); - sgen_workers_enqueue_job ("scan critical finalizer entries", job_scan_finalizer_entries, critical_fin_list); + enqueue_scan_from_roots_jobs (heap_start, heap_end); + + TV_GETTIME (btv); + time_major_scan_roots += TV_ELAPSED (atv, btv); if (scan_mod_union) { g_assert (finish_up_concurrent_mark); /* Mod union card table */ - sgen_workers_enqueue_job ("scan mod union cardtable", job_scan_major_mod_union_cardtable, NULL); - sgen_workers_enqueue_job ("scan LOS mod union cardtable", job_scan_los_mod_union_cardtable, NULL); - } - - TV_GETTIME (atv); - time_major_scan_finalized += TV_ELAPSED (btv, atv); - SGEN_LOG (2, "Root scan: %d usecs", TV_ELAPSED (btv, atv)); + sgen_workers_enqueue_job (sgen_thread_pool_job_alloc ("scan mod union cardtable", job_scan_major_mod_union_cardtable, sizeof (SgenThreadPoolJob))); + sgen_workers_enqueue_job (sgen_thread_pool_job_alloc ("scan LOS mod union cardtable", job_scan_los_mod_union_cardtable, sizeof (SgenThreadPoolJob))); - TV_GETTIME (btv); - time_major_scan_big_objects += TV_ELAPSED (atv, btv); + TV_GETTIME (atv); + time_major_scan_mod_union += TV_ELAPSED (btv, atv); + } } static void @@ -2750,13 +2718,6 @@ major_start_collection (gboolean concurrent, size_t *old_next_pin_slot) major_finish_copy_or_mark (); } -static void -wait_for_workers_to_finish (void) -{ - while (!sgen_workers_all_done ()) - g_usleep (200); -} - static void major_finish_collection (const char *reason, size_t old_next_pin_slot, gboolean forced, gboolean scan_whole_nursery) { @@ -3054,7 +3015,7 @@ major_finish_concurrent_collection (gboolean forced) * marking before the nursery collection is allowed to run, otherwise we might miss * some remsets. */ - wait_for_workers_to_finish (); + sgen_workers_wait (); SGEN_TV_GETTIME (time_major_conc_collection_end); gc_stats.major_gc_time_concurrent += SGEN_TV_ELAPSED (time_major_conc_collection_start, time_major_conc_collection_end); @@ -4856,9 +4817,6 @@ mono_gc_base_init (void) g_strfreev (opts); } - if (major_collector.is_concurrent) - sgen_workers_init (1); - if (major_collector_opt) g_free (major_collector_opt); @@ -5030,6 +4988,9 @@ mono_gc_base_init (void) if (major_collector.post_param_init) major_collector.post_param_init (&major_collector); + if (major_collector.needs_thread_pool) + sgen_workers_init (1); + sgen_memgov_init (max_heap, soft_limit, debug_print_allowance, allowance_ratio, save_target); memset (&remset, 0, sizeof (remset)); diff --git a/mono/metadata/sgen-gc.h b/mono/metadata/sgen-gc.h index 229162c0d69..fe989be2a0b 100644 --- a/mono/metadata/sgen-gc.h +++ b/mono/metadata/sgen-gc.h @@ -392,8 +392,6 @@ gboolean sgen_resume_thread (SgenThreadInfo *info); void sgen_wait_for_suspend_ack (int count); void sgen_os_init (void); -gboolean sgen_is_worker_thread (MonoNativeThreadId thread); - void sgen_update_heap_boundaries (mword low, mword high); void sgen_scan_area_with_callback (char *start, char *end, IterateObjectCallbackFunc callback, void *data, gboolean allow_flags); @@ -421,7 +419,7 @@ enum { INTERNAL_MEM_MS_BLOCK_INFO_SORT, INTERNAL_MEM_EPHEMERON_LINK, INTERNAL_MEM_WORKER_DATA, - INTERNAL_MEM_WORKER_JOB_DATA, + INTERNAL_MEM_THREAD_POOL_JOB, INTERNAL_MEM_BRIDGE_DATA, INTERNAL_MEM_OLD_BRIDGE_HASH_TABLE, INTERNAL_MEM_OLD_BRIDGE_HASH_TABLE_ENTRY, @@ -433,7 +431,6 @@ enum { INTERNAL_MEM_TARJAN_BRIDGE_HASH_TABLE_ENTRY, INTERNAL_MEM_TARJAN_OBJ_BUCKET, INTERNAL_MEM_BRIDGE_DEBUG, - INTERNAL_MEM_JOB_QUEUE_ENTRY, INTERNAL_MEM_TOGGLEREF_DATA, INTERNAL_MEM_CARDTABLE_MOD_UNION, INTERNAL_MEM_BINARY_PROTOCOL, @@ -613,12 +610,13 @@ void sgen_split_nursery_init (SgenMinorCollector *collector); /* Updating references */ #ifdef SGEN_CHECK_UPDATE_REFERENCE +gboolean sgen_thread_pool_is_thread_pool_thread (MonoNativeThreadId some_thread) MONO_INTERNAL; static inline void sgen_update_reference (void **p, void *o, gboolean allow_null) { if (!allow_null) SGEN_ASSERT (0, o, "Cannot update a reference with a NULL pointer"); - SGEN_ASSERT (0, !sgen_is_worker_thread (mono_native_thread_id_get ()), "Can't update a reference in the worker thread"); + SGEN_ASSERT (0, !sgen_thread_pool_is_thread_pool_thread (mono_native_thread_id_get ()), "Can't update a reference in the worker thread"); *p = o; } @@ -654,6 +652,7 @@ typedef struct _SgenMajorCollector SgenMajorCollector; struct _SgenMajorCollector { size_t section_size; gboolean is_concurrent; + gboolean needs_thread_pool; gboolean supports_cardtable; gboolean sweeps_lazily; @@ -707,7 +706,6 @@ struct _SgenMajorCollector { size_t (*get_bytes_survived_last_sweep) (void); gboolean (*handle_gc_param) (const char *opt); void (*print_gc_param_usage) (void); - gboolean (*is_worker_thread) (MonoNativeThreadId thread); void (*post_param_init) (SgenMajorCollector *collector); void* (*alloc_worker_data) (void); void (*init_worker_thread) (void *data); diff --git a/mono/metadata/sgen-internal.c b/mono/metadata/sgen-internal.c index f5b8995355f..dc484cc2a10 100644 --- a/mono/metadata/sgen-internal.c +++ b/mono/metadata/sgen-internal.c @@ -134,7 +134,7 @@ description_for_type (int type) case INTERNAL_MEM_MS_BLOCK_INFO_SORT: return "marksweep-block-info-sort"; case INTERNAL_MEM_EPHEMERON_LINK: return "ephemeron-link"; case INTERNAL_MEM_WORKER_DATA: return "worker-data"; - case INTERNAL_MEM_WORKER_JOB_DATA: return "worker-job-data"; + case INTERNAL_MEM_THREAD_POOL_JOB: return "thread-pool-job"; case INTERNAL_MEM_BRIDGE_DATA: return "bridge-data"; case INTERNAL_MEM_OLD_BRIDGE_HASH_TABLE: return "old-bridge-hash-table"; case INTERNAL_MEM_OLD_BRIDGE_HASH_TABLE_ENTRY: return "old-bridge-hash-table-entry"; @@ -146,7 +146,6 @@ description_for_type (int type) case INTERNAL_MEM_BRIDGE_ALIVE_HASH_TABLE: return "bridge-alive-hash-table"; case INTERNAL_MEM_BRIDGE_ALIVE_HASH_TABLE_ENTRY: return "bridge-alive-hash-table-entry"; case INTERNAL_MEM_BRIDGE_DEBUG: return "bridge-debug"; - case INTERNAL_MEM_JOB_QUEUE_ENTRY: return "job-queue-entry"; case INTERNAL_MEM_TOGGLEREF_DATA: return "toggleref-data"; case INTERNAL_MEM_CARDTABLE_MOD_UNION: return "cardtable-mod-union"; case INTERNAL_MEM_BINARY_PROTOCOL: return "binary-protocol"; diff --git a/mono/metadata/sgen-marksweep.c b/mono/metadata/sgen-marksweep.c index 3688b46aadc..d35f12b5876 100644 --- a/mono/metadata/sgen-marksweep.c +++ b/mono/metadata/sgen-marksweep.c @@ -804,12 +804,13 @@ set_sweep_state (int new, int expected) static gboolean ensure_block_is_checked_for_sweeping (int block_index, gboolean wait, gboolean *have_checked); -static SgenThreadPoolJob sweep_job; +static SgenThreadPoolJob * volatile sweep_job; static void major_finish_sweep_checking (void) { int block_index; + SgenThreadPoolJob *job; retry: switch (sweep_state) { @@ -841,7 +842,10 @@ major_finish_sweep_checking (void) set_sweep_state (SWEEP_STATE_SWEEPING, SWEEP_STATE_SWEEPING_AND_ITERATING); wait: - sgen_thread_pool_job_wait (&sweep_job); + job = sweep_job; + if (job) + sgen_thread_pool_job_wait (job); + SGEN_ASSERT (0, !sweep_job, "Why did the sweep job not null itself?"); SGEN_ASSERT (0, sweep_state == SWEEP_STATE_SWEPT, "How is the sweep job done but we're not swept?"); } @@ -1061,7 +1065,7 @@ static void major_copy_or_mark_object_with_evacuation_concurrent (void **ptr, void *obj, SgenGrayQueue *queue) { SGEN_ASSERT (9, sgen_concurrent_collection_in_progress (), "Why are we scanning concurrently when there's no concurrent collection on?"); - SGEN_ASSERT (9, !sgen_workers_are_working () || sgen_is_worker_thread (mono_native_thread_id_get ()), "We must not scan from two threads at the same time!"); + SGEN_ASSERT (9, !sgen_workers_are_working () || sgen_thread_pool_is_thread_pool_thread (mono_native_thread_id_get ()), "We must not scan from two threads at the same time!"); g_assert (!SGEN_OBJECT_IS_FORWARDED (obj)); @@ -1523,7 +1527,7 @@ ensure_block_is_checked_for_sweeping (int block_index, gboolean wait, gboolean * } static void -sweep_job_func (SgenThreadPoolJob *job) +sweep_job_func (void *thread_data_untyped, SgenThreadPoolJob *job) { int block_index; int num_blocks = num_major_sections_before_sweep; @@ -1566,6 +1570,8 @@ sweep_job_func (SgenThreadPoolJob *job) sgen_pointer_queue_remove_nulls (&allocated_blocks); sweep_finish (); + + sweep_job = NULL; } static void @@ -1611,11 +1617,12 @@ major_sweep (void) num_major_sections_before_sweep = num_major_sections; num_major_sections_freed_in_sweep = 0; + SGEN_ASSERT (0, !sweep_job, "We haven't finished the last sweep?"); if (concurrent_sweep) { - sgen_thread_pool_job_init (&sweep_job, sweep_job_func); - sgen_thread_pool_job_enqueue (&sweep_job); + sweep_job = sgen_thread_pool_job_alloc ("sweep", sweep_job_func, sizeof (SgenThreadPoolJob)); + sgen_thread_pool_job_enqueue (sweep_job); } else { - sweep_job_func (NULL); + sweep_job_func (NULL, NULL); } } @@ -2339,6 +2346,7 @@ static void post_param_init (SgenMajorCollector *collector) { collector->sweeps_lazily = lazy_sweep; + collector->needs_thread_pool = concurrent_mark || concurrent_sweep; } static void @@ -2391,13 +2399,12 @@ sgen_marksweep_init_internal (SgenMajorCollector *collector, gboolean is_concurr collector->section_size = MAJOR_SECTION_SIZE; concurrent_mark = is_concurrent; - if (is_concurrent) { - collector->is_concurrent = TRUE; + collector->is_concurrent = is_concurrent; + collector->needs_thread_pool = is_concurrent || concurrent_sweep; + if (is_concurrent) collector->want_synchronous_collection = &want_evacuation; - } else { - collector->is_concurrent = FALSE; + else collector->want_synchronous_collection = NULL; - } collector->get_and_reset_num_major_objects_marked = major_get_and_reset_num_major_objects_marked; collector->supports_cardtable = TRUE; @@ -2476,9 +2483,6 @@ sgen_marksweep_init_internal (SgenMajorCollector *collector, gboolean is_concurr mono_mutex_init (&scanned_objects_list_lock); #endif - if (concurrent_sweep) - sgen_thread_pool_init (1); - SGEN_ASSERT (0, SGEN_MAX_SMALL_OBJ_SIZE <= MS_BLOCK_FREE / 2, "MAX_SMALL_OBJ_SIZE must be at most MS_BLOCK_FREE / 2"); /*cardtable requires major pages to be 8 cards aligned*/ diff --git a/mono/metadata/sgen-memory-governor.c b/mono/metadata/sgen-memory-governor.c index bf8831268e4..42231e4fcae 100644 --- a/mono/metadata/sgen-memory-governor.c +++ b/mono/metadata/sgen-memory-governor.c @@ -29,6 +29,7 @@ #include "metadata/sgen-gc.h" #include "metadata/sgen-memory-governor.h" +#include "metadata/sgen-thread-pool.h" #include "metadata/mono-gc.h" #include "utils/mono-counters.h" @@ -335,7 +336,7 @@ gboolean sgen_memgov_try_alloc_space (mword size, int space) { if (sgen_memgov_available_free_space () < size) { - SGEN_ASSERT (4, !sgen_is_worker_thread (mono_native_thread_id_get ()), "Memory shouldn't run out in worker thread"); + SGEN_ASSERT (4, !sgen_thread_pool_is_thread_pool_thread (mono_native_thread_id_get ()), "Memory shouldn't run out in worker thread"); return FALSE; } diff --git a/mono/metadata/sgen-os-mach.c b/mono/metadata/sgen-os-mach.c index 7d599e20908..403d04c3d9a 100644 --- a/mono/metadata/sgen-os-mach.c +++ b/mono/metadata/sgen-os-mach.c @@ -31,6 +31,7 @@ #include "metadata/sgen-gc.h" #include "metadata/sgen-archdep.h" #include "metadata/sgen-protocol.h" +#include "metadata/sgen-thread-pool.h" #include "metadata/object-internals.h" #include "metadata/gc-internal.h" @@ -116,7 +117,7 @@ sgen_thread_handshake (BOOL suspend) cur_thread->suspend_done = TRUE; FOREACH_THREAD_SAFE (info) { - if (info == cur_thread || sgen_is_worker_thread (mono_thread_info_get_tid (info))) + if (info == cur_thread || sgen_thread_pool_is_thread_pool_thread (mono_thread_info_get_tid (info))) continue; info->suspend_done = FALSE; diff --git a/mono/metadata/sgen-protocol.c b/mono/metadata/sgen-protocol.c index 22f70c30417..bbabb8ee36d 100644 --- a/mono/metadata/sgen-protocol.c +++ b/mono/metadata/sgen-protocol.c @@ -27,6 +27,7 @@ #include "sgen-gc.h" #include "sgen-protocol.h" #include "sgen-memory-governor.h" +#include "sgen-thread-pool.h" #include "utils/mono-mmap.h" #include "utils/mono-threads.h" @@ -293,7 +294,7 @@ protocol_entry (unsigned char type, gpointer data, int size) if (binary_protocol_file == -1) return; - if (sgen_is_worker_thread (mono_native_thread_id_get ())) + if (sgen_thread_pool_is_thread_pool_thread (mono_native_thread_id_get ())) type |= 0x80; lock_recursive (); diff --git a/mono/metadata/sgen-stw.c b/mono/metadata/sgen-stw.c index 700241621ba..9078f6bb07a 100644 --- a/mono/metadata/sgen-stw.c +++ b/mono/metadata/sgen-stw.c @@ -30,6 +30,7 @@ #include "metadata/sgen-gc.h" #include "metadata/sgen-protocol.h" #include "metadata/sgen-memory-governor.h" +#include "metadata/sgen-thread-pool.h" #include "metadata/profiler-private.h" #include "utils/mono-time.h" #include "utils/dtrace.h" @@ -397,7 +398,7 @@ sgen_is_thread_in_current_stw (SgenThreadInfo *info) We can't suspend the workers that will do all the heavy lifting. FIXME Use some state bit in SgenThreadInfo for this. */ - if (sgen_is_worker_thread (mono_thread_info_get_tid (info))) { + if (sgen_thread_pool_is_thread_pool_thread (mono_thread_info_get_tid (info))) { return FALSE; } diff --git a/mono/metadata/sgen-thread-pool.c b/mono/metadata/sgen-thread-pool.c index 81437d5f937..71d8d70742a 100644 --- a/mono/metadata/sgen-thread-pool.c +++ b/mono/metadata/sgen-thread-pool.c @@ -34,6 +34,10 @@ static MonoNativeThreadId thread; /* Only accessed with the lock held. */ static SgenPointerQueue job_queue; +static volatile gboolean idle_working; + +static SgenThreadPoolThreadInitFunc thread_init_func; +static SgenThreadPoolIdleJobFunc idle_job_func; enum { STATE_WAITING, @@ -43,64 +47,93 @@ enum { /* Assumes that the lock is held. */ static SgenThreadPoolJob* -get_job (void) +get_job_and_set_in_progress (void) { for (size_t i = 0; i < job_queue.next_slot; ++i) { SgenThreadPoolJob *job = job_queue.data [i]; - if (job->state == STATE_WAITING) + if (job->state == STATE_WAITING) { + job->state = STATE_IN_PROGRESS; return job; + } } return NULL; } +/* Assumes that the lock is held. */ +static ssize_t +find_job_in_queue (SgenThreadPoolJob *job) +{ + for (ssize_t i = 0; i < job_queue.next_slot; ++i) { + if (job_queue.data [i] == job) + return i; + } + return -1; +} + /* Assumes that the lock is held. */ static void remove_job (SgenThreadPoolJob *job) { - gboolean found = FALSE; + ssize_t index; SGEN_ASSERT (0, job->state == STATE_DONE, "Why are we removing a job that's not done?"); - for (size_t i = 0; i < job_queue.next_slot; ++i) { - if (job_queue.data [i] == job) { - job_queue.data [i] = NULL; - found = TRUE; - break; - } - } - SGEN_ASSERT (0, found, "Why is the job we're trying to remove not in the queue?"); + index = find_job_in_queue (job); + SGEN_ASSERT (0, index >= 0, "Why is the job we're trying to remove not in the queue?"); + job_queue.data [index] = NULL; sgen_pointer_queue_remove_nulls (&job_queue); + sgen_thread_pool_job_free (job); } static mono_native_thread_return_t -thread_func (void *arg) +thread_func (void *thread_data) { - mono_thread_info_register_small_id (); + thread_init_func (thread_data); mono_mutex_lock (&lock); for (;;) { SgenThreadPoolJob *job; + gboolean do_idle = idle_working; - while (!(job = get_job ())) + job = get_job_and_set_in_progress (); + if (!job && !do_idle) { mono_cond_wait (&work_cond, &lock); - SGEN_ASSERT (0, job->state == STATE_WAITING, "The job we got is in the wrong state. Should be waiting."); - job->state = STATE_IN_PROGRESS; + do_idle = idle_working; + job = get_job_and_set_in_progress (); + } + mono_mutex_unlock (&lock); - job->func (job); - - mono_mutex_lock (&lock); - SGEN_ASSERT (0, job->state == STATE_IN_PROGRESS, "The job should still be in progress."); - job->state = STATE_DONE; - remove_job (job); - /* - * Only the main GC thread will ever wait on the done condition, so we don't - * have to broadcast. - */ - mono_cond_signal (&done_cond); + if (job) { + job->func (thread_data, job); + + mono_mutex_lock (&lock); + + SGEN_ASSERT (0, job->state == STATE_IN_PROGRESS, "The job should still be in progress."); + job->state = STATE_DONE; + remove_job (job); + /* + * Only the main GC thread will ever wait on the done condition, so we don't + * have to broadcast. + */ + mono_cond_signal (&done_cond); + } else { + SGEN_ASSERT (0, do_idle, "Why did we unlock if we still have to wait for idle?"); + SGEN_ASSERT (0, idle_job_func, "Why do we have idle work when there's no idle job function?"); + do { + do_idle = idle_job_func (thread_data); + } while (do_idle && !job_queue.next_slot); + + mono_mutex_lock (&lock); + + if (!do_idle) { + idle_working = FALSE; + mono_cond_signal (&done_cond); + } + } } } void -sgen_thread_pool_init (int num_threads) +sgen_thread_pool_init (int num_threads, SgenThreadPoolThreadInitFunc init_func, SgenThreadPoolIdleJobFunc idle_func, void **thread_datas) { SGEN_ASSERT (0, num_threads == 1, "We only support 1 thread pool thread for now."); @@ -108,14 +141,28 @@ sgen_thread_pool_init (int num_threads) mono_cond_init (&work_cond, NULL); mono_cond_init (&done_cond, NULL); - mono_native_thread_create (&thread, thread_func, NULL); + thread_init_func = init_func; + idle_job_func = idle_func; + idle_working = idle_func != NULL; + + mono_native_thread_create (&thread, thread_func, thread_datas ? thread_datas [0] : NULL); } -void -sgen_thread_pool_job_init (SgenThreadPoolJob *job, SgenThreadPoolJobFunc func) +SgenThreadPoolJob* +sgen_thread_pool_job_alloc (const char *name, SgenThreadPoolJobFunc func, size_t size) { + SgenThreadPoolJob *job = sgen_alloc_internal_dynamic (size, INTERNAL_MEM_THREAD_POOL_JOB, TRUE); + job->name = name; + job->size = size; job->state = STATE_WAITING; job->func = func; + return job; +} + +void +sgen_thread_pool_job_free (SgenThreadPoolJob *job) +{ + sgen_free_internal_dynamic (job, job->size, INTERNAL_MEM_THREAD_POOL_JOB); } void @@ -136,9 +183,43 @@ sgen_thread_pool_job_enqueue (SgenThreadPoolJob *job) void sgen_thread_pool_job_wait (SgenThreadPoolJob *job) { + SGEN_ASSERT (0, job, "Where's the job?"); + mono_mutex_lock (&lock); - while (job->state != STATE_DONE) + while (find_job_in_queue (job) >= 0) + mono_cond_wait (&done_cond, &lock); + + mono_mutex_unlock (&lock); +} + +void +sgen_thread_pool_idle_signal (void) +{ + SGEN_ASSERT (0, idle_job_func, "Why are we signaling idle without an idle function?"); + + if (idle_working) + return; + + mono_mutex_lock (&lock); + + idle_working = TRUE; + mono_cond_signal (&work_cond); + + mono_mutex_unlock (&lock); +} + +void +sgen_thread_pool_idle_wait (void) +{ + SGEN_ASSERT (0, idle_job_func, "Why are we waiting for idle without an idle function?"); + + if (!idle_working) + return; + + mono_mutex_lock (&lock); + + while (idle_working) mono_cond_wait (&done_cond, &lock); mono_mutex_unlock (&lock); @@ -155,4 +236,10 @@ sgen_thread_pool_wait_for_all_jobs (void) mono_mutex_unlock (&lock); } +gboolean +sgen_thread_pool_is_thread_pool_thread (MonoNativeThreadId some_thread) +{ + return some_thread == thread; +} + #endif diff --git a/mono/metadata/sgen-thread-pool.h b/mono/metadata/sgen-thread-pool.h index 3ea4c5a6d03..e0673f04a7b 100644 --- a/mono/metadata/sgen-thread-pool.h +++ b/mono/metadata/sgen-thread-pool.h @@ -22,19 +22,33 @@ typedef struct _SgenThreadPoolJob SgenThreadPoolJob; -typedef void (*SgenThreadPoolJobFunc) (SgenThreadPoolJob *job); +typedef void (*SgenThreadPoolJobFunc) (void *thread_data, SgenThreadPoolJob *job); struct _SgenThreadPoolJob { + const char *name; SgenThreadPoolJobFunc func; + size_t size; volatile gint32 state; }; -void sgen_thread_pool_init (int num_threads); +typedef void (*SgenThreadPoolThreadInitFunc) (void*); +typedef gboolean (*SgenThreadPoolIdleJobFunc) (void*); + +void sgen_thread_pool_init (int num_threads, SgenThreadPoolThreadInitFunc init_func, SgenThreadPoolIdleJobFunc idle_func, void **thread_datas); + +SgenThreadPoolJob* sgen_thread_pool_job_alloc (const char *name, SgenThreadPoolJobFunc func, size_t size); +/* This only needs to be called on jobs that are not enqueued. */ +void sgen_thread_pool_job_free (SgenThreadPoolJob *job); -void sgen_thread_pool_job_init (SgenThreadPoolJob *job, SgenThreadPoolJobFunc func); void sgen_thread_pool_job_enqueue (SgenThreadPoolJob *job); +/* This must only be called after the job has been enqueued. */ void sgen_thread_pool_job_wait (SgenThreadPoolJob *job); +void sgen_thread_pool_idle_signal (void); +void sgen_thread_pool_idle_wait (void); + void sgen_thread_pool_wait_for_all_jobs (void); +gboolean sgen_thread_pool_is_thread_pool_thread (MonoNativeThreadId thread); + #endif diff --git a/mono/metadata/sgen-workers.c b/mono/metadata/sgen-workers.c index a262207c075..f5436265610 100644 --- a/mono/metadata/sgen-workers.c +++ b/mono/metadata/sgen-workers.c @@ -24,6 +24,7 @@ #include "metadata/sgen-gc.h" #include "metadata/sgen-workers.h" +#include "metadata/sgen-thread-pool.h" #include "utils/mono-counters.h" static int workers_num; @@ -33,50 +34,22 @@ static void *workers_gc_thread_major_collector_data = NULL; static SgenSectionGrayQueue workers_distribute_gray_queue; static gboolean workers_distribute_gray_queue_inited; -static gboolean workers_started = FALSE; - enum { STATE_NOT_WORKING, STATE_WORKING, STATE_NURSERY_COLLECTION } WorkersStateName; -/* - * | state | num_awake | num_posted | post_done | - * |--------------------------+-----------+----------------------------+-----------| - * | STATE_NOT_WORKING | 0 | 0 | 0 | - * | STATE_WORKING | > 0 | <= workers_num - num_awake | * | - * | STATE_NURSERY_COLLECTION | * | <= workers_num - num_awake | 1 | - * | STATE_NURSERY_COLLECTION | 0 | 0 | 0 | - */ typedef union { gint32 value; struct { guint state : 4; /* WorkersStateName */ - /* Number of worker threads awake. */ - guint num_awake : 8; - /* The state of the waiting semaphore. */ - guint num_posted : 8; - /* Whether to post `workers_done_sem` */ - guint post_done : 1; } data; } State; static volatile State workers_state; -static MonoSemType workers_waiting_sem; -static MonoSemType workers_done_sem; - -static volatile int workers_job_queue_num_entries = 0; -static volatile JobQueueEntry *workers_job_queue = NULL; -static LOCK_DECLARE (workers_job_queue_mutex); -static int workers_num_jobs_enqueued = 0; -static volatile int workers_num_jobs_finished = 0; - -static guint64 stat_workers_stolen_from_self_lock; -static guint64 stat_workers_stolen_from_self_no_lock; -static guint64 stat_workers_stolen_from_others; -static guint64 stat_workers_num_waited; +static guint64 stat_workers_num_finished; static gboolean set_state (State old_state, State new_state) @@ -92,9 +65,6 @@ static void assert_not_working (State state) { SGEN_ASSERT (0, state.data.state == STATE_NOT_WORKING, "Can only signal enqueue work when in no work state"); - SGEN_ASSERT (0, state.data.num_awake == 0, "No workers can be awake when not working"); - SGEN_ASSERT (0, state.data.num_posted == 0, "Can't have posted already"); - SGEN_ASSERT (0, !state.data.post_done, "post_done can only be set when working"); } @@ -102,26 +72,12 @@ static void assert_working (State state, gboolean from_worker) { SGEN_ASSERT (0, state.data.state == STATE_WORKING, "A worker can't wait without being in working state"); - if (from_worker) - SGEN_ASSERT (0, state.data.num_awake > 0, "How can we be awake, yet we are not counted?"); - else - SGEN_ASSERT (0, state.data.num_awake + state.data.num_posted > 0, "How can we be working, yet no worker threads are awake or to be awoken?"); - SGEN_ASSERT (0, state.data.num_awake + state.data.num_posted <= workers_num, "There are too many worker threads awake"); } static void assert_nursery_collection (State state, gboolean from_worker) { SGEN_ASSERT (0, state.data.state == STATE_NURSERY_COLLECTION, "Must be in the nursery collection state"); - if (from_worker) { - SGEN_ASSERT (0, state.data.num_awake > 0, "We're awake, but num_awake is zero"); - SGEN_ASSERT (0, state.data.post_done, "post_done must be set in the nursery collection state"); - } - SGEN_ASSERT (0, state.data.num_awake <= workers_num, "There are too many worker threads awake"); - if (!state.data.post_done) { - SGEN_ASSERT (0, state.data.num_awake == 0, "Once done has been posted no threads can be awake"); - SGEN_ASSERT (0, state.data.num_posted == 0, "Once done has been posted no thread must be awoken"); - } } static void @@ -134,85 +90,54 @@ assert_working_or_nursery_collection (State state) } static void -workers_signal_enqueue_work (int num_wake_up, gboolean from_nursery_collection) +workers_signal_enqueue_work (gboolean from_nursery_collection) { State old_state = workers_state; State new_state = old_state; - int i; gboolean did_set_state; - SGEN_ASSERT (0, num_wake_up <= workers_num, "Cannot wake up more workers than are present"); - if (from_nursery_collection) assert_nursery_collection (old_state, FALSE); else assert_not_working (old_state); new_state.data.state = STATE_WORKING; - new_state.data.num_posted = num_wake_up; did_set_state = set_state (old_state, new_state); SGEN_ASSERT (0, did_set_state, "Nobody else should be mutating the state"); - for (i = 0; i < num_wake_up; ++i) - MONO_SEM_POST (&workers_waiting_sem); + sgen_thread_pool_idle_signal (); } static void -workers_signal_enqueue_work_if_necessary (int num_wake_up) +workers_signal_enqueue_work_if_necessary (void) { if (workers_state.data.state == STATE_NOT_WORKING) - workers_signal_enqueue_work (num_wake_up, FALSE); + workers_signal_enqueue_work (FALSE); } void sgen_workers_ensure_awake (void) { SGEN_ASSERT (0, workers_state.data.state != STATE_NURSERY_COLLECTION, "Can't wake workers during nursery collection"); - workers_signal_enqueue_work_if_necessary (workers_num); + workers_signal_enqueue_work_if_necessary (); } static void -workers_wait (void) +worker_finish (void) { State old_state, new_state; - gboolean post_done; - ++stat_workers_num_waited; + ++stat_workers_num_finished; do { new_state = old_state = workers_state; assert_working_or_nursery_collection (old_state); - --new_state.data.num_awake; - post_done = FALSE; - if (!new_state.data.num_awake && !new_state.data.num_posted) { - /* We are the last thread to go to sleep. */ - if (old_state.data.state == STATE_WORKING) - new_state.data.state = STATE_NOT_WORKING; - - new_state.data.post_done = 0; - if (old_state.data.post_done) - post_done = TRUE; - } - } while (!set_state (old_state, new_state)); - - if (post_done) - MONO_SEM_POST (&workers_done_sem); - - MONO_SEM_WAIT (&workers_waiting_sem); - - do { - new_state = old_state = workers_state; - - SGEN_ASSERT (0, old_state.data.num_posted > 0, "How can we be awake without the semaphore having been posted?"); - SGEN_ASSERT (0, old_state.data.num_awake < workers_num, "There are too many worker threads awake"); - - --new_state.data.num_posted; - ++new_state.data.num_awake; - - assert_working_or_nursery_collection (new_state); + /* We are the last thread to go to sleep. */ + if (old_state.data.state == STATE_WORKING) + new_state.data.state = STATE_NOT_WORKING; } while (!set_state (old_state, new_state)); } @@ -223,41 +148,21 @@ collection_needs_workers (void) } void -sgen_workers_enqueue_job (const char *name, JobFunc func, void *data) +sgen_workers_enqueue_job (SgenThreadPoolJob *job) { - int num_entries; - JobQueueEntry *entry; - if (!collection_needs_workers ()) { - func (NULL, data); + job->func (NULL, job); + sgen_thread_pool_job_free (job); return; } - entry = sgen_alloc_internal (INTERNAL_MEM_JOB_QUEUE_ENTRY); - entry->name = name; - entry->func = func; - entry->data = data; - - mono_mutex_lock (&workers_job_queue_mutex); - entry->next = workers_job_queue; - workers_job_queue = entry; - num_entries = ++workers_job_queue_num_entries; - ++workers_num_jobs_enqueued; - mono_mutex_unlock (&workers_job_queue_mutex); - - if (workers_state.data.state != STATE_NURSERY_COLLECTION) - workers_signal_enqueue_work_if_necessary (num_entries < workers_num ? num_entries : workers_num); + sgen_thread_pool_job_enqueue (job); } void sgen_workers_wait_for_jobs_finished (void) { - // FIXME: implement this properly - while (workers_num_jobs_finished < workers_num_jobs_enqueued) { - workers_signal_enqueue_work_if_necessary (workers_num); - /* FIXME: sleep less? */ - g_usleep (1000); - } + sgen_thread_pool_wait_for_all_jobs (); } void @@ -274,66 +179,20 @@ sgen_workers_signal_start_nursery_collection_and_wait (void) assert_not_working (old_state); } else { assert_working (old_state, FALSE); - SGEN_ASSERT (0, !old_state.data.post_done, "We are not waiting for the workers"); - - new_state.data.post_done = 1; } } while (!set_state (old_state, new_state)); - if (new_state.data.post_done) - MONO_SEM_WAIT (&workers_done_sem); + sgen_thread_pool_idle_wait (); old_state = workers_state; assert_nursery_collection (old_state, FALSE); - SGEN_ASSERT (0, !old_state.data.post_done, "We got the semaphore, so it must have been posted"); } void sgen_workers_signal_finish_nursery_collection (void) { - State old_state = workers_state; - - assert_nursery_collection (old_state, FALSE); - SGEN_ASSERT (0, !old_state.data.post_done, "We are finishing the nursery collection, so we should have waited for the semaphore earlier"); - - workers_signal_enqueue_work (workers_num, TRUE); -} - -static gboolean -workers_dequeue_and_do_job (WorkerData *data) -{ - JobQueueEntry *entry; - - /* - * At this point the GC might not be running anymore. We - * could have been woken up by a job that was then taken by - * another thread, after which the collection finished, so we - * first have to successfully dequeue a job before doing - * anything assuming that the collection is still ongoing. - */ - - if (!workers_job_queue_num_entries) - return FALSE; - - mono_mutex_lock (&workers_job_queue_mutex); - entry = (JobQueueEntry*)workers_job_queue; - if (entry) { - workers_job_queue = entry->next; - --workers_job_queue_num_entries; - } - mono_mutex_unlock (&workers_job_queue_mutex); - - if (!entry) - return FALSE; - - g_assert (collection_needs_workers ()); - - entry->func (data, entry->data); - sgen_free_internal (entry, INTERNAL_MEM_JOB_QUEUE_ENTRY); - - SGEN_ATOMIC_ADD (workers_num_jobs_finished, 1); - - return TRUE; + assert_nursery_collection (workers_state, FALSE); + workers_signal_enqueue_work (TRUE); } static gboolean @@ -373,66 +232,62 @@ init_private_gray_queue (WorkerData *data) sgen_get_major_collector ()->is_concurrent ? concurrent_enqueue_check : NULL); } -static mono_native_thread_return_t -workers_thread_func (void *data_untyped) +static void +thread_pool_init_func (void *data_untyped) { WorkerData *data = data_untyped; SgenMajorCollector *major = sgen_get_major_collector (); mono_thread_info_register_small_id (); + if (!major->is_concurrent) + return; + if (major->init_worker_thread) major->init_worker_thread (data->major_collector_data); init_private_gray_queue (data); +} - for (;;) { - gboolean did_work = FALSE; - - SGEN_ASSERT (0, sgen_get_current_collection_generation () != GENERATION_NURSERY, "Why are we doing work while there's a nursery collection happening?"); - - while (workers_state.data.state == STATE_WORKING && workers_dequeue_and_do_job (data)) { - did_work = TRUE; - /* FIXME: maybe distribute the gray queue here? */ - } +static gboolean +marker_idle_func (void *data_untyped) +{ + WorkerData *data = data_untyped; + SgenMajorCollector *major = sgen_get_major_collector (); - if (!did_work && (!sgen_gray_object_queue_is_empty (&data->private_gray_queue) || workers_get_work (data))) { - SgenObjectOperations *ops = sgen_concurrent_collection_in_progress () - ? &major->major_concurrent_ops - : &major->major_ops; - ScanCopyContext ctx = { ops->scan_object, NULL, &data->private_gray_queue }; + if (workers_state.data.state != STATE_WORKING) + return FALSE; - g_assert (!sgen_gray_object_queue_is_empty (&data->private_gray_queue)); + SGEN_ASSERT (0, sgen_get_current_collection_generation () != GENERATION_NURSERY, "Why are we doing work while there's a nursery collection happening?"); - while (!sgen_drain_gray_stack (32, ctx)) { - if (workers_state.data.state == STATE_NURSERY_COLLECTION) - workers_wait (); - } - g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue)); + if (!sgen_gray_object_queue_is_empty (&data->private_gray_queue) || workers_get_work (data)) { + SgenObjectOperations *ops = sgen_concurrent_collection_in_progress () + ? &major->major_concurrent_ops + : &major->major_ops; + ScanCopyContext ctx = { ops->scan_object, NULL, &data->private_gray_queue }; - init_private_gray_queue (data); + SGEN_ASSERT (0, !sgen_gray_object_queue_is_empty (&data->private_gray_queue), "How is our gray queue empty if we just got work?"); - did_work = TRUE; - } + sgen_drain_gray_stack (32, ctx); - if (!did_work) - workers_wait (); + return TRUE; } - /* dummy return to make compilers happy */ - return NULL; + worker_finish (); + + return FALSE; } static void -init_distribute_gray_queue (gboolean locked) +init_distribute_gray_queue (void) { if (workers_distribute_gray_queue_inited) { g_assert (sgen_section_gray_queue_is_empty (&workers_distribute_gray_queue)); - g_assert (!workers_distribute_gray_queue.locked == !locked); + g_assert (workers_distribute_gray_queue.locked); return; } - sgen_section_gray_queue_init (&workers_distribute_gray_queue, locked, + sgen_section_gray_queue_init (&workers_distribute_gray_queue, TRUE, sgen_get_major_collector ()->is_concurrent ? concurrent_enqueue_check : NULL); workers_distribute_gray_queue_inited = TRUE; } @@ -440,19 +295,21 @@ init_distribute_gray_queue (gboolean locked) void sgen_workers_init_distribute_gray_queue (void) { - if (!collection_needs_workers ()) - return; - - init_distribute_gray_queue (sgen_get_major_collector ()->is_concurrent); + SGEN_ASSERT (0, sgen_get_major_collector ()->is_concurrent && collection_needs_workers (), + "Why should we init the distribute gray queue if we don't need it?"); + init_distribute_gray_queue (); } void sgen_workers_init (int num_workers) { int i; + void *workers_data_ptrs [num_workers]; - if (!sgen_get_major_collector ()->is_concurrent) + if (!sgen_get_major_collector ()->is_concurrent) { + sgen_thread_pool_init (num_workers, thread_pool_init_func, NULL, NULL); return; + } //g_print ("initing %d workers\n", num_workers); @@ -461,10 +318,7 @@ sgen_workers_init (int num_workers) workers_data = sgen_alloc_internal_dynamic (sizeof (WorkerData) * num_workers, INTERNAL_MEM_WORKER_DATA, TRUE); memset (workers_data, 0, sizeof (WorkerData) * num_workers); - MONO_SEM_INIT (&workers_waiting_sem, 0); - MONO_SEM_INIT (&workers_done_sem, 0); - - init_distribute_gray_queue (sgen_get_major_collector ()->is_concurrent); + init_distribute_gray_queue (); if (sgen_get_major_collector ()->alloc_worker_data) workers_gc_thread_major_collector_data = sgen_get_major_collector ()->alloc_worker_data (); @@ -474,98 +328,41 @@ sgen_workers_init (int num_workers) if (sgen_get_major_collector ()->alloc_worker_data) workers_data [i].major_collector_data = sgen_get_major_collector ()->alloc_worker_data (); - } - - LOCK_INIT (workers_job_queue_mutex); - sgen_register_fixed_internal_mem_type (INTERNAL_MEM_JOB_QUEUE_ENTRY, sizeof (JobQueueEntry)); - - mono_counters_register ("Stolen from self lock", MONO_COUNTER_GC | MONO_COUNTER_ULONG, &stat_workers_stolen_from_self_lock); - mono_counters_register ("Stolen from self no lock", MONO_COUNTER_GC | MONO_COUNTER_ULONG, &stat_workers_stolen_from_self_no_lock); - mono_counters_register ("Stolen from others", MONO_COUNTER_GC | MONO_COUNTER_ULONG, &stat_workers_stolen_from_others); - mono_counters_register ("# workers waited", MONO_COUNTER_GC | MONO_COUNTER_ULONG, &stat_workers_num_waited); -} + workers_data_ptrs [i] = &workers_data [i]; + } -/* only the GC thread is allowed to start and join workers */ + sgen_thread_pool_init (num_workers, thread_pool_init_func, marker_idle_func, workers_data_ptrs); -static void -workers_start_worker (int index) -{ - g_assert (index >= 0 && index < workers_num); - - g_assert (!workers_data [index].thread); - mono_native_thread_create (&workers_data [index].thread, workers_thread_func, &workers_data [index]); + mono_counters_register ("# workers finished", MONO_COUNTER_GC | MONO_COUNTER_ULONG, &stat_workers_num_finished); } void sgen_workers_start_all_workers (void) { - State old_state, new_state; - int i; - gboolean result; - if (!collection_needs_workers ()) return; if (sgen_get_major_collector ()->init_worker_thread) sgen_get_major_collector ()->init_worker_thread (workers_gc_thread_major_collector_data); - old_state = new_state = workers_state; - assert_not_working (old_state); - - g_assert (workers_job_queue_num_entries == 0); - workers_num_jobs_enqueued = 0; - workers_num_jobs_finished = 0; - - if (workers_started) { - workers_signal_enqueue_work (workers_num, FALSE); - return; - } - - new_state.data.state = STATE_WORKING; - new_state.data.num_awake = workers_num; - result = set_state (old_state, new_state); - SGEN_ASSERT (0, result, "Nobody else should have modified the state - workers have not been started yet"); - - for (i = 0; i < workers_num; ++i) - workers_start_worker (i); - - workers_started = TRUE; -} - -gboolean -sgen_workers_have_started (void) -{ - return workers_started; + workers_signal_enqueue_work (FALSE); } void sgen_workers_join (void) { - State old_state; int i; if (!collection_needs_workers ()) return; - for (;;) { - old_state = workers_state; - SGEN_ASSERT (0, old_state.data.state != STATE_NURSERY_COLLECTION, "Can't be in nursery collection when joining"); - - if (old_state.data.state == STATE_WORKING) { - State new_state = old_state; + sgen_thread_pool_wait_for_all_jobs (); - SGEN_ASSERT (0, !old_state.data.post_done, "Why is post_done already set?"); - new_state.data.post_done = 1; - if (!set_state (old_state, new_state)) - continue; - - MONO_SEM_WAIT (&workers_done_sem); - - old_state = workers_state; - } - - assert_not_working (old_state); + for (;;) { + SGEN_ASSERT (0, workers_state.data.state != STATE_NURSERY_COLLECTION, "Can't be in nursery collection when joining"); + sgen_thread_pool_idle_wait (); + assert_not_working (workers_state); /* * Checking whether there is still work left and, if not, going to sleep, @@ -573,10 +370,10 @@ sgen_workers_join (void) * workers. Therefore there's a race condition where work can be added * after they've checked for work, and before they've gone to sleep. */ - if (!workers_job_queue_num_entries && sgen_section_gray_queue_is_empty (&workers_distribute_gray_queue)) + if (sgen_section_gray_queue_is_empty (&workers_distribute_gray_queue)) break; - workers_signal_enqueue_work (workers_num, FALSE); + workers_signal_enqueue_work (FALSE); } /* At this point all the workers have stopped. */ @@ -586,7 +383,6 @@ sgen_workers_join (void) sgen_get_major_collector ()->reset_worker_data (workers_data [i].major_collector_data); } - g_assert (workers_job_queue_num_entries == 0); g_assert (sgen_section_gray_queue_is_empty (&workers_distribute_gray_queue)); for (i = 0; i < workers_num; ++i) g_assert (sgen_gray_object_queue_is_empty (&workers_data [i].private_gray_queue)); @@ -601,23 +397,14 @@ sgen_workers_all_done (void) gboolean sgen_workers_are_working (void) { - State state = workers_state; - return state.data.num_awake > 0 || state.data.num_posted > 0; + return workers_state.data.state == STATE_WORKING; } -gboolean -sgen_is_worker_thread (MonoNativeThreadId thread) +void +sgen_workers_wait (void) { - int i; - - if (sgen_get_major_collector ()->is_worker_thread && sgen_get_major_collector ()->is_worker_thread (thread)) - return TRUE; - - for (i = 0; i < workers_num; ++i) { - if (workers_data [i].thread == thread) - return TRUE; - } - return FALSE; + sgen_thread_pool_idle_wait (); + SGEN_ASSERT (0, sgen_workers_all_done (), "Why are the workers not done after we wait for them?"); } SgenSectionGrayQueue* diff --git a/mono/metadata/sgen-workers.h b/mono/metadata/sgen-workers.h index 5c509de6128..1d3b4fac44c 100644 --- a/mono/metadata/sgen-workers.h +++ b/mono/metadata/sgen-workers.h @@ -21,38 +21,28 @@ #ifndef __MONO_SGEN_WORKER_H__ #define __MONO_SGEN_WORKER_H__ +#include "mono/metadata/sgen-thread-pool.h" + typedef struct _WorkerData WorkerData; struct _WorkerData { int index; - MonoNativeThreadId thread; void *major_collector_data; SgenGrayQueue private_gray_queue; /* only read/written by worker thread */ }; -typedef void (*JobFunc) (WorkerData *worker_data, void *job_data); - -typedef struct _JobQueueEntry JobQueueEntry; -struct _JobQueueEntry { - const char *name; - JobFunc func; - void *data; - - volatile JobQueueEntry *next; -}; - void sgen_workers_init (int num_workers); void sgen_workers_start_all_workers (void); -gboolean sgen_workers_have_started (void); void sgen_workers_ensure_awake (void); void sgen_workers_init_distribute_gray_queue (void); -void sgen_workers_enqueue_job (const char *name, JobFunc func, void *data); +void sgen_workers_enqueue_job (SgenThreadPoolJob *job); void sgen_workers_wait_for_jobs_finished (void); void sgen_workers_distribute_gray_queue_sections (void); void sgen_workers_reset_data (void); void sgen_workers_join (void); gboolean sgen_workers_all_done (void); gboolean sgen_workers_are_working (void); +void sgen_workers_wait (void); SgenSectionGrayQueue* sgen_workers_get_distribute_section_gray_queue (void); void sgen_workers_signal_start_nursery_collection_and_wait (void); -- 2.25.1