[sgen] Workers use thread pool.
author Mark Probst <mark.probst@gmail.com>
Wed, 18 Feb 2015 01:11:35 +0000 (17:11 -0800)
committer Mark Probst <mark.probst@gmail.com>
Thu, 2 Apr 2015 23:41:31 +0000 (16:41 -0700)
This way we only have one worker thread, which is used both for marking
and, later, for sweeping.

12 files changed:
mono/metadata/sgen-gc.c
mono/metadata/sgen-gc.h
mono/metadata/sgen-internal.c
mono/metadata/sgen-marksweep.c
mono/metadata/sgen-memory-governor.c
mono/metadata/sgen-os-mach.c
mono/metadata/sgen-protocol.c
mono/metadata/sgen-stw.c
mono/metadata/sgen-thread-pool.c
mono/metadata/sgen-thread-pool.h
mono/metadata/sgen-workers.c
mono/metadata/sgen-workers.h

index 61499a3f34350c6e5f2947c0ee629f629e40fc6a..983962db85b0d20a4160466426dce699068897c2 100644 (file)
@@ -328,19 +328,15 @@ static guint64 time_minor_pre_collection_fragment_clear = 0;
 static guint64 time_minor_pinning = 0;
 static guint64 time_minor_scan_remsets = 0;
 static guint64 time_minor_scan_pinned = 0;
-static guint64 time_minor_scan_registered_roots = 0;
-static guint64 time_minor_scan_thread_data = 0;
+static guint64 time_minor_scan_roots = 0;
 static guint64 time_minor_finish_gray_stack = 0;
 static guint64 time_minor_fragment_creation = 0;
 
 static guint64 time_major_pre_collection_fragment_clear = 0;
 static guint64 time_major_pinning = 0;
 static guint64 time_major_scan_pinned = 0;
-static guint64 time_major_scan_registered_roots = 0;
-static guint64 time_major_scan_thread_data = 0;
-static guint64 time_major_scan_alloc_pinned = 0;
-static guint64 time_major_scan_finalized = 0;
-static guint64 time_major_scan_big_objects = 0;
+static guint64 time_major_scan_roots = 0;
+static guint64 time_major_scan_mod_union = 0;
 static guint64 time_major_finish_gray_stack = 0;
 static guint64 time_major_free_bigobjs = 0;
 static guint64 time_major_los_sweep = 0;
@@ -593,12 +589,7 @@ gray_queue_redirect (SgenGrayQueue *queue)
 
        if (wake) {
                g_assert (concurrent_collection_in_progress);
-               if (sgen_workers_have_started ()) {
-                       sgen_workers_ensure_awake ();
-               } else {
-                       if (concurrent_collection_in_progress)
-                               g_assert (current_collection_generation == -1);
-               }
+               sgen_workers_ensure_awake ();
        }
 }
 
@@ -1904,19 +1895,14 @@ init_stats (void)
        mono_counters_register ("Minor pinning", MONO_COUNTER_GC | MONO_COUNTER_ULONG | MONO_COUNTER_TIME, &time_minor_pinning);
        mono_counters_register ("Minor scan remembered set", MONO_COUNTER_GC | MONO_COUNTER_ULONG | MONO_COUNTER_TIME, &time_minor_scan_remsets);
        mono_counters_register ("Minor scan pinned", MONO_COUNTER_GC | MONO_COUNTER_ULONG | MONO_COUNTER_TIME, &time_minor_scan_pinned);
-       mono_counters_register ("Minor scan registered roots", MONO_COUNTER_GC | MONO_COUNTER_ULONG | MONO_COUNTER_TIME, &time_minor_scan_registered_roots);
-       mono_counters_register ("Minor scan thread data", MONO_COUNTER_GC | MONO_COUNTER_ULONG | MONO_COUNTER_TIME, &time_minor_scan_thread_data);
-       mono_counters_register ("Minor finish gray stack", MONO_COUNTER_GC | MONO_COUNTER_ULONG | MONO_COUNTER_TIME, &time_minor_finish_gray_stack);
+       mono_counters_register ("Minor scan roots", MONO_COUNTER_GC | MONO_COUNTER_ULONG | MONO_COUNTER_TIME, &time_minor_scan_roots);
        mono_counters_register ("Minor fragment creation", MONO_COUNTER_GC | MONO_COUNTER_ULONG | MONO_COUNTER_TIME, &time_minor_fragment_creation);
 
        mono_counters_register ("Major fragment clear", MONO_COUNTER_GC | MONO_COUNTER_ULONG | MONO_COUNTER_TIME, &time_major_pre_collection_fragment_clear);
        mono_counters_register ("Major pinning", MONO_COUNTER_GC | MONO_COUNTER_ULONG | MONO_COUNTER_TIME, &time_major_pinning);
        mono_counters_register ("Major scan pinned", MONO_COUNTER_GC | MONO_COUNTER_ULONG | MONO_COUNTER_TIME, &time_major_scan_pinned);
-       mono_counters_register ("Major scan registered roots", MONO_COUNTER_GC | MONO_COUNTER_ULONG | MONO_COUNTER_TIME, &time_major_scan_registered_roots);
-       mono_counters_register ("Major scan thread data", MONO_COUNTER_GC | MONO_COUNTER_ULONG | MONO_COUNTER_TIME, &time_major_scan_thread_data);
-       mono_counters_register ("Major scan alloc_pinned", MONO_COUNTER_GC | MONO_COUNTER_ULONG | MONO_COUNTER_TIME, &time_major_scan_alloc_pinned);
-       mono_counters_register ("Major scan finalized", MONO_COUNTER_GC | MONO_COUNTER_ULONG | MONO_COUNTER_TIME, &time_major_scan_finalized);
-       mono_counters_register ("Major scan big objects", MONO_COUNTER_GC | MONO_COUNTER_ULONG | MONO_COUNTER_TIME, &time_major_scan_big_objects);
+       mono_counters_register ("Major scan roots", MONO_COUNTER_GC | MONO_COUNTER_ULONG | MONO_COUNTER_TIME, &time_major_scan_roots);
+       mono_counters_register ("Major scan mod union", MONO_COUNTER_GC | MONO_COUNTER_ULONG | MONO_COUNTER_TIME, &time_major_scan_mod_union);
        mono_counters_register ("Major finish gray stack", MONO_COUNTER_GC | MONO_COUNTER_ULONG | MONO_COUNTER_TIME, &time_major_finish_gray_stack);
        mono_counters_register ("Major free big objects", MONO_COUNTER_GC | MONO_COUNTER_ULONG | MONO_COUNTER_TIME, &time_major_free_bigobjs);
        mono_counters_register ("Major LOS sweep", MONO_COUNTER_GC | MONO_COUNTER_ULONG | MONO_COUNTER_TIME, &time_major_los_sweep);
@@ -1994,66 +1980,75 @@ sgen_concurrent_collection_in_progress (void)
 }
 
 static void
-job_remembered_set_scan (WorkerData *worker_data, void *dummy)
+job_remembered_set_scan (void *worker_data_untyped, SgenThreadPoolJob *job)
 {
+       WorkerData *worker_data = worker_data_untyped;
        remset.scan_remsets (sgen_workers_get_job_gray_queue (worker_data));
 }
 
-typedef struct
-{
+typedef struct {
+       SgenThreadPoolJob job;
        CopyOrMarkObjectFunc copy_or_mark_func;
        ScanObjectFunc scan_func;
        char *heap_start;
        char *heap_end;
        int root_type;
-} ScanFromRegisteredRootsJobData;
+} ScanFromRegisteredRootsJob;
 
 static void
-job_scan_from_registered_roots (WorkerData *worker_data, void *job_data_untyped)
+job_scan_from_registered_roots (void *worker_data_untyped, SgenThreadPoolJob *job)
 {
-       ScanFromRegisteredRootsJobData *job_data = job_data_untyped;
+       WorkerData *worker_data = worker_data_untyped;
+       ScanFromRegisteredRootsJob *job_data = (ScanFromRegisteredRootsJob*)job;
        ScanCopyContext ctx = { job_data->scan_func, job_data->copy_or_mark_func,
                sgen_workers_get_job_gray_queue (worker_data) };
 
        scan_from_registered_roots (job_data->heap_start, job_data->heap_end, job_data->root_type, ctx);
-       sgen_free_internal_dynamic (job_data, sizeof (ScanFromRegisteredRootsJobData), INTERNAL_MEM_WORKER_JOB_DATA);
 }
 
-typedef struct
-{
+typedef struct {
+       SgenThreadPoolJob job;
        char *heap_start;
        char *heap_end;
-} ScanThreadDataJobData;
+} ScanThreadDataJob;
 
 static void
-job_scan_thread_data (WorkerData *worker_data, void *job_data_untyped)
+job_scan_thread_data (void *worker_data_untyped, SgenThreadPoolJob *job)
 {
-       ScanThreadDataJobData *job_data = job_data_untyped;
+       WorkerData *worker_data = worker_data_untyped;
+       ScanThreadDataJob *job_data = (ScanThreadDataJob*)job;
 
        scan_thread_data (job_data->heap_start, job_data->heap_end, TRUE,
                        sgen_workers_get_job_gray_queue (worker_data));
-       sgen_free_internal_dynamic (job_data, sizeof (ScanThreadDataJobData), INTERNAL_MEM_WORKER_JOB_DATA);
 }
 
+typedef struct {
+       SgenThreadPoolJob job;
+       FinalizeReadyEntry *list;
+} ScanFinalizerEntriesJob;
+
 static void
-job_scan_finalizer_entries (WorkerData *worker_data, void *job_data_untyped)
+job_scan_finalizer_entries (void *worker_data_untyped, SgenThreadPoolJob *job)
 {
-       FinalizeReadyEntry *list = job_data_untyped;
+       WorkerData *worker_data = worker_data_untyped;
+       ScanFinalizerEntriesJob *job_data = (ScanFinalizerEntriesJob*)job;
        ScanCopyContext ctx = { NULL, current_object_ops.copy_or_mark_object, sgen_workers_get_job_gray_queue (worker_data) };
 
-       scan_finalizer_entries (list, ctx);
+       scan_finalizer_entries (job_data->list, ctx);
 }
 
 static void
-job_scan_major_mod_union_cardtable (WorkerData *worker_data, void *job_data_untyped)
+job_scan_major_mod_union_cardtable (void *worker_data_untyped, SgenThreadPoolJob *job)
 {
+       WorkerData *worker_data = worker_data_untyped;
        g_assert (concurrent_collection_in_progress);
        major_collector.scan_card_table (TRUE, sgen_workers_get_job_gray_queue (worker_data));
 }
 
 static void
-job_scan_los_mod_union_cardtable (WorkerData *worker_data, void *job_data_untyped)
+job_scan_los_mod_union_cardtable (void *worker_data_untyped, SgenThreadPoolJob *job)
 {
+       WorkerData *worker_data = worker_data_untyped;
        g_assert (concurrent_collection_in_progress);
        sgen_los_scan_card_table (TRUE, sgen_workers_get_job_gray_queue (worker_data));
 }
@@ -2157,6 +2152,49 @@ init_gray_queue (void)
        sgen_gray_object_queue_init (&gray_queue, NULL);
 }
 
+static void
+enqueue_scan_from_roots_jobs (char *heap_start, char *heap_end)
+{
+       ScanFromRegisteredRootsJob *scrrj;
+       ScanThreadDataJob *stdj;
+       ScanFinalizerEntriesJob *sfej;
+
+       /* registered roots, this includes static fields */
+
+       scrrj = (ScanFromRegisteredRootsJob*)sgen_thread_pool_job_alloc ("scan from registered roots normal", job_scan_from_registered_roots, sizeof (ScanFromRegisteredRootsJob));
+       scrrj->copy_or_mark_func = current_object_ops.copy_or_mark_object;
+       scrrj->scan_func = current_object_ops.scan_object;
+       scrrj->heap_start = heap_start;
+       scrrj->heap_end = heap_end;
+       scrrj->root_type = ROOT_TYPE_NORMAL;
+       sgen_workers_enqueue_job (&scrrj->job);
+
+       scrrj = (ScanFromRegisteredRootsJob*)sgen_thread_pool_job_alloc ("scan from registered roots wbarrier", job_scan_from_registered_roots, sizeof (ScanFromRegisteredRootsJob));
+       scrrj->copy_or_mark_func = current_object_ops.copy_or_mark_object;
+       scrrj->scan_func = current_object_ops.scan_object;
+       scrrj->heap_start = heap_start;
+       scrrj->heap_end = heap_end;
+       scrrj->root_type = ROOT_TYPE_WBARRIER;
+       sgen_workers_enqueue_job (&scrrj->job);
+
+       /* Threads */
+
+       stdj = (ScanThreadDataJob*)sgen_thread_pool_job_alloc ("scan thread data", job_scan_thread_data, sizeof (ScanThreadDataJob));
+       stdj->heap_start = heap_start;
+       stdj->heap_end = heap_end;
+       sgen_workers_enqueue_job (&stdj->job);
+
+       /* Scan the list of objects ready for finalization. */
+
+       sfej = (ScanFinalizerEntriesJob*)sgen_thread_pool_job_alloc ("scan finalizer entries", job_scan_finalizer_entries, sizeof (ScanFinalizerEntriesJob));
+       sfej->list = fin_ready_list;
+       sgen_workers_enqueue_job (&sfej->job);
+
+       sfej = (ScanFinalizerEntriesJob*)sgen_thread_pool_job_alloc ("scan critical finalizer entries", job_scan_finalizer_entries, sizeof (ScanFinalizerEntriesJob));
+       sfej->list = critical_fin_list;
+       sgen_workers_enqueue_job (&sfej->job);
+}
+
 /*
  * Perform a nursery collection.
  *
@@ -2168,8 +2206,6 @@ collect_nursery (SgenGrayQueue *unpin_queue, gboolean finish_up_concurrent_mark)
        gboolean needs_major;
        size_t max_garbage_amount;
        char *nursery_next;
-       ScanFromRegisteredRootsJobData *scrrjd_normal, *scrrjd_wbarrier;
-       ScanThreadDataJobData *stdjd;
        mword fragment_total;
        ScanCopyContext ctx;
        TV_DECLARE (atv);
@@ -2193,6 +2229,8 @@ collect_nursery (SgenGrayQueue *unpin_queue, gboolean finish_up_concurrent_mark)
        current_collection_generation = GENERATION_NURSERY;
        current_object_ops = sgen_minor_collector.serial_ops;
 
+       SGEN_ASSERT (0, !sgen_collection_is_concurrent (), "Why is the nursery collection concurrent?");
+
        reset_pinned_from_failed_allocation ();
 
        check_scan_starts ();
@@ -2269,7 +2307,7 @@ collect_nursery (SgenGrayQueue *unpin_queue, gboolean finish_up_concurrent_mark)
         * as part of which we scan the card table.  Then, later, we scan the mod union
         * cardtable.  We should only have to do one.
         */
-       sgen_workers_enqueue_job ("scan remset", job_remembered_set_scan, NULL);
+       sgen_workers_enqueue_job (sgen_thread_pool_job_alloc ("scan remset", job_remembered_set_scan, sizeof (SgenThreadPoolJob)));
 
        /* we don't have complete write barrier yet, so we scan all the old generation sections */
        TV_GETTIME (btv);
@@ -2293,46 +2331,13 @@ collect_nursery (SgenGrayQueue *unpin_queue, gboolean finish_up_concurrent_mark)
 
        MONO_GC_CHECKPOINT_5 (GENERATION_NURSERY);
 
-       /* registered roots, this includes static fields */
-       scrrjd_normal = sgen_alloc_internal_dynamic (sizeof (ScanFromRegisteredRootsJobData), INTERNAL_MEM_WORKER_JOB_DATA, TRUE);
-       scrrjd_normal->copy_or_mark_func = current_object_ops.copy_or_mark_object;
-       scrrjd_normal->scan_func = current_object_ops.scan_object;
-       scrrjd_normal->heap_start = sgen_get_nursery_start ();
-       scrrjd_normal->heap_end = nursery_next;
-       scrrjd_normal->root_type = ROOT_TYPE_NORMAL;
-       sgen_workers_enqueue_job ("scan from registered roots normal", job_scan_from_registered_roots, scrrjd_normal);
-
-       scrrjd_wbarrier = sgen_alloc_internal_dynamic (sizeof (ScanFromRegisteredRootsJobData), INTERNAL_MEM_WORKER_JOB_DATA, TRUE);
-       scrrjd_wbarrier->copy_or_mark_func = current_object_ops.copy_or_mark_object;
-       scrrjd_wbarrier->scan_func = current_object_ops.scan_object;
-       scrrjd_wbarrier->heap_start = sgen_get_nursery_start ();
-       scrrjd_wbarrier->heap_end = nursery_next;
-       scrrjd_wbarrier->root_type = ROOT_TYPE_WBARRIER;
-       sgen_workers_enqueue_job ("scan from registered roots wbarrier", job_scan_from_registered_roots, scrrjd_wbarrier);
+       enqueue_scan_from_roots_jobs (sgen_get_nursery_start (), nursery_next);
 
        TV_GETTIME (btv);
-       time_minor_scan_registered_roots += TV_ELAPSED (atv, btv);
+       time_minor_scan_roots += TV_ELAPSED (atv, btv);
 
        MONO_GC_CHECKPOINT_6 (GENERATION_NURSERY);
-
-       /* thread data */
-       stdjd = sgen_alloc_internal_dynamic (sizeof (ScanThreadDataJobData), INTERNAL_MEM_WORKER_JOB_DATA, TRUE);
-       stdjd->heap_start = sgen_get_nursery_start ();
-       stdjd->heap_end = nursery_next;
-       sgen_workers_enqueue_job ("scan thread data", job_scan_thread_data, stdjd);
-
-       TV_GETTIME (atv);
-       time_minor_scan_thread_data += TV_ELAPSED (btv, atv);
-       btv = atv;
-
        MONO_GC_CHECKPOINT_7 (GENERATION_NURSERY);
-
-       g_assert (!sgen_collection_is_concurrent ());
-
-       /* Scan the list of objects ready for finalization. If */
-       sgen_workers_enqueue_job ("scan finalizer entries", job_scan_finalizer_entries, fin_ready_list);
-       sgen_workers_enqueue_job ("scan criticial finalizer entries", job_scan_finalizer_entries, critical_fin_list);
-
        MONO_GC_CHECKPOINT_8 (GENERATION_NURSERY);
 
        finish_gray_stack (GENERATION_NURSERY, &gray_queue);
@@ -2447,8 +2452,6 @@ major_copy_or_mark_from_roots (size_t *old_next_pin_slot, gboolean start_concurr
        char *heap_end = (char*)-1;
        gboolean profile_roots = mono_profiler_get_events () & MONO_PROFILE_GC_ROOTS;
        GCRootReport root_report = { 0 };
-       ScanFromRegisteredRootsJobData *scrrjd_normal, *scrrjd_wbarrier;
-       ScanThreadDataJobData *stdjd;
        ScanCopyContext ctx;
 
        if (concurrent_collection_in_progress) {
@@ -2632,59 +2635,24 @@ major_copy_or_mark_from_roots (size_t *old_next_pin_slot, gboolean start_concurr
        TV_GETTIME (atv);
        time_major_scan_pinned += TV_ELAPSED (btv, atv);
 
-       /* registered roots, this includes static fields */
-       scrrjd_normal = sgen_alloc_internal_dynamic (sizeof (ScanFromRegisteredRootsJobData), INTERNAL_MEM_WORKER_JOB_DATA, TRUE);
-       scrrjd_normal->copy_or_mark_func = current_object_ops.copy_or_mark_object;
-       scrrjd_normal->scan_func = current_object_ops.scan_object;
-       scrrjd_normal->heap_start = heap_start;
-       scrrjd_normal->heap_end = heap_end;
-       scrrjd_normal->root_type = ROOT_TYPE_NORMAL;
-       sgen_workers_enqueue_job ("scan from registered roots normal", job_scan_from_registered_roots, scrrjd_normal);
-
-       scrrjd_wbarrier = sgen_alloc_internal_dynamic (sizeof (ScanFromRegisteredRootsJobData), INTERNAL_MEM_WORKER_JOB_DATA, TRUE);
-       scrrjd_wbarrier->copy_or_mark_func = current_object_ops.copy_or_mark_object;
-       scrrjd_wbarrier->scan_func = current_object_ops.scan_object;
-       scrrjd_wbarrier->heap_start = heap_start;
-       scrrjd_wbarrier->heap_end = heap_end;
-       scrrjd_wbarrier->root_type = ROOT_TYPE_WBARRIER;
-       sgen_workers_enqueue_job ("scan from registered roots wbarrier", job_scan_from_registered_roots, scrrjd_wbarrier);
-
-       TV_GETTIME (btv);
-       time_major_scan_registered_roots += TV_ELAPSED (atv, btv);
-
-       /* Threads */
-       stdjd = sgen_alloc_internal_dynamic (sizeof (ScanThreadDataJobData), INTERNAL_MEM_WORKER_JOB_DATA, TRUE);
-       stdjd->heap_start = heap_start;
-       stdjd->heap_end = heap_end;
-       sgen_workers_enqueue_job ("scan thread data", job_scan_thread_data, stdjd);
-
-       TV_GETTIME (atv);
-       time_major_scan_thread_data += TV_ELAPSED (btv, atv);
-
-       TV_GETTIME (btv);
-       time_major_scan_alloc_pinned += TV_ELAPSED (atv, btv);
-
        if (mono_profiler_get_events () & MONO_PROFILE_GC_ROOTS)
                report_finalizer_roots ();
 
-       /* scan the list of objects ready for finalization */
-       sgen_workers_enqueue_job ("scan finalizer entries", job_scan_finalizer_entries, fin_ready_list);
-       sgen_workers_enqueue_job ("scan critical finalizer entries", job_scan_finalizer_entries, critical_fin_list);
+       enqueue_scan_from_roots_jobs (heap_start, heap_end);
+
+       TV_GETTIME (btv);
+       time_major_scan_roots += TV_ELAPSED (atv, btv);
 
        if (scan_mod_union) {
                g_assert (finish_up_concurrent_mark);
 
                /* Mod union card table */
-               sgen_workers_enqueue_job ("scan mod union cardtable", job_scan_major_mod_union_cardtable, NULL);
-               sgen_workers_enqueue_job ("scan LOS mod union cardtable", job_scan_los_mod_union_cardtable, NULL);
-       }
-
-       TV_GETTIME (atv);
-       time_major_scan_finalized += TV_ELAPSED (btv, atv);
-       SGEN_LOG (2, "Root scan: %d usecs", TV_ELAPSED (btv, atv));
+               sgen_workers_enqueue_job (sgen_thread_pool_job_alloc ("scan mod union cardtable", job_scan_major_mod_union_cardtable, sizeof (SgenThreadPoolJob)));
+               sgen_workers_enqueue_job (sgen_thread_pool_job_alloc ("scan LOS mod union cardtable", job_scan_los_mod_union_cardtable, sizeof (SgenThreadPoolJob)));
 
-       TV_GETTIME (btv);
-       time_major_scan_big_objects += TV_ELAPSED (atv, btv);
+               TV_GETTIME (atv);
+               time_major_scan_mod_union += TV_ELAPSED (btv, atv);
+       }
 }
 
 static void
@@ -2750,13 +2718,6 @@ major_start_collection (gboolean concurrent, size_t *old_next_pin_slot)
        major_finish_copy_or_mark ();
 }
 
-static void
-wait_for_workers_to_finish (void)
-{
-       while (!sgen_workers_all_done ())
-               g_usleep (200);
-}
-
 static void
 major_finish_collection (const char *reason, size_t old_next_pin_slot, gboolean forced, gboolean scan_whole_nursery)
 {
@@ -3054,7 +3015,7 @@ major_finish_concurrent_collection (gboolean forced)
         * marking before the nursery collection is allowed to run, otherwise we might miss
         * some remsets.
         */
-       wait_for_workers_to_finish ();
+       sgen_workers_wait ();
 
        SGEN_TV_GETTIME (time_major_conc_collection_end);
        gc_stats.major_gc_time_concurrent += SGEN_TV_ELAPSED (time_major_conc_collection_start, time_major_conc_collection_end);
@@ -4856,9 +4817,6 @@ mono_gc_base_init (void)
                g_strfreev (opts);
        }
 
-       if (major_collector.is_concurrent)
-               sgen_workers_init (1);
-
        if (major_collector_opt)
                g_free (major_collector_opt);
 
@@ -5030,6 +4988,9 @@ mono_gc_base_init (void)
        if (major_collector.post_param_init)
                major_collector.post_param_init (&major_collector);
 
+       if (major_collector.needs_thread_pool)
+               sgen_workers_init (1);
+
        sgen_memgov_init (max_heap, soft_limit, debug_print_allowance, allowance_ratio, save_target);
 
        memset (&remset, 0, sizeof (remset));
index 229162c0d69f1deb023a6518be8e6cd304fc36a2..fe989be2a0b50c7c41bd81e9e6bbc6fff33a90af 100644 (file)
@@ -392,8 +392,6 @@ gboolean sgen_resume_thread (SgenThreadInfo *info);
 void sgen_wait_for_suspend_ack (int count);
 void sgen_os_init (void);
 
-gboolean sgen_is_worker_thread (MonoNativeThreadId thread);
-
 void sgen_update_heap_boundaries (mword low, mword high);
 
 void sgen_scan_area_with_callback (char *start, char *end, IterateObjectCallbackFunc callback, void *data, gboolean allow_flags);
@@ -421,7 +419,7 @@ enum {
        INTERNAL_MEM_MS_BLOCK_INFO_SORT,
        INTERNAL_MEM_EPHEMERON_LINK,
        INTERNAL_MEM_WORKER_DATA,
-       INTERNAL_MEM_WORKER_JOB_DATA,
+       INTERNAL_MEM_THREAD_POOL_JOB,
        INTERNAL_MEM_BRIDGE_DATA,
        INTERNAL_MEM_OLD_BRIDGE_HASH_TABLE,
        INTERNAL_MEM_OLD_BRIDGE_HASH_TABLE_ENTRY,
@@ -433,7 +431,6 @@ enum {
        INTERNAL_MEM_TARJAN_BRIDGE_HASH_TABLE_ENTRY,
        INTERNAL_MEM_TARJAN_OBJ_BUCKET,
        INTERNAL_MEM_BRIDGE_DEBUG,
-       INTERNAL_MEM_JOB_QUEUE_ENTRY,
        INTERNAL_MEM_TOGGLEREF_DATA,
        INTERNAL_MEM_CARDTABLE_MOD_UNION,
        INTERNAL_MEM_BINARY_PROTOCOL,
@@ -613,12 +610,13 @@ void sgen_split_nursery_init (SgenMinorCollector *collector);
 /* Updating references */
 
 #ifdef SGEN_CHECK_UPDATE_REFERENCE
+gboolean sgen_thread_pool_is_thread_pool_thread (MonoNativeThreadId some_thread) MONO_INTERNAL;
 static inline void
 sgen_update_reference (void **p, void *o, gboolean allow_null)
 {
        if (!allow_null)
                SGEN_ASSERT (0, o, "Cannot update a reference with a NULL pointer");
-       SGEN_ASSERT (0, !sgen_is_worker_thread (mono_native_thread_id_get ()), "Can't update a reference in the worker thread");
+       SGEN_ASSERT (0, !sgen_thread_pool_is_thread_pool_thread (mono_native_thread_id_get ()), "Can't update a reference in the worker thread");
        *p = o;
 }
 
@@ -654,6 +652,7 @@ typedef struct _SgenMajorCollector SgenMajorCollector;
 struct _SgenMajorCollector {
        size_t section_size;
        gboolean is_concurrent;
+       gboolean needs_thread_pool;
        gboolean supports_cardtable;
        gboolean sweeps_lazily;
 
@@ -707,7 +706,6 @@ struct _SgenMajorCollector {
        size_t (*get_bytes_survived_last_sweep) (void);
        gboolean (*handle_gc_param) (const char *opt);
        void (*print_gc_param_usage) (void);
-       gboolean (*is_worker_thread) (MonoNativeThreadId thread);
        void (*post_param_init) (SgenMajorCollector *collector);
        void* (*alloc_worker_data) (void);
        void (*init_worker_thread) (void *data);
index f5b8995355fe58edeb341cb656fd8c0763ade96b..dc484cc2a10bbc85204b75ee62368466d03360bc 100644 (file)
@@ -134,7 +134,7 @@ description_for_type (int type)
        case INTERNAL_MEM_MS_BLOCK_INFO_SORT: return "marksweep-block-info-sort";
        case INTERNAL_MEM_EPHEMERON_LINK: return "ephemeron-link";
        case INTERNAL_MEM_WORKER_DATA: return "worker-data";
-       case INTERNAL_MEM_WORKER_JOB_DATA: return "worker-job-data";
+       case INTERNAL_MEM_THREAD_POOL_JOB: return "thread-pool-job";
        case INTERNAL_MEM_BRIDGE_DATA: return "bridge-data";
        case INTERNAL_MEM_OLD_BRIDGE_HASH_TABLE: return "old-bridge-hash-table";
        case INTERNAL_MEM_OLD_BRIDGE_HASH_TABLE_ENTRY: return "old-bridge-hash-table-entry";
@@ -146,7 +146,6 @@ description_for_type (int type)
        case INTERNAL_MEM_BRIDGE_ALIVE_HASH_TABLE: return "bridge-alive-hash-table";
        case INTERNAL_MEM_BRIDGE_ALIVE_HASH_TABLE_ENTRY: return "bridge-alive-hash-table-entry";
        case INTERNAL_MEM_BRIDGE_DEBUG: return "bridge-debug";
-       case INTERNAL_MEM_JOB_QUEUE_ENTRY: return "job-queue-entry";
        case INTERNAL_MEM_TOGGLEREF_DATA: return "toggleref-data";
        case INTERNAL_MEM_CARDTABLE_MOD_UNION: return "cardtable-mod-union";
        case INTERNAL_MEM_BINARY_PROTOCOL: return "binary-protocol";
index 3688b46aadc96c3f60c7d6aad1717768751bc1a3..d35f12b58762e4322798c2a956f33257b7dc3db7 100644 (file)
@@ -804,12 +804,13 @@ set_sweep_state (int new, int expected)
 
 static gboolean ensure_block_is_checked_for_sweeping (int block_index, gboolean wait, gboolean *have_checked);
 
-static SgenThreadPoolJob sweep_job;
+static SgenThreadPoolJob * volatile sweep_job;
 
 static void
 major_finish_sweep_checking (void)
 {
        int block_index;
+       SgenThreadPoolJob *job;
 
  retry:
        switch (sweep_state) {
@@ -841,7 +842,10 @@ major_finish_sweep_checking (void)
        set_sweep_state (SWEEP_STATE_SWEEPING, SWEEP_STATE_SWEEPING_AND_ITERATING);
 
  wait:
-       sgen_thread_pool_job_wait (&sweep_job);
+       job = sweep_job;
+       if (job)
+               sgen_thread_pool_job_wait (job);
+       SGEN_ASSERT (0, !sweep_job, "Why did the sweep job not null itself?");
        SGEN_ASSERT (0, sweep_state == SWEEP_STATE_SWEPT, "How is the sweep job done but we're not swept?");
 }
 
@@ -1061,7 +1065,7 @@ static void
 major_copy_or_mark_object_with_evacuation_concurrent (void **ptr, void *obj, SgenGrayQueue *queue)
 {
        SGEN_ASSERT (9, sgen_concurrent_collection_in_progress (), "Why are we scanning concurrently when there's no concurrent collection on?");
-       SGEN_ASSERT (9, !sgen_workers_are_working () || sgen_is_worker_thread (mono_native_thread_id_get ()), "We must not scan from two threads at the same time!");
+       SGEN_ASSERT (9, !sgen_workers_are_working () || sgen_thread_pool_is_thread_pool_thread (mono_native_thread_id_get ()), "We must not scan from two threads at the same time!");
 
        g_assert (!SGEN_OBJECT_IS_FORWARDED (obj));
 
@@ -1523,7 +1527,7 @@ ensure_block_is_checked_for_sweeping (int block_index, gboolean wait, gboolean *
 }
 
 static void
-sweep_job_func (SgenThreadPoolJob *job)
+sweep_job_func (void *thread_data_untyped, SgenThreadPoolJob *job)
 {
        int block_index;
        int num_blocks = num_major_sections_before_sweep;
@@ -1566,6 +1570,8 @@ sweep_job_func (SgenThreadPoolJob *job)
        sgen_pointer_queue_remove_nulls (&allocated_blocks);
 
        sweep_finish ();
+
+       sweep_job = NULL;
 }
 
 static void
@@ -1611,11 +1617,12 @@ major_sweep (void)
        num_major_sections_before_sweep = num_major_sections;
        num_major_sections_freed_in_sweep = 0;
 
+       SGEN_ASSERT (0, !sweep_job, "We haven't finished the last sweep?");
        if (concurrent_sweep) {
-               sgen_thread_pool_job_init (&sweep_job, sweep_job_func);
-               sgen_thread_pool_job_enqueue (&sweep_job);
+               sweep_job = sgen_thread_pool_job_alloc ("sweep", sweep_job_func, sizeof (SgenThreadPoolJob));
+               sgen_thread_pool_job_enqueue (sweep_job);
        } else {
-               sweep_job_func (NULL);
+               sweep_job_func (NULL, NULL);
        }
 }
 
@@ -2339,6 +2346,7 @@ static void
 post_param_init (SgenMajorCollector *collector)
 {
        collector->sweeps_lazily = lazy_sweep;
+       collector->needs_thread_pool = concurrent_mark || concurrent_sweep;
 }
 
 static void
@@ -2391,13 +2399,12 @@ sgen_marksweep_init_internal (SgenMajorCollector *collector, gboolean is_concurr
        collector->section_size = MAJOR_SECTION_SIZE;
 
        concurrent_mark = is_concurrent;
-       if (is_concurrent) {
-               collector->is_concurrent = TRUE;
+       collector->is_concurrent = is_concurrent;
+       collector->needs_thread_pool = is_concurrent || concurrent_sweep;
+       if (is_concurrent)
                collector->want_synchronous_collection = &want_evacuation;
-       } else {
-               collector->is_concurrent = FALSE;
+       else
                collector->want_synchronous_collection = NULL;
-       }
        collector->get_and_reset_num_major_objects_marked = major_get_and_reset_num_major_objects_marked;
        collector->supports_cardtable = TRUE;
 
@@ -2476,9 +2483,6 @@ sgen_marksweep_init_internal (SgenMajorCollector *collector, gboolean is_concurr
        mono_mutex_init (&scanned_objects_list_lock);
 #endif
 
-       if (concurrent_sweep)
-               sgen_thread_pool_init (1);
-
        SGEN_ASSERT (0, SGEN_MAX_SMALL_OBJ_SIZE <= MS_BLOCK_FREE / 2, "MAX_SMALL_OBJ_SIZE must be at most MS_BLOCK_FREE / 2");
 
        /*cardtable requires major pages to be 8 cards aligned*/
index bf8831268e4b958d13a573b0cbba41bd7c0c2733..42231e4fcae6f7eb18bde13cbd8c77d4d4fd3289 100644 (file)
@@ -29,6 +29,7 @@
 
 #include "metadata/sgen-gc.h"
 #include "metadata/sgen-memory-governor.h"
+#include "metadata/sgen-thread-pool.h"
 #include "metadata/mono-gc.h"
 
 #include "utils/mono-counters.h"
@@ -335,7 +336,7 @@ gboolean
 sgen_memgov_try_alloc_space (mword size, int space)
 {
        if (sgen_memgov_available_free_space () < size) {
-               SGEN_ASSERT (4, !sgen_is_worker_thread (mono_native_thread_id_get ()), "Memory shouldn't run out in worker thread");
+               SGEN_ASSERT (4, !sgen_thread_pool_is_thread_pool_thread (mono_native_thread_id_get ()), "Memory shouldn't run out in worker thread");
                return FALSE;
        }
 
index 7d599e20908e59e6559b0e1bafbaa41e01e76f4f..403d04c3d9a56f95560c27f08f8b55e2d0a0bdd9 100644 (file)
@@ -31,6 +31,7 @@
 #include "metadata/sgen-gc.h"
 #include "metadata/sgen-archdep.h"
 #include "metadata/sgen-protocol.h"
+#include "metadata/sgen-thread-pool.h"
 #include "metadata/object-internals.h"
 #include "metadata/gc-internal.h"
 
@@ -116,7 +117,7 @@ sgen_thread_handshake (BOOL suspend)
 
        cur_thread->suspend_done = TRUE;
        FOREACH_THREAD_SAFE (info) {
-               if (info == cur_thread || sgen_is_worker_thread (mono_thread_info_get_tid (info)))
+               if (info == cur_thread || sgen_thread_pool_is_thread_pool_thread (mono_thread_info_get_tid (info)))
                        continue;
 
                info->suspend_done = FALSE;
index 22f70c3041784664a3658c7ba4b4a282b7c67c70..bbabb8ee36d78d9a48b9b897173eec52003ace2e 100644 (file)
@@ -27,6 +27,7 @@
 #include "sgen-gc.h"
 #include "sgen-protocol.h"
 #include "sgen-memory-governor.h"
+#include "sgen-thread-pool.h"
 #include "utils/mono-mmap.h"
 #include "utils/mono-threads.h"
 
@@ -293,7 +294,7 @@ protocol_entry (unsigned char type, gpointer data, int size)
        if (binary_protocol_file == -1)
                return;
 
-       if (sgen_is_worker_thread (mono_native_thread_id_get ()))
+       if (sgen_thread_pool_is_thread_pool_thread (mono_native_thread_id_get ()))
                type |= 0x80;
 
        lock_recursive ();
index 700241621ba547457048ff675c7f9cd4df3c8526..9078f6bb07a1256f3c7c4d7810944b67b2ce7c5d 100644 (file)
@@ -30,6 +30,7 @@
 #include "metadata/sgen-gc.h"
 #include "metadata/sgen-protocol.h"
 #include "metadata/sgen-memory-governor.h"
+#include "metadata/sgen-thread-pool.h"
 #include "metadata/profiler-private.h"
 #include "utils/mono-time.h"
 #include "utils/dtrace.h"
@@ -397,7 +398,7 @@ sgen_is_thread_in_current_stw (SgenThreadInfo *info)
        We can't suspend the workers that will do all the heavy lifting.
        FIXME Use some state bit in SgenThreadInfo for this.
        */
-       if (sgen_is_worker_thread (mono_thread_info_get_tid (info))) {
+       if (sgen_thread_pool_is_thread_pool_thread (mono_thread_info_get_tid (info))) {
                return FALSE;
        }
 
index 81437d5f937035925577911e05d494ae2bba6593..71d8d70742ae68fed07a847b2e298101adbd1b5a 100644 (file)
@@ -34,6 +34,10 @@ static MonoNativeThreadId thread;
 
 /* Only accessed with the lock held. */
 static SgenPointerQueue job_queue;
+static volatile gboolean idle_working;
+
+static SgenThreadPoolThreadInitFunc thread_init_func;
+static SgenThreadPoolIdleJobFunc idle_job_func;
 
 enum {
        STATE_WAITING,
@@ -43,64 +47,93 @@ enum {
 
 /* Assumes that the lock is held. */
 static SgenThreadPoolJob*
-get_job (void)
+get_job_and_set_in_progress (void)
 {
        for (size_t i = 0; i < job_queue.next_slot; ++i) {
                SgenThreadPoolJob *job = job_queue.data [i];
-               if (job->state == STATE_WAITING)
+               if (job->state == STATE_WAITING) {
+                       job->state = STATE_IN_PROGRESS;
                        return job;
+               }
        }
        return NULL;
 }
 
+/*
+ * Returns the index of `job` in `job_queue`, or -1 if the job is not in the
+ * queue.  Assumes that the lock is held.
+ *
+ * The slots are walked with an unsigned index, like in
+ * get_job_and_set_in_progress(), so that the comparison against
+ * `job_queue.next_slot` does not mix signed and unsigned operands.
+ */
+static ssize_t
+find_job_in_queue (SgenThreadPoolJob *job)
+{
+       for (size_t i = 0; i < job_queue.next_slot; ++i) {
+               if (job_queue.data [i] == job)
+                       return (ssize_t)i;
+       }
+       return -1;
+}
+
 /* Assumes that the lock is held. */
 static void
 remove_job (SgenThreadPoolJob *job)
 {
-       gboolean found = FALSE;
+       ssize_t index;
        SGEN_ASSERT (0, job->state == STATE_DONE, "Why are we removing a job that's not done?");
-       for (size_t i = 0; i < job_queue.next_slot; ++i) {
-               if (job_queue.data [i] == job) {
-                       job_queue.data [i] = NULL;
-                       found = TRUE;
-                       break;
-               }
-       }
-       SGEN_ASSERT (0, found, "Why is the job we're trying to remove not in the queue?");
+       index = find_job_in_queue (job);
+       SGEN_ASSERT (0, index >= 0, "Why is the job we're trying to remove not in the queue?");
+       job_queue.data [index] = NULL;
        sgen_pointer_queue_remove_nulls (&job_queue);
+       sgen_thread_pool_job_free (job);
 }
 
+/*
+ * Body of the thread pool thread.  `thread_data` is handed to the thread init
+ * function, to every job function and to the idle job function.
+ *
+ * Each iteration, with the lock held, picks a waiting job (marking it as in
+ * progress) and samples `idle_working`.  If there is neither a job nor idle
+ * work the thread sleeps on `work_cond`.  The lock is dropped while the job
+ * or the idle function runs and is re-taken afterwards.  This function never
+ * returns.
+ */
 static mono_native_thread_return_t
-thread_func (void *arg)
+thread_func (void *thread_data)
 {
-       mono_thread_info_register_small_id ();
+       thread_init_func (thread_data);
 
        mono_mutex_lock (&lock);
        for (;;) {
                SgenThreadPoolJob *job;
+               gboolean do_idle = idle_working;
 
-               while (!(job = get_job ()))
+               job = get_job_and_set_in_progress ();
+               /*
+                * This has to be a loop: a condition wait can return without
+                * there being anything to do (POSIX permits spurious wakeups),
+                * in which case we must go back to sleep instead of falling
+                * through to the idle branch with `do_idle` unset.
+                */
+               while (!job && !do_idle) {
                        mono_cond_wait (&work_cond, &lock);
-               SGEN_ASSERT (0, job->state == STATE_WAITING, "The job we got is in the wrong state.  Should be waiting.");
-               job->state = STATE_IN_PROGRESS;
+                       do_idle = idle_working;
+                       job = get_job_and_set_in_progress ();
+               }
+
                mono_mutex_unlock (&lock);
 
-               job->func (job);
-
-               mono_mutex_lock (&lock);
-               SGEN_ASSERT (0, job->state == STATE_IN_PROGRESS, "The job should still be in progress.");
-               job->state = STATE_DONE;
-               remove_job (job);
-               /*
-                * Only the main GC thread will ever wait on the done condition, so we don't
-                * have to broadcast.
-                */
-               mono_cond_signal (&done_cond);
+               if (job) {
+                       job->func (thread_data, job);
+
+                       mono_mutex_lock (&lock);
+
+                       SGEN_ASSERT (0, job->state == STATE_IN_PROGRESS, "The job should still be in progress.");
+                       job->state = STATE_DONE;
+                       /* remove_job() also frees the job, so it must not be touched afterwards. */
+                       remove_job (job);
+                       /*
+                        * Only the main GC thread will ever wait on the done condition, so we don't
+                        * have to broadcast.
+                        */
+                       mono_cond_signal (&done_cond);
+               } else {
+                       SGEN_ASSERT (0, do_idle, "Why did we unlock if we still have to wait for idle?");
+                       SGEN_ASSERT (0, idle_job_func, "Why do we have idle work when there's no idle job function?");
+                       /*
+                        * Keep doing idle work until the idle function reports that
+                        * there is none left, or until the job queue is non-empty.
+                        *
+                        * NOTE(review): `job_queue.next_slot` is read here without
+                        * the lock although the queue is documented as lock-protected;
+                        * it looks like it is only used as a hint to go back and pick
+                        * up jobs - confirm.
+                        */
+                       do {
+                               do_idle = idle_job_func (thread_data);
+                       } while (do_idle && !job_queue.next_slot);
+
+                       mono_mutex_lock (&lock);
+
+                       /* No idle work left: clear the flag and wake whoever waits in sgen_thread_pool_idle_wait(). */
+                       if (!do_idle) {
+                               idle_working = FALSE;
+                               mono_cond_signal (&done_cond);
+                       }
+               }
        }
 }
 
 void
-sgen_thread_pool_init (int num_threads)
+sgen_thread_pool_init (int num_threads, SgenThreadPoolThreadInitFunc init_func, SgenThreadPoolIdleJobFunc idle_func, void **thread_datas)
 {
        SGEN_ASSERT (0, num_threads == 1, "We only support 1 thread pool thread for now.");
 
@@ -108,14 +141,28 @@ sgen_thread_pool_init (int num_threads)
        mono_cond_init (&work_cond, NULL);
        mono_cond_init (&done_cond, NULL);
 
-       mono_native_thread_create (&thread, thread_func, NULL);
+       thread_init_func = init_func;
+       idle_job_func = idle_func;
+       idle_working = idle_func != NULL;
+
+       mono_native_thread_create (&thread, thread_func, thread_datas ? thread_datas [0] : NULL);
 }
 
-void
-sgen_thread_pool_job_init (SgenThreadPoolJob *job, SgenThreadPoolJobFunc func)
+SgenThreadPoolJob*
+sgen_thread_pool_job_alloc (const char *name, SgenThreadPoolJobFunc func, size_t size)
 {
+       SgenThreadPoolJob *job = sgen_alloc_internal_dynamic (size, INTERNAL_MEM_THREAD_POOL_JOB, TRUE);
+       job->name = name;
+       job->size = size;
        job->state = STATE_WAITING;
        job->func = func;
+       return job;
+}
+
+/*
+ * Releases a job obtained from sgen_thread_pool_job_alloc(), using the size
+ * that was recorded in the job when it was allocated.
+ */
+void
+sgen_thread_pool_job_free (SgenThreadPoolJob *job)
+{
+       size_t job_size = job->size;
+
+       sgen_free_internal_dynamic (job, job_size, INTERNAL_MEM_THREAD_POOL_JOB);
+}
 
 void
@@ -136,9 +183,43 @@ sgen_thread_pool_job_enqueue (SgenThreadPoolJob *job)
 void
 sgen_thread_pool_job_wait (SgenThreadPoolJob *job)
 {
+       SGEN_ASSERT (0, job, "Where's the job?");
+
        mono_mutex_lock (&lock);
 
-       while (job->state != STATE_DONE)
+       while (find_job_in_queue (job) >= 0)
+               mono_cond_wait (&done_cond, &lock);
+
+       mono_mutex_unlock (&lock);
+}
+
+/*
+ * Flags that there is idle work and wakes the thread pool thread.  Requires
+ * that an idle job function was registered.  If the idle flag is already set
+ * this returns without taking the lock.
+ */
+void
+sgen_thread_pool_idle_signal (void)
+{
+       SGEN_ASSERT (0, idle_job_func, "Why are we signaling idle without an idle function?");
+
+       if (!idle_working) {
+               mono_mutex_lock (&lock);
+               idle_working = TRUE;
+               mono_cond_signal (&work_cond);
+               mono_mutex_unlock (&lock);
+       }
+}
+
+void
+sgen_thread_pool_idle_wait (void)
+{
+       SGEN_ASSERT (0, idle_job_func, "Why are we waiting for idle without an idle function?");
+
+       if (!idle_working)
+               return;
+
+       mono_mutex_lock (&lock);
+
+       while (idle_working)
                mono_cond_wait (&done_cond, &lock);
 
        mono_mutex_unlock (&lock);
@@ -155,4 +236,10 @@ sgen_thread_pool_wait_for_all_jobs (void)
        mono_mutex_unlock (&lock);
 }
 
+/*
+ * Tells whether `some_thread` is the thread pool's thread, i.e. whether it is
+ * equal to the id stored in the file-level `thread` variable.
+ */
+gboolean
+sgen_thread_pool_is_thread_pool_thread (MonoNativeThreadId some_thread)
+{
+       gboolean is_pool_thread = some_thread == thread;
+
+       return is_pool_thread;
+}
+
 #endif
index 3ea4c5a6d03e81da696a640b8dd98297835bad30..e0673f04a7b6ec86e0fbb732189fe906428b2a49 100644 (file)
 
 typedef struct _SgenThreadPoolJob SgenThreadPoolJob;
 
-typedef void (*SgenThreadPoolJobFunc) (SgenThreadPoolJob *job);
+typedef void (*SgenThreadPoolJobFunc) (void *thread_data, SgenThreadPoolJob *job);
 
 struct _SgenThreadPoolJob {
+       const char *name;
        SgenThreadPoolJobFunc func;
+       size_t size;
        volatile gint32 state;
 };
 
-void sgen_thread_pool_init (int num_threads);
+typedef void (*SgenThreadPoolThreadInitFunc) (void*);
+typedef gboolean (*SgenThreadPoolIdleJobFunc) (void*);
+
+void sgen_thread_pool_init (int num_threads, SgenThreadPoolThreadInitFunc init_func, SgenThreadPoolIdleJobFunc idle_func, void **thread_datas);
+
+SgenThreadPoolJob* sgen_thread_pool_job_alloc (const char *name, SgenThreadPoolJobFunc func, size_t size);
+/* This only needs to be called on jobs that are not enqueued. */
+void sgen_thread_pool_job_free (SgenThreadPoolJob *job);
 
-void sgen_thread_pool_job_init (SgenThreadPoolJob *job, SgenThreadPoolJobFunc func);
 void sgen_thread_pool_job_enqueue (SgenThreadPoolJob *job);
+/* This must only be called after the job has been enqueued. */
 void sgen_thread_pool_job_wait (SgenThreadPoolJob *job);
 
+void sgen_thread_pool_idle_signal (void);
+void sgen_thread_pool_idle_wait (void);
+
 void sgen_thread_pool_wait_for_all_jobs (void);
 
+gboolean sgen_thread_pool_is_thread_pool_thread (MonoNativeThreadId thread);
+
 #endif
index a262207c075193496371225497447d7d93876b30..f54362656101a765c0a40309c9b90e301846d95e 100644 (file)
@@ -24,6 +24,7 @@
 
 #include "metadata/sgen-gc.h"
 #include "metadata/sgen-workers.h"
+#include "metadata/sgen-thread-pool.h"
 #include "utils/mono-counters.h"
 
 static int workers_num;
@@ -33,50 +34,22 @@ static void *workers_gc_thread_major_collector_data = NULL;
 static SgenSectionGrayQueue workers_distribute_gray_queue;
 static gboolean workers_distribute_gray_queue_inited;
 
-static gboolean workers_started = FALSE;
-
 enum {
        STATE_NOT_WORKING,
        STATE_WORKING,
        STATE_NURSERY_COLLECTION
 } WorkersStateName;
 
-/*
- * | state                    | num_awake | num_posted                 | post_done |
- * |--------------------------+-----------+----------------------------+-----------|
- * | STATE_NOT_WORKING        | 0         | 0                          |         0 |
- * | STATE_WORKING            | > 0       | <= workers_num - num_awake |         * |
- * | STATE_NURSERY_COLLECTION | *         | <= workers_num - num_awake |         1 |
- * | STATE_NURSERY_COLLECTION | 0         | 0                          |         0 |
- */
 typedef union {
        gint32 value;
        struct {
                guint state : 4; /* WorkersStateName */
-               /* Number of worker threads awake. */
-               guint num_awake : 8;
-               /* The state of the waiting semaphore. */
-               guint num_posted : 8;
-               /* Whether to post `workers_done_sem` */
-               guint post_done : 1;
        } data;
 } State;
 
 static volatile State workers_state;
 
-static MonoSemType workers_waiting_sem;
-static MonoSemType workers_done_sem;
-
-static volatile int workers_job_queue_num_entries = 0;
-static volatile JobQueueEntry *workers_job_queue = NULL;
-static LOCK_DECLARE (workers_job_queue_mutex);
-static int workers_num_jobs_enqueued = 0;
-static volatile int workers_num_jobs_finished = 0;
-
-static guint64 stat_workers_stolen_from_self_lock;
-static guint64 stat_workers_stolen_from_self_no_lock;
-static guint64 stat_workers_stolen_from_others;
-static guint64 stat_workers_num_waited;
+static guint64 stat_workers_num_finished;
 
 static gboolean
 set_state (State old_state, State new_state)
@@ -92,9 +65,6 @@ static void
 assert_not_working (State state)
 {
        SGEN_ASSERT (0, state.data.state == STATE_NOT_WORKING, "Can only signal enqueue work when in no work state");
-       SGEN_ASSERT (0, state.data.num_awake == 0, "No workers can be awake when not working");
-       SGEN_ASSERT (0, state.data.num_posted == 0, "Can't have posted already");
-       SGEN_ASSERT (0, !state.data.post_done, "post_done can only be set when working");
 
 }
 
@@ -102,26 +72,12 @@ static void
 assert_working (State state, gboolean from_worker)
 {
        SGEN_ASSERT (0, state.data.state == STATE_WORKING, "A worker can't wait without being in working state");
-       if (from_worker)
-               SGEN_ASSERT (0, state.data.num_awake > 0, "How can we be awake, yet we are not counted?");
-       else
-               SGEN_ASSERT (0, state.data.num_awake + state.data.num_posted > 0, "How can we be working, yet no worker threads are awake or to be awoken?");
-       SGEN_ASSERT (0, state.data.num_awake + state.data.num_posted <= workers_num, "There are too many worker threads awake");
 }
 
 static void
 assert_nursery_collection (State state, gboolean from_worker)
 {
        SGEN_ASSERT (0, state.data.state == STATE_NURSERY_COLLECTION, "Must be in the nursery collection state");
-       if (from_worker) {
-               SGEN_ASSERT (0, state.data.num_awake > 0, "We're awake, but num_awake is zero");
-               SGEN_ASSERT (0, state.data.post_done, "post_done must be set in the nursery collection state");
-       }
-       SGEN_ASSERT (0, state.data.num_awake <= workers_num, "There are too many worker threads awake");
-       if (!state.data.post_done) {
-               SGEN_ASSERT (0, state.data.num_awake == 0, "Once done has been posted no threads can be awake");
-               SGEN_ASSERT (0, state.data.num_posted == 0, "Once done has been posted no thread must be awoken");
-       }
 }
 
 static void
@@ -134,85 +90,54 @@ assert_working_or_nursery_collection (State state)
 }
 
 static void
-workers_signal_enqueue_work (int num_wake_up, gboolean from_nursery_collection)
+workers_signal_enqueue_work (gboolean from_nursery_collection)
 {
        State old_state = workers_state;
        State new_state = old_state;
-       int i;
        gboolean did_set_state;
 
-       SGEN_ASSERT (0, num_wake_up <= workers_num, "Cannot wake up more workers than are present");
-
        if (from_nursery_collection)
                assert_nursery_collection (old_state, FALSE);
        else
                assert_not_working (old_state);
 
        new_state.data.state = STATE_WORKING;
-       new_state.data.num_posted = num_wake_up;
 
        did_set_state = set_state (old_state, new_state);
        SGEN_ASSERT (0, did_set_state, "Nobody else should be mutating the state");
 
-       for (i = 0; i < num_wake_up; ++i)
-               MONO_SEM_POST (&workers_waiting_sem);
+       sgen_thread_pool_idle_signal ();
 }
 
 static void
-workers_signal_enqueue_work_if_necessary (int num_wake_up)
+workers_signal_enqueue_work_if_necessary (void)
 {
        if (workers_state.data.state == STATE_NOT_WORKING)
-               workers_signal_enqueue_work (num_wake_up, FALSE);
+               workers_signal_enqueue_work (FALSE);
 }
 
 void
 sgen_workers_ensure_awake (void)
 {
        SGEN_ASSERT (0, workers_state.data.state != STATE_NURSERY_COLLECTION, "Can't wake workers during nursery collection");
-       workers_signal_enqueue_work_if_necessary (workers_num);
+       workers_signal_enqueue_work_if_necessary ();
 }
 
 static void
-workers_wait (void)
+worker_finish (void)
 {
        State old_state, new_state;
-       gboolean post_done;
 
-       ++stat_workers_num_waited;
+       ++stat_workers_num_finished;
 
        do {
                new_state = old_state = workers_state;
 
                assert_working_or_nursery_collection (old_state);
 
-               --new_state.data.num_awake;
-               post_done = FALSE;
-               if (!new_state.data.num_awake && !new_state.data.num_posted) {
-                       /* We are the last thread to go to sleep. */
-                       if (old_state.data.state == STATE_WORKING)
-                               new_state.data.state = STATE_NOT_WORKING;
-
-                       new_state.data.post_done = 0;
-                       if (old_state.data.post_done)
-                               post_done = TRUE;
-               }
-       } while (!set_state (old_state, new_state));
-
-       if (post_done)
-               MONO_SEM_POST (&workers_done_sem);
-
-       MONO_SEM_WAIT (&workers_waiting_sem);
-
-       do {
-               new_state = old_state = workers_state;
-
-               SGEN_ASSERT (0, old_state.data.num_posted > 0, "How can we be awake without the semaphore having been posted?");
-               SGEN_ASSERT (0, old_state.data.num_awake < workers_num, "There are too many worker threads awake");
-
-               --new_state.data.num_posted;
-               ++new_state.data.num_awake;
-
-               assert_working_or_nursery_collection (new_state);
+               /* We are the last thread to go to sleep. */
+               if (old_state.data.state == STATE_WORKING)
+                       new_state.data.state = STATE_NOT_WORKING;
        } while (!set_state (old_state, new_state));
 }
 
@@ -223,41 +148,21 @@ collection_needs_workers (void)
 }
 
 void
-sgen_workers_enqueue_job (const char *name, JobFunc func, void *data)
+sgen_workers_enqueue_job (SgenThreadPoolJob *job)
 {
-       int num_entries;
-       JobQueueEntry *entry;
-
        if (!collection_needs_workers ()) {
-               func (NULL, data);
+               job->func (NULL, job);
+               sgen_thread_pool_job_free (job);
                return;
        }
 
-       entry = sgen_alloc_internal (INTERNAL_MEM_JOB_QUEUE_ENTRY);
-       entry->name = name;
-       entry->func = func;
-       entry->data = data;
-
-       mono_mutex_lock (&workers_job_queue_mutex);
-       entry->next = workers_job_queue;
-       workers_job_queue = entry;
-       num_entries = ++workers_job_queue_num_entries;
-       ++workers_num_jobs_enqueued;
-       mono_mutex_unlock (&workers_job_queue_mutex);
-
-       if (workers_state.data.state != STATE_NURSERY_COLLECTION)
-               workers_signal_enqueue_work_if_necessary (num_entries < workers_num ? num_entries : workers_num);
+       sgen_thread_pool_job_enqueue (job);
 }
 
 void
 sgen_workers_wait_for_jobs_finished (void)
 {
-       // FIXME: implement this properly
-       while (workers_num_jobs_finished < workers_num_jobs_enqueued) {
-               workers_signal_enqueue_work_if_necessary (workers_num);
-               /* FIXME: sleep less? */
-               g_usleep (1000);
-       }
+       sgen_thread_pool_wait_for_all_jobs ();
 }
 
 void
@@ -274,66 +179,20 @@ sgen_workers_signal_start_nursery_collection_and_wait (void)
                        assert_not_working (old_state);
                } else {
                        assert_working (old_state, FALSE);
-                       SGEN_ASSERT (0, !old_state.data.post_done, "We are not waiting for the workers");
-
-                       new_state.data.post_done = 1;
                }
        } while (!set_state (old_state, new_state));
 
-       if (new_state.data.post_done)
-               MONO_SEM_WAIT (&workers_done_sem);
+       sgen_thread_pool_idle_wait ();
 
        old_state = workers_state;
        assert_nursery_collection (old_state, FALSE);
-       SGEN_ASSERT (0, !old_state.data.post_done, "We got the semaphore, so it must have been posted");
 }
 
 void
 sgen_workers_signal_finish_nursery_collection (void)
 {
-       State old_state = workers_state;
-
-       assert_nursery_collection (old_state, FALSE);
-       SGEN_ASSERT (0, !old_state.data.post_done, "We are finishing the nursery collection, so we should have waited for the semaphore earlier");
-
-       workers_signal_enqueue_work (workers_num, TRUE);
-}
-
-static gboolean
-workers_dequeue_and_do_job (WorkerData *data)
-{
-       JobQueueEntry *entry;
-
-       /*
-        * At this point the GC might not be running anymore.  We
-        * could have been woken up by a job that was then taken by
-        * another thread, after which the collection finished, so we
-        * first have to successfully dequeue a job before doing
-        * anything assuming that the collection is still ongoing.
-        */
-
-       if (!workers_job_queue_num_entries)
-               return FALSE;
-
-       mono_mutex_lock (&workers_job_queue_mutex);
-       entry = (JobQueueEntry*)workers_job_queue;
-       if (entry) {
-               workers_job_queue = entry->next;
-               --workers_job_queue_num_entries;
-       }
-       mono_mutex_unlock (&workers_job_queue_mutex);
-
-       if (!entry)
-               return FALSE;
-
-       g_assert (collection_needs_workers ());
-
-       entry->func (data, entry->data);
-       sgen_free_internal (entry, INTERNAL_MEM_JOB_QUEUE_ENTRY);
-
-       SGEN_ATOMIC_ADD (workers_num_jobs_finished, 1);
-
-       return TRUE;
+       assert_nursery_collection (workers_state, FALSE);
+       workers_signal_enqueue_work (TRUE);
 }
 
 static gboolean
@@ -373,66 +232,62 @@ init_private_gray_queue (WorkerData *data)
                        sgen_get_major_collector ()->is_concurrent ? concurrent_enqueue_check : NULL);
 }
 
-static mono_native_thread_return_t
-workers_thread_func (void *data_untyped)
+static void
+thread_pool_init_func (void *data_untyped)
 {
        WorkerData *data = data_untyped;
        SgenMajorCollector *major = sgen_get_major_collector ();
 
        mono_thread_info_register_small_id ();
 
+       if (!major->is_concurrent)
+               return;
+
        if (major->init_worker_thread)
                major->init_worker_thread (data->major_collector_data);
 
        init_private_gray_queue (data);
+}
 
-       for (;;) {
-               gboolean did_work = FALSE;
-
-               SGEN_ASSERT (0, sgen_get_current_collection_generation () != GENERATION_NURSERY, "Why are we doing work while there's a nursery collection happening?");
-
-               while (workers_state.data.state == STATE_WORKING && workers_dequeue_and_do_job (data)) {
-                       did_work = TRUE;
-                       /* FIXME: maybe distribute the gray queue here? */
-               }
+static gboolean
+marker_idle_func (void *data_untyped)
+{
+       WorkerData *data = data_untyped;
+       SgenMajorCollector *major = sgen_get_major_collector ();
 
-               if (!did_work && (!sgen_gray_object_queue_is_empty (&data->private_gray_queue) || workers_get_work (data))) {
-                       SgenObjectOperations *ops = sgen_concurrent_collection_in_progress ()
-                               ? &major->major_concurrent_ops
-                               : &major->major_ops;
-                       ScanCopyContext ctx = { ops->scan_object, NULL, &data->private_gray_queue };
+       if (workers_state.data.state != STATE_WORKING)
+               return FALSE;
 
-                       g_assert (!sgen_gray_object_queue_is_empty (&data->private_gray_queue));
+       SGEN_ASSERT (0, sgen_get_current_collection_generation () != GENERATION_NURSERY, "Why are we doing work while there's a nursery collection happening?");
 
-                       while (!sgen_drain_gray_stack (32, ctx)) {
-                               if (workers_state.data.state == STATE_NURSERY_COLLECTION)
-                                       workers_wait ();
-                       }
-                       g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue));
+       if (!sgen_gray_object_queue_is_empty (&data->private_gray_queue) || workers_get_work (data)) {
+               SgenObjectOperations *ops = sgen_concurrent_collection_in_progress ()
+                       ? &major->major_concurrent_ops
+                       : &major->major_ops;
+               ScanCopyContext ctx = { ops->scan_object, NULL, &data->private_gray_queue };
 
-                       init_private_gray_queue (data);
+               SGEN_ASSERT (0, !sgen_gray_object_queue_is_empty (&data->private_gray_queue), "How is our gray queue empty if we just got work?");
 
-                       did_work = TRUE;
-               }
+               sgen_drain_gray_stack (32, ctx);
 
-               if (!did_work)
-                       workers_wait ();
+               return TRUE;
        }
 
-       /* dummy return to make compilers happy */
-       return NULL;
+       worker_finish ();
+
+       return FALSE;
 }
 
 static void
-init_distribute_gray_queue (gboolean locked)
+init_distribute_gray_queue (void)
 {
        if (workers_distribute_gray_queue_inited) {
                g_assert (sgen_section_gray_queue_is_empty (&workers_distribute_gray_queue));
-               g_assert (!workers_distribute_gray_queue.locked == !locked);
+               g_assert (workers_distribute_gray_queue.locked);
                return;
        }
 
-       sgen_section_gray_queue_init (&workers_distribute_gray_queue, locked,
+       sgen_section_gray_queue_init (&workers_distribute_gray_queue, TRUE,
                        sgen_get_major_collector ()->is_concurrent ? concurrent_enqueue_check : NULL);
        workers_distribute_gray_queue_inited = TRUE;
 }
@@ -440,19 +295,21 @@ init_distribute_gray_queue (gboolean locked)
 void
 sgen_workers_init_distribute_gray_queue (void)
 {
-       if (!collection_needs_workers ())
-               return;
-
-       init_distribute_gray_queue (sgen_get_major_collector ()->is_concurrent);
+       SGEN_ASSERT (0, sgen_get_major_collector ()->is_concurrent && collection_needs_workers (),
+                       "Why should we init the distribute gray queue if we don't need it?");
+       init_distribute_gray_queue ();
 }
 
 void
 sgen_workers_init (int num_workers)
 {
        int i;
+       void *workers_data_ptrs [num_workers];
 
-       if (!sgen_get_major_collector ()->is_concurrent)
+       if (!sgen_get_major_collector ()->is_concurrent) {
+               sgen_thread_pool_init (num_workers, thread_pool_init_func, NULL, NULL);
                return;
+       }
 
        //g_print ("initing %d workers\n", num_workers);
 
@@ -461,10 +318,7 @@ sgen_workers_init (int num_workers)
        workers_data = sgen_alloc_internal_dynamic (sizeof (WorkerData) * num_workers, INTERNAL_MEM_WORKER_DATA, TRUE);
        memset (workers_data, 0, sizeof (WorkerData) * num_workers);
 
-       MONO_SEM_INIT (&workers_waiting_sem, 0);
-       MONO_SEM_INIT (&workers_done_sem, 0);
-
-       init_distribute_gray_queue (sgen_get_major_collector ()->is_concurrent);
+       init_distribute_gray_queue ();
 
        if (sgen_get_major_collector ()->alloc_worker_data)
                workers_gc_thread_major_collector_data = sgen_get_major_collector ()->alloc_worker_data ();
@@ -474,98 +328,41 @@ sgen_workers_init (int num_workers)
 
                if (sgen_get_major_collector ()->alloc_worker_data)
                        workers_data [i].major_collector_data = sgen_get_major_collector ()->alloc_worker_data ();
-       }
-
-       LOCK_INIT (workers_job_queue_mutex);
 
-       sgen_register_fixed_internal_mem_type (INTERNAL_MEM_JOB_QUEUE_ENTRY, sizeof (JobQueueEntry));
-
-       mono_counters_register ("Stolen from self lock", MONO_COUNTER_GC | MONO_COUNTER_ULONG, &stat_workers_stolen_from_self_lock);
-       mono_counters_register ("Stolen from self no lock", MONO_COUNTER_GC | MONO_COUNTER_ULONG, &stat_workers_stolen_from_self_no_lock);
-       mono_counters_register ("Stolen from others", MONO_COUNTER_GC | MONO_COUNTER_ULONG, &stat_workers_stolen_from_others);
-       mono_counters_register ("# workers waited", MONO_COUNTER_GC | MONO_COUNTER_ULONG, &stat_workers_num_waited);
-}
+               workers_data_ptrs [i] = &workers_data [i];
+       }
 
-/* only the GC thread is allowed to start and join workers */
+       sgen_thread_pool_init (num_workers, thread_pool_init_func, marker_idle_func, workers_data_ptrs);
 
-static void
-workers_start_worker (int index)
-{
-       g_assert (index >= 0 && index < workers_num);
-
-       g_assert (!workers_data [index].thread);
-       mono_native_thread_create (&workers_data [index].thread, workers_thread_func, &workers_data [index]);
+       mono_counters_register ("# workers finished", MONO_COUNTER_GC | MONO_COUNTER_ULONG, &stat_workers_num_finished);
 }
 
 void
 sgen_workers_start_all_workers (void)
 {
-       State old_state, new_state;
-       int i;
-       gboolean result;
-
        if (!collection_needs_workers ())
                return;
 
        if (sgen_get_major_collector ()->init_worker_thread)
                sgen_get_major_collector ()->init_worker_thread (workers_gc_thread_major_collector_data);
 
-       old_state = new_state = workers_state;
-       assert_not_working (old_state);
-
-       g_assert (workers_job_queue_num_entries == 0);
-       workers_num_jobs_enqueued = 0;
-       workers_num_jobs_finished = 0;
-
-       if (workers_started) {
-               workers_signal_enqueue_work (workers_num, FALSE);
-               return;
-       }
-
-       new_state.data.state = STATE_WORKING;
-       new_state.data.num_awake = workers_num;
-       result = set_state (old_state, new_state);
-       SGEN_ASSERT (0, result, "Nobody else should have modified the state - workers have not been started yet");
-
-       for (i = 0; i < workers_num; ++i)
-               workers_start_worker (i);
-
-       workers_started = TRUE;
-}
-
-gboolean
-sgen_workers_have_started (void)
-{
-       return workers_started;
+       workers_signal_enqueue_work (FALSE);
 }
 
 void
 sgen_workers_join (void)
 {
-       State old_state;
        int i;
 
        if (!collection_needs_workers ())
                return;
 
-       for (;;) {
-               old_state = workers_state;
-               SGEN_ASSERT (0, old_state.data.state != STATE_NURSERY_COLLECTION, "Can't be in nursery collection when joining");
-
-               if (old_state.data.state == STATE_WORKING) {
-                       State new_state = old_state;
+       sgen_thread_pool_wait_for_all_jobs ();
 
-                       SGEN_ASSERT (0, !old_state.data.post_done, "Why is post_done already set?");
-                       new_state.data.post_done = 1;
-                       if (!set_state (old_state, new_state))
-                               continue;
-
-                       MONO_SEM_WAIT (&workers_done_sem);
-
-                       old_state = workers_state;
-               }
-
-               assert_not_working (old_state);
+       for (;;) {
+               SGEN_ASSERT (0, workers_state.data.state != STATE_NURSERY_COLLECTION, "Can't be in nursery collection when joining");
+               sgen_thread_pool_idle_wait ();
+               assert_not_working (workers_state);
 
                /*
                 * Checking whether there is still work left and, if not, going to sleep,
@@ -573,10 +370,10 @@ sgen_workers_join (void)
                 * workers.  Therefore there's a race condition where work can be added
                 * after they've checked for work, and before they've gone to sleep.
                 */
-               if (!workers_job_queue_num_entries && sgen_section_gray_queue_is_empty (&workers_distribute_gray_queue))
+               if (sgen_section_gray_queue_is_empty (&workers_distribute_gray_queue))
                        break;
 
-               workers_signal_enqueue_work (workers_num, FALSE);
+               workers_signal_enqueue_work (FALSE);
        }
 
        /* At this point all the workers have stopped. */
@@ -586,7 +383,6 @@ sgen_workers_join (void)
                        sgen_get_major_collector ()->reset_worker_data (workers_data [i].major_collector_data);
        }
 
-       g_assert (workers_job_queue_num_entries == 0);
        g_assert (sgen_section_gray_queue_is_empty (&workers_distribute_gray_queue));
        for (i = 0; i < workers_num; ++i)
                g_assert (sgen_gray_object_queue_is_empty (&workers_data [i].private_gray_queue));
@@ -601,23 +397,14 @@ sgen_workers_all_done (void)
 gboolean
 sgen_workers_are_working (void)
 {
-       State state = workers_state;
-       return state.data.num_awake > 0 || state.data.num_posted > 0;
+       return workers_state.data.state == STATE_WORKING; /* TRUE only while the shared workers_state is STATE_WORKING */
 }
 
-gboolean
-sgen_is_worker_thread (MonoNativeThreadId thread)
+void
+sgen_workers_wait (void)
 {
-       int i;
-
-       if (sgen_get_major_collector ()->is_worker_thread && sgen_get_major_collector ()->is_worker_thread (thread))
-               return TRUE;
-
-       for (i = 0; i < workers_num; ++i) {
-               if (workers_data [i].thread == thread)
-                       return TRUE;
-       }
-       return FALSE;
+       sgen_thread_pool_idle_wait (); /* NOTE(review): presumably blocks until the thread pool has no idle work left - confirm in sgen-thread-pool.c */
+       SGEN_ASSERT (0, sgen_workers_all_done (), "Why are the workers not done after we wait for them?");
 }
 
 SgenSectionGrayQueue*
index 5c509de612898c2bb40264c5862b615a46266427..1d3b4fac44c0a14e8cdb44766edfbd86d664fb94 100644 (file)
 #ifndef __MONO_SGEN_WORKER_H__
 #define __MONO_SGEN_WORKER_H__
 
+#include "mono/metadata/sgen-thread-pool.h"
+
 typedef struct _WorkerData WorkerData;
 struct _WorkerData {
        int index;
-       MonoNativeThreadId thread;
        void *major_collector_data;
 
        SgenGrayQueue private_gray_queue; /* only read/written by worker thread */
 };
 
-typedef void (*JobFunc) (WorkerData *worker_data, void *job_data);
-
-typedef struct _JobQueueEntry JobQueueEntry;
-struct _JobQueueEntry {
-       const char *name;
-       JobFunc func;
-       void *data;
-
-       volatile JobQueueEntry *next;
-};
-
 void sgen_workers_init (int num_workers);
 void sgen_workers_start_all_workers (void);
-gboolean sgen_workers_have_started (void);
 void sgen_workers_ensure_awake (void);
 void sgen_workers_init_distribute_gray_queue (void);
-void sgen_workers_enqueue_job (const char *name, JobFunc func, void *data);
+void sgen_workers_enqueue_job (SgenThreadPoolJob *job); /* takes a SgenThreadPoolJob in place of the former name/func/data arguments */
 void sgen_workers_wait_for_jobs_finished (void);
 void sgen_workers_distribute_gray_queue_sections (void);
 void sgen_workers_reset_data (void);
 void sgen_workers_join (void);
 gboolean sgen_workers_all_done (void);
 gboolean sgen_workers_are_working (void);
+void sgen_workers_wait (void); /* calls sgen_thread_pool_idle_wait () and then asserts sgen_workers_all_done () */
 SgenSectionGrayQueue* sgen_workers_get_distribute_section_gray_queue (void);
 
 void sgen_workers_signal_start_nursery_collection_and_wait (void);