[sgen] Implement work context for thread pool threads
author Vlad Brezae <brezaevlad@gmail.com>
Thu, 15 Jun 2017 22:15:51 +0000 (01:15 +0300)
committer Vlad Brezae <brezaevlad@gmail.com>
Fri, 23 Jun 2017 17:47:49 +0000 (20:47 +0300)
We need to run jobs on the thread pool threads for parallel minor, parallel major, concurrent mark and concurrent sweep, some of which can run simultaneously. Instead of making separate threads reserved for each type of work, we create a work context abstraction (this contains a set of pending jobs to run and the sgen-worker callbacks). When a thread pool thread looks for work, it searches for work (jobs and idle work) through all contexts and finishes work from each one of them, from the highest to the lowest priority.

This is implemented by having different thread pool contexts (which contain the job list, work callbacks that indicate whether it should work and its inherent priority), overall unrelated to the GC. The sgen worker infrastructure also creates separate worker contexts (which contain the object ops to be used and information about each worker, like the state and the private gray queue). We use at most two worker contexts (for minor and for major) and each worker context makes use of a sgen thread pool context.

mono/metadata/sgen-mono.c
mono/metadata/sgen-stw.c
mono/sgen/sgen-gc.c
mono/sgen/sgen-gc.h
mono/sgen/sgen-marksweep.c
mono/sgen/sgen-protocol.c
mono/sgen/sgen-simple-nursery.c
mono/sgen/sgen-thread-pool.c
mono/sgen/sgen-thread-pool.h
mono/sgen/sgen-workers.c
mono/sgen/sgen-workers.h

index 0264158494f283930fde29f68eb244fa42adaf34..99f6100e36bb462136cdd79999dc38f22651e3f8 100644 (file)
@@ -2973,9 +2973,7 @@ mono_gc_base_init (void)
 void
 mono_gc_base_cleanup (void)
 {
-       sgen_thread_pool_shutdown (major_collector.get_sweep_pool ());
-
-       sgen_workers_shutdown ();
+       sgen_thread_pool_shutdown ();
 
        // We should have consumed any outstanding moves.
        g_assert (sgen_pointer_queue_is_empty (&moved_objects_queue));
index be999592a6ae7831e56f709e9520a09126b57899..b90f8c2789e91f0952f4e34a1237f19bff14e7ec 100644 (file)
@@ -230,8 +230,7 @@ sgen_is_thread_in_current_stw (SgenThreadInfo *info, int *reason)
        We can't suspend the workers that will do all the heavy lifting.
        FIXME Use some state bit in SgenThreadInfo for this.
        */
-       if (sgen_thread_pool_is_thread_pool_thread (major_collector.get_sweep_pool (), mono_thread_info_get_tid (info)) ||
-                       sgen_workers_is_worker_thread (mono_thread_info_get_tid (info))) {
+       if (sgen_thread_pool_is_thread_pool_thread (mono_thread_info_get_tid (info))) {
                if (reason)
                        *reason = 4;
                return FALSE;
index 3d04584377b8597dfc32aa46fb6f49483c1ab4b7..a590153dd637f005f9bbfd55af8220908d4cbc3f 100644 (file)
@@ -454,7 +454,7 @@ sgen_workers_get_job_gray_queue (WorkerData *worker_data, SgenGrayQueue *default
 static void
 gray_queue_redirect (SgenGrayQueue *queue)
 {
-       sgen_workers_take_from_queue (queue);
+       sgen_workers_take_from_queue (current_collection_generation, queue);
 }
 
 void
@@ -1375,7 +1375,7 @@ scan_copy_context_for_scan_job (void *worker_data_untyped, ScanJob *job)
                 * object ops changes, like forced concurrent finish.
                 */
                SGEN_ASSERT (0, sgen_workers_is_worker_thread (mono_native_thread_id_get ()), "We need a context for the scan job");
-               job->ops = sgen_workers_get_idle_func_object_ops ();
+               job->ops = sgen_workers_get_idle_func_object_ops (worker_data);
        }
 
        return CONTEXT_FROM_OBJECT_OPERATIONS (job->ops, sgen_workers_get_job_gray_queue (worker_data, job->gc_thread_gray_queue));
@@ -1521,7 +1521,7 @@ workers_finish_callback (void)
 {
        ParallelScanJob *psj;
        ScanJob *sj;
-       int split_count = sgen_workers_get_job_split_count ();
+       int split_count = sgen_workers_get_job_split_count (GENERATION_OLD);
        int i;
        /* Mod union preclean jobs */
        for (i = 0; i < split_count; i++) {
@@ -1529,7 +1529,7 @@ workers_finish_callback (void)
                psj->scan_job.gc_thread_gray_queue = NULL;
                psj->job_index = i;
                psj->job_split_count = split_count;
-               sgen_workers_enqueue_job (&psj->scan_job.job, TRUE);
+               sgen_workers_enqueue_job (GENERATION_OLD, &psj->scan_job.job, TRUE);
        }
 
        for (i = 0; i < split_count; i++) {
@@ -1537,12 +1537,12 @@ workers_finish_callback (void)
                psj->scan_job.gc_thread_gray_queue = NULL;
                psj->job_index = i;
                psj->job_split_count = split_count;
-               sgen_workers_enqueue_job (&psj->scan_job.job, TRUE);
+               sgen_workers_enqueue_job (GENERATION_OLD, &psj->scan_job.job, TRUE);
        }
 
        sj = (ScanJob*)sgen_thread_pool_job_alloc ("scan last pinned", job_scan_last_pinned, sizeof (ScanJob));
        sj->gc_thread_gray_queue = NULL;
-       sgen_workers_enqueue_job (&sj->job, TRUE);
+       sgen_workers_enqueue_job (GENERATION_OLD, &sj->job, TRUE);
 }
 
 static void
@@ -1554,13 +1554,13 @@ init_gray_queue (SgenGrayQueue *gc_thread_gray_queue)
 static void
 enqueue_scan_remembered_set_jobs (SgenGrayQueue *gc_thread_gray_queue, SgenObjectOperations *ops, gboolean enqueue)
 {
-       int i, split_count = sgen_workers_get_job_split_count ();
+       int i, split_count = sgen_workers_get_job_split_count (GENERATION_NURSERY);
        ScanJob *sj;
 
        sj = (ScanJob*)sgen_thread_pool_job_alloc ("scan wbroots", job_scan_wbroots, sizeof (ScanJob));
        sj->ops = ops;
        sj->gc_thread_gray_queue = gc_thread_gray_queue;
-       sgen_workers_enqueue_job (&sj->job, enqueue);
+       sgen_workers_enqueue_job (GENERATION_NURSERY, &sj->job, enqueue);
 
        for (i = 0; i < split_count; i++) {
                ParallelScanJob *psj;
@@ -1570,14 +1570,14 @@ enqueue_scan_remembered_set_jobs (SgenGrayQueue *gc_thread_gray_queue, SgenObjec
                psj->scan_job.gc_thread_gray_queue = gc_thread_gray_queue;
                psj->job_index = i;
                psj->job_split_count = split_count;
-               sgen_workers_enqueue_job (&psj->scan_job.job, enqueue);
+               sgen_workers_enqueue_job (GENERATION_NURSERY, &psj->scan_job.job, enqueue);
 
                psj = (ParallelScanJob*)sgen_thread_pool_job_alloc ("scan LOS remsets", job_scan_los_card_table, sizeof (ParallelScanJob));
                psj->scan_job.ops = ops;
                psj->scan_job.gc_thread_gray_queue = gc_thread_gray_queue;
                psj->job_index = i;
                psj->job_split_count = split_count;
-               sgen_workers_enqueue_job (&psj->scan_job.job, enqueue);
+               sgen_workers_enqueue_job (GENERATION_NURSERY, &psj->scan_job.job, enqueue);
        }
 }
 
@@ -1596,7 +1596,7 @@ enqueue_scan_from_roots_jobs (SgenGrayQueue *gc_thread_gray_queue, char *heap_st
        scrrj->heap_start = heap_start;
        scrrj->heap_end = heap_end;
        scrrj->root_type = ROOT_TYPE_NORMAL;
-       sgen_workers_enqueue_job (&scrrj->scan_job.job, enqueue);
+       sgen_workers_enqueue_job (current_collection_generation, &scrrj->scan_job.job, enqueue);
 
        if (current_collection_generation == GENERATION_OLD) {
                /* During minors we scan the cardtable for these roots instead */
@@ -1606,7 +1606,7 @@ enqueue_scan_from_roots_jobs (SgenGrayQueue *gc_thread_gray_queue, char *heap_st
                scrrj->heap_start = heap_start;
                scrrj->heap_end = heap_end;
                scrrj->root_type = ROOT_TYPE_WBARRIER;
-               sgen_workers_enqueue_job (&scrrj->scan_job.job, enqueue);
+               sgen_workers_enqueue_job (current_collection_generation, &scrrj->scan_job.job, enqueue);
        }
 
        /* Threads */
@@ -1616,7 +1616,7 @@ enqueue_scan_from_roots_jobs (SgenGrayQueue *gc_thread_gray_queue, char *heap_st
        stdj->scan_job.gc_thread_gray_queue = gc_thread_gray_queue;
        stdj->heap_start = heap_start;
        stdj->heap_end = heap_end;
-       sgen_workers_enqueue_job (&stdj->scan_job.job, enqueue);
+       sgen_workers_enqueue_job (current_collection_generation, &stdj->scan_job.job, enqueue);
 
        /* Scan the list of objects ready for finalization. */
 
@@ -1624,13 +1624,13 @@ enqueue_scan_from_roots_jobs (SgenGrayQueue *gc_thread_gray_queue, char *heap_st
        sfej->scan_job.ops = ops;
        sfej->scan_job.gc_thread_gray_queue = gc_thread_gray_queue;
        sfej->queue = &fin_ready_queue;
-       sgen_workers_enqueue_job (&sfej->scan_job.job, enqueue);
+       sgen_workers_enqueue_job (current_collection_generation, &sfej->scan_job.job, enqueue);
 
        sfej = (ScanFinalizerEntriesJob*)sgen_thread_pool_job_alloc ("scan critical finalizer entries", job_scan_finalizer_entries, sizeof (ScanFinalizerEntriesJob));
        sfej->scan_job.ops = ops;
        sfej->scan_job.gc_thread_gray_queue = gc_thread_gray_queue;
        sfej->queue = &critical_fin_queue;
-       sgen_workers_enqueue_job (&sfej->scan_job.job, enqueue);
+       sgen_workers_enqueue_job (current_collection_generation, &sfej->scan_job.job, enqueue);
 }
 
 /*
@@ -1756,8 +1756,8 @@ collect_nursery (const char *reason, gboolean is_overflow, SgenGrayQueue *unpin_
 
        if (is_parallel) {
                gray_queue_redirect (&gc_thread_gray_queue);
-               sgen_workers_start_all_workers (object_ops_nopar, object_ops_par, NULL);
-               sgen_workers_join ();
+               sgen_workers_start_all_workers (GENERATION_NURSERY, object_ops_nopar, object_ops_par, NULL);
+               sgen_workers_join (GENERATION_NURSERY);
        }
 
        TV_GETTIME (btv);
@@ -1990,15 +1990,15 @@ major_copy_or_mark_from_roots (SgenGrayQueue *gc_thread_gray_queue, size_t *old_
        SGEN_ASSERT (0, sgen_workers_all_done (), "Why are the workers not done when we start or finish a major collection?");
        if (mode == COPY_OR_MARK_FROM_ROOTS_FINISH_CONCURRENT) {
                if (object_ops_par != NULL)
-                       sgen_workers_set_num_active_workers (0);
-               if (sgen_workers_have_idle_work ()) {
+                       sgen_workers_set_num_active_workers (GENERATION_OLD, 0);
+               if (sgen_workers_have_idle_work (GENERATION_OLD)) {
                        /*
                         * We force the finish of the worker with the new object ops context
                         * which can also do copying. We need to have finished pinning.
                         */
-                       sgen_workers_start_all_workers (object_ops_nopar, object_ops_par, NULL);
+                       sgen_workers_start_all_workers (GENERATION_OLD, object_ops_nopar, object_ops_par, NULL);
 
-                       sgen_workers_join ();
+                       sgen_workers_join (GENERATION_OLD);
                }
        }
 
@@ -2024,17 +2024,17 @@ major_copy_or_mark_from_roots (SgenGrayQueue *gc_thread_gray_queue, size_t *old_
         * the roots.
         */
        if (mode == COPY_OR_MARK_FROM_ROOTS_START_CONCURRENT) {
-               sgen_workers_set_num_active_workers (1);
+               sgen_workers_set_num_active_workers (GENERATION_OLD, 1);
                gray_queue_redirect (gc_thread_gray_queue);
                if (precleaning_enabled) {
-                       sgen_workers_start_all_workers (object_ops_nopar, object_ops_par, workers_finish_callback);
+                       sgen_workers_start_all_workers (GENERATION_OLD, object_ops_nopar, object_ops_par, workers_finish_callback);
                } else {
-                       sgen_workers_start_all_workers (object_ops_nopar, object_ops_par, NULL);
+                       sgen_workers_start_all_workers (GENERATION_OLD, object_ops_nopar, object_ops_par, NULL);
                }
        }
 
        if (mode == COPY_OR_MARK_FROM_ROOTS_FINISH_CONCURRENT) {
-               int i, split_count = sgen_workers_get_job_split_count ();
+               int i, split_count = sgen_workers_get_job_split_count (GENERATION_OLD);
                gboolean parallel = object_ops_par != NULL;
 
                /* If we're not parallel we finish the collection on the gc thread */
@@ -2050,14 +2050,14 @@ major_copy_or_mark_from_roots (SgenGrayQueue *gc_thread_gray_queue, size_t *old_
                        psj->scan_job.gc_thread_gray_queue = gc_thread_gray_queue;
                        psj->job_index = i;
                        psj->job_split_count = split_count;
-                       sgen_workers_enqueue_job (&psj->scan_job.job, parallel);
+                       sgen_workers_enqueue_job (GENERATION_OLD, &psj->scan_job.job, parallel);
 
                        psj = (ParallelScanJob*)sgen_thread_pool_job_alloc ("scan LOS mod union cardtable", job_scan_los_mod_union_card_table, sizeof (ParallelScanJob));
                        psj->scan_job.ops = parallel ? NULL : object_ops_nopar;
                        psj->scan_job.gc_thread_gray_queue = gc_thread_gray_queue;
                        psj->job_index = i;
                        psj->job_split_count = split_count;
-                       sgen_workers_enqueue_job (&psj->scan_job.job, parallel);
+                       sgen_workers_enqueue_job (GENERATION_OLD, &psj->scan_job.job, parallel);
                }
 
                if (parallel) {
@@ -2069,8 +2069,8 @@ major_copy_or_mark_from_roots (SgenGrayQueue *gc_thread_gray_queue, size_t *old_
                         * stack that contained roots and pinned objects and also scan the mod union card
                         * table.
                         */
-                       sgen_workers_start_all_workers (object_ops_nopar, object_ops_par, NULL);
-                       sgen_workers_join ();
+                       sgen_workers_start_all_workers (GENERATION_OLD, object_ops_nopar, object_ops_par, NULL);
+                       sgen_workers_join (GENERATION_OLD);
                }
        }
 
@@ -2095,7 +2095,7 @@ major_start_collection (SgenGrayQueue *gc_thread_gray_queue, const char *reason,
 
        current_collection_generation = GENERATION_OLD;
 
-       sgen_workers_assert_gray_queue_is_empty ();
+       sgen_workers_assert_gray_queue_is_empty (GENERATION_OLD);
 
        if (!concurrent)
                sgen_cement_reset ();
@@ -2158,7 +2158,7 @@ major_finish_collection (SgenGrayQueue *gc_thread_gray_queue, const char *reason
                object_ops_nopar = &major_collector.major_ops_serial;
        }
 
-       sgen_workers_assert_gray_queue_is_empty ();
+       sgen_workers_assert_gray_queue_is_empty (GENERATION_OLD);
 
        finish_gray_stack (GENERATION_OLD, CONTEXT_FROM_OBJECT_OPERATIONS (object_ops_nopar, gc_thread_gray_queue));
        TV_GETTIME (atv);
@@ -2250,7 +2250,7 @@ major_finish_collection (SgenGrayQueue *gc_thread_gray_queue, const char *reason
        memset (&counts, 0, sizeof (ScannedObjectCounts));
        major_collector.finish_major_collection (&counts);
 
-       sgen_workers_assert_gray_queue_is_empty ();
+       sgen_workers_assert_gray_queue_is_empty (GENERATION_OLD);
 
        SGEN_ASSERT (0, sgen_workers_all_done (), "Can't have workers working after major collection has finished");
        if (concurrent_collection_in_progress)
@@ -2373,7 +2373,7 @@ major_finish_concurrent_collection (gboolean forced)
         * The workers will be resumed with a finishing pause context to avoid
         * additional cardtable and object scanning.
         */
-       sgen_workers_stop_all_workers ();
+       sgen_workers_stop_all_workers (GENERATION_OLD);
 
        SGEN_TV_GETTIME (time_major_conc_collection_end);
        gc_stats.major_gc_time_concurrent += SGEN_TV_ELAPSED (time_major_conc_collection_start, time_major_conc_collection_end);
@@ -3607,19 +3607,7 @@ sgen_gc_init (void)
        if (major_collector.post_param_init)
                major_collector.post_param_init (&major_collector);
 
-       if (major_collector.is_concurrent || sgen_minor_collector.is_parallel) {
-               int num_workers = 1;
-               if (major_collector.is_parallel || sgen_minor_collector.is_parallel) {
-                       num_workers = mono_cpu_count ();
-                       if (num_workers <= 1) {
-                               num_workers = 1;
-                               major_collector.is_parallel = FALSE;
-                               sgen_minor_collector.is_parallel = FALSE;
-                       }
-               }
-               if (major_collector.is_concurrent || sgen_minor_collector.is_parallel)
-                       sgen_workers_init (num_workers, (SgenWorkerCallback) major_collector.worker_init_cb);
-       }
+       sgen_thread_pool_start ();
 
        sgen_memgov_init (max_heap, soft_limit, debug_print_allowance, allowance_ratio, save_target);
 
index 6967f29ea7ced28bd86895840e460315e9a794f9..0cc573aab763c59322a99063f7a8a329175faf44 100644 (file)
@@ -678,9 +678,7 @@ struct _SgenMajorCollector {
        guint8* (*get_cardtable_mod_union_for_reference) (char *object);
        long long (*get_and_reset_num_major_objects_marked) (void);
        void (*count_cards) (long long *num_total_cards, long long *num_marked_cards);
-       SgenThreadPool* (*get_sweep_pool) (void);
-
-       void (*worker_init_cb) (gpointer worker);
+       void (*init_block_free_lists) (gpointer *list_p);
 };
 
 extern SgenMajorCollector major_collector;
index 7711679e3cb8239c3b06fdccf20f5c57cb84a82b..2d1bc5b279917b626d7615ea89d0c5ca305d7f7c 100644 (file)
@@ -32,6 +32,7 @@
 #include "mono/sgen/sgen-thread-pool.h"
 #include "mono/sgen/sgen-client.h"
 #include "mono/utils/mono-memory-model.h"
+#include "mono/utils/mono-proclib.h"
 
 static int ms_block_size;
 
@@ -193,8 +194,7 @@ static volatile int sweep_state = SWEEP_STATE_SWEPT;
 static gboolean concurrent_mark;
 static gboolean concurrent_sweep = TRUE;
 
-SgenThreadPool sweep_pool_inst;
-SgenThreadPool *sweep_pool;
+int sweep_pool_context = -1;
 
 #define BLOCK_IS_TAGGED_HAS_REFERENCES(bl)     SGEN_POINTER_IS_TAGGED_1 ((bl))
 #define BLOCK_TAG_HAS_REFERENCES(bl)           SGEN_POINTER_TAG_1 ((bl))
@@ -927,7 +927,7 @@ major_finish_sweep_checking (void)
  wait:
        job = sweep_job;
        if (job)
-               sgen_thread_pool_job_wait (sweep_pool, job);
+               sgen_thread_pool_job_wait (sweep_pool_context, job);
        SGEN_ASSERT (0, !sweep_job, "Why did the sweep job not null itself?");
        SGEN_ASSERT (0, sweep_state == SWEEP_STATE_SWEPT, "How is the sweep job done but we're not swept?");
 }
@@ -1599,7 +1599,8 @@ sweep_start (void)
                        free_blocks [j] = NULL;
        }
 
-       sgen_workers_foreach (sgen_worker_clear_free_block_lists);
+       sgen_workers_foreach (GENERATION_NURSERY, sgen_worker_clear_free_block_lists);
+       sgen_workers_foreach (GENERATION_OLD, sgen_worker_clear_free_block_lists);
 }
 
 static void sweep_finish (void);
@@ -1815,7 +1816,7 @@ sweep_job_func (void *thread_data_untyped, SgenThreadPoolJob *job)
         */
        if (concurrent_sweep && lazy_sweep) {
                sweep_blocks_job = sgen_thread_pool_job_alloc ("sweep_blocks", sweep_blocks_job_func, sizeof (SgenThreadPoolJob));
-               sgen_thread_pool_job_enqueue (sweep_pool, sweep_blocks_job);
+               sgen_thread_pool_job_enqueue (sweep_pool_context, sweep_blocks_job);
        }
 
        sweep_finish ();
@@ -1864,7 +1865,7 @@ major_sweep (void)
        SGEN_ASSERT (0, !sweep_job, "We haven't finished the last sweep?");
        if (concurrent_sweep) {
                sweep_job = sgen_thread_pool_job_alloc ("sweep", sweep_job_func, sizeof (SgenThreadPoolJob));
-               sgen_thread_pool_job_enqueue (sweep_pool, sweep_job);
+               sgen_thread_pool_job_enqueue (sweep_pool_context, sweep_job);
        } else {
                sweep_job_func (NULL, NULL);
        }
@@ -2067,7 +2068,8 @@ major_start_major_collection (void)
        }
 
        /* We expect workers to have very few blocks on the freelist, just evacuate them */
-       sgen_workers_foreach (sgen_worker_clear_free_block_lists_evac);
+       sgen_workers_foreach (GENERATION_NURSERY, sgen_worker_clear_free_block_lists_evac);
+       sgen_workers_foreach (GENERATION_OLD, sgen_worker_clear_free_block_lists_evac);
 
        if (lazy_sweep && concurrent_sweep) {
                /*
@@ -2077,7 +2079,7 @@ major_start_major_collection (void)
                 */
                SgenThreadPoolJob *job = sweep_blocks_job;
                if (job)
-                       sgen_thread_pool_job_wait (sweep_pool, job);
+                       sgen_thread_pool_job_wait (sweep_pool_context, job);
        }
 
        if (lazy_sweep && !concurrent_sweep)
@@ -2117,12 +2119,6 @@ major_finish_major_collection (ScannedObjectCounts *counts)
 #endif
 }
 
-static SgenThreadPool*
-major_get_sweep_pool (void)
-{
-       return sweep_pool;
-}
-
 static int
 compare_pointers (const void *va, const void *vb) {
        char *a = *(char**)va, *b = *(char**)vb;
@@ -2736,28 +2732,38 @@ post_param_init (SgenMajorCollector *collector)
        collector->sweeps_lazily = lazy_sweep;
 }
 
-/* We are guaranteed to be called by the worker in question */
+/*
+ * We are guaranteed to be called by the worker in question.
+ * This provides initialization for threads that plan to do
+ * parallel object allocation. We need to store these lists
+ * in additional data structures so we can traverse them
+ * at major/sweep start.
+ */
 static void
-sgen_worker_init_callback (gpointer worker_untyped)
+sgen_init_block_free_lists (gpointer *list_p)
 {
        int i;
-       WorkerData *worker = (WorkerData*) worker_untyped;
-       MSBlockInfo ***worker_free_blocks = (MSBlockInfo ***) sgen_alloc_internal_dynamic (sizeof (MSBlockInfo**) * MS_BLOCK_TYPE_MAX, INTERNAL_MEM_MS_TABLES, TRUE);
+       MSBlockInfo ***worker_free_blocks = (MSBlockInfo ***) mono_native_tls_get_value (worker_block_free_list_key);
+
+       /*
+        * For simplification, a worker thread uses the same free block lists,
+        * regardless of the context it is part of (major/minor).
+        */
+       if (worker_free_blocks) {
+               *list_p = (gpointer)worker_free_blocks;
+               return;
+       }
+
+       worker_free_blocks = (MSBlockInfo ***) sgen_alloc_internal_dynamic (sizeof (MSBlockInfo**) * MS_BLOCK_TYPE_MAX, INTERNAL_MEM_MS_TABLES, TRUE);
 
        for (i = 0; i < MS_BLOCK_TYPE_MAX; i++)
                worker_free_blocks [i] = (MSBlockInfo **) sgen_alloc_internal_dynamic (sizeof (MSBlockInfo*) * num_block_obj_sizes, INTERNAL_MEM_MS_TABLES, TRUE);
 
-       worker->free_block_lists = worker_free_blocks;
+       *list_p = (gpointer)worker_free_blocks;
 
        mono_native_tls_set_value (worker_block_free_list_key, worker_free_blocks);
 }
 
-static void
-thread_pool_init_func (void *data_untyped)
-{
-       sgen_client_thread_register_worker ();
-}
-
 static void
 sgen_marksweep_init_internal (SgenMajorCollector *collector, gboolean is_concurrent, gboolean is_parallel)
 {
@@ -2770,6 +2776,9 @@ sgen_marksweep_init_internal (SgenMajorCollector *collector, gboolean is_concurr
 
        sgen_register_fixed_internal_mem_type (INTERNAL_MEM_MS_BLOCK_INFO, SIZEOF_MS_BLOCK_INFO);
 
+       if (mono_cpu_count () <= 1)
+               is_parallel = FALSE;
+
        num_block_obj_sizes = ms_calculate_block_obj_sizes (MS_BLOCK_OBJ_SIZE_FACTOR, NULL);
        block_obj_sizes = (int *)sgen_alloc_internal_dynamic (sizeof (int) * num_block_obj_sizes, INTERNAL_MEM_MS_TABLES, TRUE);
        ms_calculate_block_obj_sizes (MS_BLOCK_OBJ_SIZE_FACTOR, block_obj_sizes);
@@ -2800,10 +2809,8 @@ sgen_marksweep_init_internal (SgenMajorCollector *collector, gboolean is_concurr
                g_assert (MS_BLOCK_OBJ_SIZE_INDEX (i) == ms_find_block_obj_size_index (i));
 
        /* We can do this because we always init the minor before the major */
-       if (is_parallel || sgen_get_minor_collector ()->is_parallel) {
+       if (is_parallel || sgen_get_minor_collector ()->is_parallel)
                mono_native_tls_alloc (&worker_block_free_list_key, NULL);
-               collector->worker_init_cb = sgen_worker_init_callback;
-       }
 
        mono_counters_register ("# major blocks allocated", MONO_COUNTER_GC | MONO_COUNTER_ULONG, &stat_major_blocks_alloced);
        mono_counters_register ("# major blocks freed", MONO_COUNTER_GC | MONO_COUNTER_ULONG, &stat_major_blocks_freed);
@@ -2863,7 +2870,7 @@ sgen_marksweep_init_internal (SgenMajorCollector *collector, gboolean is_concurr
        collector->is_valid_object = major_is_valid_object;
        collector->describe_pointer = major_describe_pointer;
        collector->count_cards = major_count_cards;
-       collector->get_sweep_pool = major_get_sweep_pool;
+       collector->init_block_free_lists = sgen_init_block_free_lists;
 
        collector->major_ops_serial.copy_or_mark_object = major_copy_or_mark_object_canonical;
        collector->major_ops_serial.scan_object = major_scan_object_with_evacuation;
@@ -2924,11 +2931,13 @@ sgen_marksweep_init_internal (SgenMajorCollector *collector, gboolean is_concurr
        /*cardtable requires major pages to be 8 cards aligned*/
        g_assert ((ms_block_size % (8 * CARD_SIZE_IN_BYTES)) == 0);
 
-       if (concurrent_sweep) {
-               SgenThreadPool **thread_datas = &sweep_pool;
-               sweep_pool = &sweep_pool_inst;
-               sgen_thread_pool_init (sweep_pool, 1, thread_pool_init_func, NULL, NULL, NULL, (SgenThreadPoolData**)&thread_datas);
-       }
+       if (is_concurrent && is_parallel)
+               sgen_workers_create_context (GENERATION_OLD, mono_cpu_count ());
+       else if (is_concurrent)
+               sgen_workers_create_context (GENERATION_OLD, 1);
+
+       if (concurrent_sweep)
+               sweep_pool_context = sgen_thread_pool_create_context (1, NULL, NULL, NULL, NULL, NULL);
 }
 
 void
index f86b05e51c963dd4de1f6c5dba3728732db8b97a..b53965b4aebe1e503d32a8e3444a4f28a79f495b 100644 (file)
@@ -371,9 +371,7 @@ protocol_entry (unsigned char type, gpointer data, int size)
                 * If the thread is not a worker thread we insert 0, which is interpreted
                 * as gc thread. Worker indexes are 1 based.
                 */
-               worker_index = sgen_workers_is_worker_thread (tid);
-               if (!worker_index)
-                       worker_index = sgen_thread_pool_is_thread_pool_thread (major_collector.get_sweep_pool (), tid);
+               worker_index = sgen_thread_pool_is_thread_pool_thread (tid);
                /* FIXME Consider using different index bases for different thread pools */
                buffer->buffer [index++] = (unsigned char) worker_index;
        }
index 39d08abcf16027b60b884376a02acbc9a2517ea3..5d39368402b133d4e6c0a0bd2adcc22ad611c2af 100644 (file)
@@ -19,7 +19,9 @@
 #include "mono/sgen/sgen-protocol.h"
 #include "mono/sgen/sgen-layout-stats.h"
 #include "mono/sgen/sgen-client.h"
+#include "mono/sgen/sgen-workers.h"
 #include "mono/utils/mono-memory-model.h"
+#include "mono/utils/mono-proclib.h"
 
 static inline GCObject*
 alloc_for_promotion (GCVTable vtable, GCObject *obj, size_t objsize, gboolean has_references)
@@ -130,6 +132,9 @@ fill_serial_with_concurrent_major_ops (SgenObjectOperations *ops)
 void
 sgen_simple_nursery_init (SgenMinorCollector *collector, gboolean parallel)
 {
+       if (mono_cpu_count () <= 1)
+               parallel = FALSE;
+
        collector->is_split = FALSE;
        collector->is_parallel = parallel;
 
@@ -146,6 +151,13 @@ sgen_simple_nursery_init (SgenMinorCollector *collector, gboolean parallel)
        fill_serial_ops (&collector->serial_ops);
        fill_serial_with_concurrent_major_ops (&collector->serial_ops_with_concurrent_major);
        fill_parallel_ops (&collector->parallel_ops);
+
+       /*
+        * The nursery worker context is created first so it will have priority over
+        * concurrent mark and concurrent sweep.
+        */
+       if (parallel)
+               sgen_workers_create_context (GENERATION_NURSERY, mono_cpu_count ());
 }
 
 
index a7abfad24472822f85e7f51632aa998cb913f05c..6a03bc3d69d164b5cd949b0b71760fc84f41d04b 100644 (file)
 
 #include "mono/sgen/sgen-gc.h"
 #include "mono/sgen/sgen-thread-pool.h"
+#include "mono/sgen/sgen-client.h"
 #include "mono/utils/mono-os-mutex.h"
 
+static mono_mutex_t lock;
+static mono_cond_t work_cond;
+static mono_cond_t done_cond;
+
+static int threads_num;
+static MonoNativeThreadId threads [SGEN_THREADPOOL_MAX_NUM_THREADS];
+
+static volatile gboolean threadpool_shutdown;
+static volatile int threads_finished;
+
+static int contexts_num;
+static SgenThreadPoolContext pool_contexts [SGEN_THREADPOOL_MAX_NUM_CONTEXTS];
+
 enum {
        STATE_WAITING,
        STATE_IN_PROGRESS,
@@ -22,10 +36,10 @@ enum {
 
 /* Assumes that the lock is held. */
 static SgenThreadPoolJob*
-get_job_and_set_in_progress (SgenThreadPool *pool)
+get_job_and_set_in_progress (SgenThreadPoolContext *context)
 {
-       for (size_t i = 0; i < pool->job_queue.next_slot; ++i) {
-               SgenThreadPoolJob *job = (SgenThreadPoolJob *)pool->job_queue.data [i];
+       for (size_t i = 0; i < context->job_queue.next_slot; ++i) {
+               SgenThreadPoolJob *job = (SgenThreadPoolJob *)context->job_queue.data [i];
                if (job->state == STATE_WAITING) {
                        job->state = STATE_IN_PROGRESS;
                        return job;
@@ -36,10 +50,10 @@ get_job_and_set_in_progress (SgenThreadPool *pool)
 
 /* Assumes that the lock is held. */
 static ssize_t
-find_job_in_queue (SgenThreadPool *pool, SgenThreadPoolJob *job)
+find_job_in_queue (SgenThreadPoolContext *context, SgenThreadPoolJob *job)
 {
-       for (ssize_t i = 0; i < pool->job_queue.next_slot; ++i) {
-               if (pool->job_queue.data [i] == job)
+       for (ssize_t i = 0; i < context->job_queue.next_slot; ++i) {
+               if (context->job_queue.data [i] == job)
                        return i;
        }
        return -1;
@@ -47,100 +61,174 @@ find_job_in_queue (SgenThreadPool *pool, SgenThreadPoolJob *job)
 
 /* Assumes that the lock is held. */
 static void
-remove_job (SgenThreadPool *pool, SgenThreadPoolJob *job)
+remove_job (SgenThreadPoolContext *context, SgenThreadPoolJob *job)
 {
        ssize_t index;
        SGEN_ASSERT (0, job->state == STATE_DONE, "Why are we removing a job that's not done?");
-       index = find_job_in_queue (pool, job);
+       index = find_job_in_queue (context, job);
        SGEN_ASSERT (0, index >= 0, "Why is the job we're trying to remove not in the queue?");
-       pool->job_queue.data [index] = NULL;
-       sgen_pointer_queue_remove_nulls (&pool->job_queue);
+       context->job_queue.data [index] = NULL;
+       sgen_pointer_queue_remove_nulls (&context->job_queue);
        sgen_thread_pool_job_free (job);
 }
 
 static gboolean
-continue_idle_job (SgenThreadPool *pool, void *thread_data)
+continue_idle_job (SgenThreadPoolContext *context, void *thread_data)
 {
-       if (!pool->continue_idle_job_func)
+       if (!context->continue_idle_job_func)
                return FALSE;
-       return pool->continue_idle_job_func (thread_data);
+       return context->continue_idle_job_func (thread_data, context - pool_contexts);
 }
 
 static gboolean
-should_work (SgenThreadPool *pool, void *thread_data)
+should_work (SgenThreadPoolContext *context, void *thread_data)
 {
-       if (!pool->should_work_func)
+       if (!context->should_work_func)
                return TRUE;
-       return pool->should_work_func (thread_data);
+       return context->should_work_func (thread_data);
 }
 
-static mono_native_thread_return_t
-thread_func (SgenThreadPoolData *thread_data)
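+/*
+ * Tells whether a worker doing idle work on current_context should stop,
+ * take the lock and attempt to get other work: either jobs or idle work
+ * from a higher priority context (one with a lower index), or a job
+ * enqueued on the current context.
+ */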
+static gboolean
+has_priority_work (int worker_index, int current_context)
 {
-       SgenThreadPool *pool = thread_data->pool;
-
-       pool->thread_init_func (thread_data);
+       int i;
 
-       mono_os_mutex_lock (&pool->lock);
-       for (;;) {
-               gboolean do_idle;
-               SgenThreadPoolJob *job;
+       for (i = 0; i < current_context; i++) {
+               SgenThreadPoolContext *context = &pool_contexts [i];
+               void *thread_data;
 
-               if (!should_work (pool, thread_data) && !pool->threadpool_shutdown) {
-                       mono_os_cond_wait (&pool->work_cond, &pool->lock);
+               if (worker_index >= context->num_threads)
                        continue;
+               thread_data = (context->thread_datas) ? context->thread_datas [worker_index] : NULL;
+               if (!should_work (context, thread_data))
+                       continue;
+               if (context->job_queue.next_slot > 0)
+                       return TRUE;
+               if (continue_idle_job (context, thread_data))
+                       return TRUE;
+       }
+
+       /* Return TRUE if a job is enqueued on the current context: jobs have priority over idle work. */
+       if (pool_contexts [current_context].job_queue.next_slot > 0)
+               return TRUE;
+
+       return FALSE;
+}
+
+/*
+ * Gets the highest priority work. If there is none, it waits
+ * for work_cond. Should always be called with lock held.
+ */
+static void
+get_work (int worker_index, int *work_context, int *do_idle, SgenThreadPoolJob **job)
+{
+       while (!threadpool_shutdown) {
+               int i;
+
+               for (i = 0; i < contexts_num; i++) {
+                       SgenThreadPoolContext *context = &pool_contexts [i];
+                       void *thread_data;
+
+                       if (worker_index >= context->num_threads)
+                               continue;
+                       thread_data = (context->thread_datas) ? context->thread_datas [worker_index] : NULL;
+
+                       if (!should_work (context, thread_data))
+                               continue;
+
+                       /*
+                        * It's important that we check the continue idle flag with the lock held.
+                        * Suppose we didn't check with the lock held, and the result is FALSE.  The
+                        * main thread might then set continue idle and signal us before we can take
+                        * the lock, and we'd lose the signal.
+                        */
+                       *do_idle = continue_idle_job (context, thread_data);
+                       *job = get_job_and_set_in_progress (context);
+
+                       if (*job || *do_idle) {
+                               *work_context = i;
+                               return;
+                       }
                }
+
                /*
-                * It's important that we check the continue idle flag with the lock held.
-                * Suppose we didn't check with the lock held, and the result is FALSE.  The
-                * main thread might then set continue idle and signal us before we can take
-                * the lock, and we'd lose the signal.
+                * Nothing to do on any context.
+                * pthread_cond_wait() can return successfully despite the condition
+                * not being signalled, so we have to run this in a loop until we
+                * really have work to do.
                 */
-               do_idle = continue_idle_job (pool, thread_data);
-               job = get_job_and_set_in_progress (pool);
+               mono_os_cond_wait (&work_cond, &lock);
+       }
+}
 
-               if (!job && !do_idle && !pool->threadpool_shutdown) {
-                       /*
-                        * pthread_cond_wait() can return successfully despite the condition
-                        * not being signalled, so we have to run this in a loop until we
-                        * really have work to do.
-                        */
-                       mono_os_cond_wait (&pool->work_cond, &pool->lock);
-                       continue;
+static mono_native_thread_return_t
+thread_func (int worker_index)
+{
+       int current_context;
+       void *thread_data = NULL;
+
+       sgen_client_thread_register_worker ();
+
+       for (current_context = 0; current_context < contexts_num; current_context++) {
+               if (worker_index >= pool_contexts [current_context].num_threads ||
+                               !pool_contexts [current_context].thread_init_func)
+                       break;
+
+               thread_data = (pool_contexts [current_context].thread_datas) ? pool_contexts [current_context].thread_datas [worker_index] : NULL;
+               pool_contexts [current_context].thread_init_func (thread_data);
+       }
+
+       current_context = 0;
+
+       mono_os_mutex_lock (&lock);
+       for (;;) {
+               gboolean do_idle = FALSE;
+               SgenThreadPoolJob *job = NULL;
+               SgenThreadPoolContext *context = NULL;
+
+               get_work (worker_index, &current_context, &do_idle, &job);
+
+               if (!threadpool_shutdown) {
+                       context = &pool_contexts [current_context];
+                       thread_data = (context->thread_datas) ? context->thread_datas [worker_index] : NULL;
                }
 
-               mono_os_mutex_unlock (&pool->lock);
+               mono_os_mutex_unlock (&lock);
 
                if (job) {
                        job->func (thread_data, job);
 
-                       mono_os_mutex_lock (&pool->lock);
+                       mono_os_mutex_lock (&lock);
 
                        SGEN_ASSERT (0, job->state == STATE_IN_PROGRESS, "The job should still be in progress.");
                        job->state = STATE_DONE;
-                       remove_job (pool, job);
+                       remove_job (context, job);
                        /*
                         * Only the main GC thread will ever wait on the done condition, so we don't
                         * have to broadcast.
                         */
-                       mono_os_cond_signal (&pool->done_cond);
+                       mono_os_cond_signal (&done_cond);
                } else if (do_idle) {
-                       SGEN_ASSERT (0, pool->idle_job_func, "Why do we have idle work when there's no idle job function?");
+                       SGEN_ASSERT (0, context->idle_job_func, "Why do we have idle work when there's no idle job function?");
                        do {
-                               pool->idle_job_func (thread_data);
-                               do_idle = continue_idle_job (pool, thread_data);
-                       } while (do_idle && !pool->job_queue.next_slot);
+                               context->idle_job_func (thread_data);
+                               do_idle = continue_idle_job (context, thread_data);
+                       } while (do_idle && !has_priority_work (worker_index, current_context));
 
-                       mono_os_mutex_lock (&pool->lock);
+                       mono_os_mutex_lock (&lock);
 
                        if (!do_idle)
-                               mono_os_cond_signal (&pool->done_cond);
+                               mono_os_cond_signal (&done_cond);
                } else {
-                       SGEN_ASSERT (0, pool->threadpool_shutdown, "Why did we unlock if no jobs and not shutting down?");
-                       mono_os_mutex_lock (&pool->lock);
-                       pool->threads_finished++;
-                       mono_os_cond_signal (&pool->done_cond);
-                       mono_os_mutex_unlock (&pool->lock);
+                       SGEN_ASSERT (0, threadpool_shutdown, "Why did we unlock if no jobs and not shutting down?");
+                       mono_os_mutex_lock (&lock);
+                       threads_finished++;
+                       mono_os_cond_signal (&done_cond);
+                       mono_os_mutex_unlock (&lock);
                        return 0;
                }
        }
@@ -148,50 +236,71 @@ thread_func (SgenThreadPoolData *thread_data)
        return (mono_native_thread_return_t)0;
 }
 
+/*
+ * Registers a new thread pool context in the next free slot of pool_contexts
+ * and returns its index, which is the context_id the other sgen_thread_pool_*
+ * entry points take.
+ *
+ * Contexts are scanned by get_work () starting from index 0, so a context
+ * created earlier is searched for work before one created later.
+ *
+ * num_threads: number of pool threads allowed to work on this context; a
+ *   worker whose index is >= num_threads skips the context.
+ * init_func: called by each pool thread at startup with its thread data.
+ *   NOTE(review): thread_func () leaves its init loop (break) at the first
+ *   context that has no init_func or too few threads for the worker, so later
+ *   contexts are not initialized for that worker -- confirm this is intended.
+ * idle_func: idle work callback; thread_func () asserts it is non-NULL
+ *   whenever continue_idle_func reports idle work.
+ * continue_idle_func: may be NULL, in which case the context never has idle work.
+ * should_work_func: may be NULL, in which case the context is always eligible.
+ * thread_datas: per-worker data indexed by worker index; may be NULL, in
+ *   which case callbacks receive NULL.
+ *
+ * This function does not take the pool lock. Presumably it must be called
+ * before sgen_thread_pool_start (), which sizes the pool from the contexts
+ * registered so far -- TODO confirm against callers.
+ */
+int
+sgen_thread_pool_create_context (int num_threads, SgenThreadPoolThreadInitFunc init_func, SgenThreadPoolIdleJobFunc idle_func, SgenThreadPoolContinueIdleJobFunc continue_idle_func, SgenThreadPoolShouldWorkFunc should_work_func, void **thread_datas)
+{
+       /* The new context takes the next free slot; its index is the id handed back. */
+       int context_id = contexts_num;
+
+       SGEN_ASSERT (0, contexts_num < SGEN_THREADPOOL_MAX_NUM_CONTEXTS, "Maximum sgen thread pool contexts reached");
+
+       pool_contexts [context_id].thread_init_func = init_func;
+       pool_contexts [context_id].idle_job_func = idle_func;
+       pool_contexts [context_id].continue_idle_job_func = continue_idle_func;
+       pool_contexts [context_id].should_work_func = should_work_func;
+       pool_contexts [context_id].thread_datas = thread_datas;
+
+       SGEN_ASSERT (0, num_threads <= SGEN_THREADPOOL_MAX_NUM_THREADS, "Maximum sgen thread pool threads exceeded");
+
+       pool_contexts [context_id].num_threads = num_threads;
+
+       /* contexts_num still equals context_id here; it is only incremented below. */
+       sgen_pointer_queue_init (&pool_contexts [contexts_num].job_queue, 0);
+
+       contexts_num++;
+
+       return context_id;
+}
+
 void
-sgen_thread_pool_init (SgenThreadPool *pool, int num_threads, SgenThreadPoolThreadInitFunc init_func, SgenThreadPoolIdleJobFunc idle_func, SgenThreadPoolContinueIdleJobFunc continue_idle_func, SgenThreadPoolShouldWorkFunc should_work_func_p, SgenThreadPoolData **thread_datas)
+sgen_thread_pool_start (void)
 {
        int i;
 
-       SGEN_ASSERT (0, num_threads > 0, "Why are we creating a threadpool with no threads?");
-
-       pool->threads_num = (num_threads < MAX_NUM_THREADS) ? num_threads : MAX_NUM_THREADS;
+       for (i = 0; i < contexts_num; i++) {
+               if (threads_num < pool_contexts [i].num_threads)
+                       threads_num = pool_contexts [i].num_threads;
+       }
 
-       mono_os_mutex_init (&pool->lock);
-       mono_os_cond_init (&pool->work_cond);
-       mono_os_cond_init (&pool->done_cond);
+       if (!threads_num)
+               return;
 
-       pool->thread_init_func = init_func;
-       pool->idle_job_func = idle_func;
-       pool->continue_idle_job_func = continue_idle_func;
-       pool->should_work_func = should_work_func_p;
+       mono_os_mutex_init (&lock);
+       mono_os_cond_init (&work_cond);
+       mono_os_cond_init (&done_cond);
 
-       sgen_pointer_queue_init (&pool->job_queue, 0);
-       pool->threads_finished = 0;
-       pool->threadpool_shutdown = FALSE;
+       threads_finished = 0;
+       threadpool_shutdown = FALSE;
 
-       for (i = 0; i < pool->threads_num; i++) {
-               thread_datas [i]->pool = pool;
-               mono_native_thread_create (&pool->threads [i], thread_func, thread_datas [i]);
+       for (i = 0; i < threads_num; i++) {
+               mono_native_thread_create (&threads [i], thread_func, (void*)(gsize)i);
        }
 }
 
 void
-sgen_thread_pool_shutdown (SgenThreadPool *pool)
+sgen_thread_pool_shutdown (void)
 {
-       if (!pool)
+       if (!threads_num)
                return;
 
-       mono_os_mutex_lock (&pool->lock);
-       pool->threadpool_shutdown = TRUE;
-       mono_os_cond_broadcast (&pool->work_cond);
-       while (pool->threads_finished < pool->threads_num)
-               mono_os_cond_wait (&pool->done_cond, &pool->lock);
-       mono_os_mutex_unlock (&pool->lock);
+       mono_os_mutex_lock (&lock);
+       threadpool_shutdown = TRUE;
+       mono_os_cond_broadcast (&work_cond);
+       while (threads_finished < threads_num)
+               mono_os_cond_wait (&done_cond, &lock);
+       mono_os_mutex_unlock (&lock);
 
-       mono_os_mutex_destroy (&pool->lock);
-       mono_os_cond_destroy (&pool->work_cond);
-       mono_os_cond_destroy (&pool->done_cond);
+       mono_os_mutex_destroy (&lock);
+       mono_os_cond_destroy (&work_cond);
+       mono_os_cond_destroy (&done_cond);
 }
 
 SgenThreadPoolJob*
@@ -212,77 +321,74 @@ sgen_thread_pool_job_free (SgenThreadPoolJob *job)
 }
 
 void
-sgen_thread_pool_job_enqueue (SgenThreadPool *pool, SgenThreadPoolJob *job)
+sgen_thread_pool_job_enqueue (int context_id, SgenThreadPoolJob *job)
 {
-       mono_os_mutex_lock (&pool->lock);
+       mono_os_mutex_lock (&lock);
 
-       sgen_pointer_queue_add (&pool->job_queue, job);
-       mono_os_cond_signal (&pool->work_cond);
+       sgen_pointer_queue_add (&pool_contexts [context_id].job_queue, job);
+       mono_os_cond_broadcast (&work_cond);
 
-       mono_os_mutex_unlock (&pool->lock);
+       mono_os_mutex_unlock (&lock);
 }
 
 void
-sgen_thread_pool_job_wait (SgenThreadPool *pool, SgenThreadPoolJob *job)
+sgen_thread_pool_job_wait (int context_id, SgenThreadPoolJob *job)
 {
        SGEN_ASSERT (0, job, "Where's the job?");
 
-       mono_os_mutex_lock (&pool->lock);
+       mono_os_mutex_lock (&lock);
 
-       while (find_job_in_queue (pool, job) >= 0)
-               mono_os_cond_wait (&pool->done_cond, &pool->lock);
+       while (find_job_in_queue (&pool_contexts [context_id], job) >= 0)
+               mono_os_cond_wait (&done_cond, &lock);
 
-       mono_os_mutex_unlock (&pool->lock);
+       mono_os_mutex_unlock (&lock);
 }
 
 void
-sgen_thread_pool_idle_signal (SgenThreadPool *pool)
+sgen_thread_pool_idle_signal (int context_id)
 {
-       SGEN_ASSERT (0, pool->idle_job_func, "Why are we signaling idle without an idle function?");
+       SGEN_ASSERT (0, pool_contexts [context_id].idle_job_func, "Why are we signaling idle without an idle function?");
 
-       mono_os_mutex_lock (&pool->lock);
+       mono_os_mutex_lock (&lock);
 
-       if (pool->continue_idle_job_func (NULL))
-               mono_os_cond_broadcast (&pool->work_cond);
+       if (pool_contexts [context_id].continue_idle_job_func (NULL, context_id))
+               mono_os_cond_broadcast (&work_cond);
 
-       mono_os_mutex_unlock (&pool->lock);
+       mono_os_mutex_unlock (&lock);
 }
 
 void
-sgen_thread_pool_idle_wait (SgenThreadPool *pool)
+sgen_thread_pool_idle_wait (int context_id)
 {
-       SGEN_ASSERT (0, pool->idle_job_func, "Why are we waiting for idle without an idle function?");
+       SGEN_ASSERT (0, pool_contexts [context_id].idle_job_func, "Why are we waiting for idle without an idle function?");
 
-       mono_os_mutex_lock (&pool->lock);
+       mono_os_mutex_lock (&lock);
 
-       while (pool->continue_idle_job_func (NULL))
-               mono_os_cond_wait (&pool->done_cond, &pool->lock);
+       while (pool_contexts [context_id].continue_idle_job_func (NULL, context_id))
+               mono_os_cond_wait (&done_cond, &lock);
 
-       mono_os_mutex_unlock (&pool->lock);
+       mono_os_mutex_unlock (&lock);
 }
 
 void
-sgen_thread_pool_wait_for_all_jobs (SgenThreadPool *pool)
+sgen_thread_pool_wait_for_all_jobs (int context_id)
 {
-       mono_os_mutex_lock (&pool->lock);
+       mono_os_mutex_lock (&lock);
 
-       while (!sgen_pointer_queue_is_empty (&pool->job_queue))
-               mono_os_cond_wait (&pool->done_cond, &pool->lock);
+       while (!sgen_pointer_queue_is_empty (&pool_contexts [context_id].job_queue))
+               mono_os_cond_wait (&done_cond, &lock);
 
-       mono_os_mutex_unlock (&pool->lock);
+       mono_os_mutex_unlock (&lock);
 }
 
 /* Return 0 if some_thread is not a thread pool thread, or its 1-based thread number (index + 1) otherwise */
 int
-sgen_thread_pool_is_thread_pool_thread (SgenThreadPool *pool, MonoNativeThreadId some_thread)
+sgen_thread_pool_is_thread_pool_thread (MonoNativeThreadId some_thread)
 {
        int i;
 
-       if (!pool)
-               return 0;
-
-       for (i = 0; i < pool->threads_num; i++) {
-               if (some_thread == pool->threads [i])
+       for (i = 0; i < threads_num; i++) {
+               if (some_thread == threads [i])
                        return i + 1;
        }
 
index 1b48e4433f040137bf37e0368f6b126b17a63c4e..b13848a5a80d20ef938cde11c3ce8371b4a6cec6 100644 (file)
 #include "mono/sgen/sgen-pointer-queue.h"
 #include "mono/utils/mono-threads.h"
 
+#define SGEN_THREADPOOL_MAX_NUM_THREADS 8
+#define SGEN_THREADPOOL_MAX_NUM_CONTEXTS 3
+
 typedef struct _SgenThreadPoolJob SgenThreadPoolJob;
-typedef struct _SgenThreadPool SgenThreadPool;
-typedef struct _SgenThreadPoolData SgenThreadPoolData;
+typedef struct _SgenThreadPoolContext SgenThreadPoolContext;
 
 typedef void (*SgenThreadPoolJobFunc) (void *thread_data, SgenThreadPoolJob *job);
 typedef void (*SgenThreadPoolThreadInitFunc) (void*);
 typedef void (*SgenThreadPoolIdleJobFunc) (void*);
-typedef gboolean (*SgenThreadPoolContinueIdleJobFunc) (void*);
+typedef gboolean (*SgenThreadPoolContinueIdleJobFunc) (void*, int);
 typedef gboolean (*SgenThreadPoolShouldWorkFunc) (void*);
 
 struct _SgenThreadPoolJob {
@@ -30,16 +32,7 @@ struct _SgenThreadPoolJob {
        volatile gint32 state;
 };
 
-#define MAX_NUM_THREADS 8
-
-struct _SgenThreadPool {
-       mono_mutex_t lock;
-       mono_cond_t work_cond;
-       mono_cond_t done_cond;
-
-       int threads_num;
-       MonoNativeThreadId threads [MAX_NUM_THREADS];
-
+struct _SgenThreadPoolContext {
        /* Only accessed with the lock held. */
        SgenPointerQueue job_queue;
 
@@ -48,31 +41,29 @@ struct _SgenThreadPool {
        SgenThreadPoolContinueIdleJobFunc continue_idle_job_func;
        SgenThreadPoolShouldWorkFunc should_work_func;
 
-       volatile gboolean threadpool_shutdown;
-       volatile int threads_finished;
+       void **thread_datas;
+       int num_threads;
 };
 
-struct _SgenThreadPoolData {
-       SgenThreadPool *pool;
-};
 
-void sgen_thread_pool_init (SgenThreadPool *pool, int num_threads, SgenThreadPoolThreadInitFunc init_func, SgenThreadPoolIdleJobFunc idle_func, SgenThreadPoolContinueIdleJobFunc continue_idle_func, SgenThreadPoolShouldWorkFunc should_work_func, SgenThreadPoolData **thread_datas);
+int sgen_thread_pool_create_context (int num_threads, SgenThreadPoolThreadInitFunc init_func, SgenThreadPoolIdleJobFunc idle_func, SgenThreadPoolContinueIdleJobFunc continue_idle_func, SgenThreadPoolShouldWorkFunc should_work_func, void **thread_datas);
+void sgen_thread_pool_start (void);
 
-void sgen_thread_pool_shutdown (SgenThreadPool *pool);
+void sgen_thread_pool_shutdown (void);
 
 SgenThreadPoolJob* sgen_thread_pool_job_alloc (const char *name, SgenThreadPoolJobFunc func, size_t size);
 /* This only needs to be called on jobs that are not enqueued. */
 void sgen_thread_pool_job_free (SgenThreadPoolJob *job);
 
-void sgen_thread_pool_job_enqueue (SgenThreadPool *pool, SgenThreadPoolJob *job);
+void sgen_thread_pool_job_enqueue (int context_id, SgenThreadPoolJob *job);
 /* This must only be called after the job has been enqueued. */
-void sgen_thread_pool_job_wait (SgenThreadPool *pool, SgenThreadPoolJob *job);
+void sgen_thread_pool_job_wait (int context_id, SgenThreadPoolJob *job);
 
-void sgen_thread_pool_idle_signal (SgenThreadPool *pool);
-void sgen_thread_pool_idle_wait (SgenThreadPool *pool);
+void sgen_thread_pool_idle_signal (int context_id);
+void sgen_thread_pool_idle_wait (int context_id);
 
-void sgen_thread_pool_wait_for_all_jobs (SgenThreadPool *pool);
+void sgen_thread_pool_wait_for_all_jobs (int context_id);
 
-int sgen_thread_pool_is_thread_pool_thread (SgenThreadPool *pool, MonoNativeThreadId thread);
+int sgen_thread_pool_is_thread_pool_thread (MonoNativeThreadId thread);
 
 #endif
index c8f16d329d0c461958c14aae27af870a713cc86c..8a615ede6388b42fd1e4b90f50347264e64bba94 100644 (file)
 #include "mono/utils/mono-membar.h"
 #include "mono/sgen/sgen-client.h"
 
-static int workers_num;
-static int active_workers_num;
-static volatile gboolean started;
-static volatile gboolean forced_stop;
-static WorkerData *workers_data;
-static SgenWorkerCallback worker_init_cb;
-
-static SgenThreadPool pool_inst;
-static SgenThreadPool *pool; /* null if we're not using workers */
-
-/*
- * When using multiple workers, we need to have the last worker
- * enqueue the preclean jobs (if there are any). This lock ensures
- * that when the last worker takes it, all the other workers have
- * gracefully finished, so it can restart them.
- */
-static mono_mutex_t finished_lock;
-static volatile gboolean workers_finished;
-static int worker_awakenings;
-
-static SgenSectionGrayQueue workers_distribute_gray_queue;
+static WorkerContext worker_contexts [GENERATION_MAX];
 
 /*
  * Allowed transitions:
@@ -66,16 +46,6 @@ enum {
 
 #define SGEN_WORKER_MIN_SECTIONS_SIGNAL 4
 
-typedef gint32 State;
-
-static SgenObjectOperations * volatile idle_func_object_ops;
-static SgenObjectOperations *idle_func_object_ops_par, *idle_func_object_ops_nopar;
-/*
- * finished_callback is called only when the workers finish work normally (when they
- * are not forced to finish). The callback is used to enqueue preclean jobs.
- */
-static volatile SgenWorkersFinishCallback finish_callback;
-
 static guint64 stat_workers_num_finished;
 
 static gboolean
@@ -87,7 +57,7 @@ set_state (WorkerData *data, State old_state, State new_state)
        else if (new_state == STATE_WORKING)
                SGEN_ASSERT (0, old_state == STATE_WORK_ENQUEUED, "We can only transition to WORKING from WORK ENQUEUED");
        if (new_state == STATE_NOT_WORKING || new_state == STATE_WORKING)
-               SGEN_ASSERT (6, sgen_thread_pool_is_thread_pool_thread (pool, mono_native_thread_id_get ()), "Only the worker thread is allowed to transition to NOT_WORKING or WORKING");
+               SGEN_ASSERT (6, sgen_thread_pool_is_thread_pool_thread (mono_native_thread_id_get ()), "Only the worker thread is allowed to transition to NOT_WORKING or WORKING");
 
        return InterlockedCompareExchange (&data->state, new_state, old_state) == old_state;
 }
@@ -99,7 +69,7 @@ state_is_working_or_enqueued (State state)
 }
 
 static void
-sgen_workers_ensure_awake (void)
+sgen_workers_ensure_awake (WorkerContext *context)
 {
        int i;
        gboolean need_signal = FALSE;
@@ -110,20 +80,20 @@ sgen_workers_ensure_awake (void)
         * or when the last worker is enqueuing preclean work. In both cases we can't
         * have a worker working using a nopar context, which means it is safe.
         */
-       idle_func_object_ops = (active_workers_num > 1) ? idle_func_object_ops_par : idle_func_object_ops_nopar;
-       workers_finished = FALSE;
+       context->idle_func_object_ops = (context->active_workers_num > 1) ? context->idle_func_object_ops_par : context->idle_func_object_ops_nopar;
+       context->workers_finished = FALSE;
 
-       for (i = 0; i < active_workers_num; i++) {
+       for (i = 0; i < context->active_workers_num; i++) {
                State old_state;
                gboolean did_set_state;
 
                do {
-                       old_state = workers_data [i].state;
+                       old_state = context->workers_data [i].state;
 
                        if (old_state == STATE_WORK_ENQUEUED)
                                break;
 
-                       did_set_state = set_state (&workers_data [i], old_state, STATE_WORK_ENQUEUED);
+                       did_set_state = set_state (&context->workers_data [i], old_state, STATE_WORK_ENQUEUED);
                } while (!did_set_state);
 
                if (!state_is_working_or_enqueued (old_state))
@@ -131,7 +101,7 @@ sgen_workers_ensure_awake (void)
        }
 
        if (need_signal)
-               sgen_thread_pool_idle_signal (pool);
+               sgen_thread_pool_idle_signal (context->thread_pool_context);
 }
 
 static void
@@ -139,27 +109,28 @@ worker_try_finish (WorkerData *data)
 {
        State old_state;
        int i, working = 0;
+       WorkerContext *context = data->context;
 
        ++stat_workers_num_finished;
 
-       mono_os_mutex_lock (&finished_lock);
+       mono_os_mutex_lock (&context->finished_lock);
 
-       for (i = 0; i < active_workers_num; i++) {
-               if (state_is_working_or_enqueued (workers_data [i].state))
+       for (i = 0; i < context->active_workers_num; i++) {
+               if (state_is_working_or_enqueued (context->workers_data [i].state))
                        working++;
        }
 
        if (working == 1) {
-               SgenWorkersFinishCallback callback = finish_callback;
-               SGEN_ASSERT (0, idle_func_object_ops == idle_func_object_ops_nopar, "Why are we finishing with parallel context");
+               SgenWorkersFinishCallback callback = context->finish_callback;
+               SGEN_ASSERT (0, context->idle_func_object_ops == context->idle_func_object_ops_nopar, "Why are we finishing with parallel context");
                /* We are the last one left. Enqueue preclean job if we have one and awake everybody */
                SGEN_ASSERT (0, data->state != STATE_NOT_WORKING, "How did we get from doing idle work to NOT WORKING without setting it ourselves?");
                if (callback) {
-                       finish_callback = NULL;
+                       context->finish_callback = NULL;
                        callback ();
-                       worker_awakenings = 0;
+                       context->worker_awakenings = 0;
                        /* Make sure each worker has a chance of seeing the enqueued jobs */
-                       sgen_workers_ensure_awake ();
+                       sgen_workers_ensure_awake (context);
                        SGEN_ASSERT (0, data->state == STATE_WORK_ENQUEUED, "Why did we fail to set our own state to ENQUEUED");
                        goto work_available;
                }
@@ -180,22 +151,22 @@ worker_try_finish (WorkerData *data)
         * of performance as non-parallel mode even if we fail to distribute work properly.
         */
        if (working == 2)
-               idle_func_object_ops = idle_func_object_ops_nopar;
+               context->idle_func_object_ops = context->idle_func_object_ops_nopar;
 
-       workers_finished = TRUE;
-       mono_os_mutex_unlock (&finished_lock);
+       context->workers_finished = TRUE;
+       mono_os_mutex_unlock (&context->finished_lock);
 
-       binary_protocol_worker_finish (sgen_timestamp (), forced_stop);
+       binary_protocol_worker_finish (sgen_timestamp (), context->forced_stop);
 
        sgen_gray_object_queue_trim_free_list (&data->private_gray_queue);
        return;
 
 work_available:
-       mono_os_mutex_unlock (&finished_lock);
+       mono_os_mutex_unlock (&context->finished_lock);
 }
 
 void
-sgen_workers_enqueue_job (SgenThreadPoolJob *job, gboolean enqueue)
+sgen_workers_enqueue_job (int generation, SgenThreadPoolJob *job, gboolean enqueue)
 {
        if (!enqueue) {
                job->func (NULL, job);
@@ -203,7 +174,7 @@ sgen_workers_enqueue_job (SgenThreadPoolJob *job, gboolean enqueue)
                return;
        }
 
-       sgen_thread_pool_job_enqueue (pool, job);
+       sgen_thread_pool_job_enqueue (worker_contexts [generation].thread_pool_context, job);
 }
 
 static gboolean
@@ -216,7 +187,7 @@ workers_get_work (WorkerData *data)
        g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue));
        g_assert (major->is_concurrent || minor->is_parallel);
 
-       section = sgen_section_gray_queue_dequeue (&workers_distribute_gray_queue);
+       section = sgen_section_gray_queue_dequeue (&data->context->workers_distribute_gray_queue);
        if (section) {
                sgen_gray_object_enqueue_section (&data->private_gray_queue, section, major->is_parallel);
                return TRUE;
@@ -234,6 +205,7 @@ workers_steal_work (WorkerData *data)
        SgenMinorCollector *minor = sgen_get_minor_collector ();
        int generation = sgen_get_current_collection_generation ();
        GrayQueueSection *section = NULL;
+       WorkerContext *context = data->context;
        int i, current_worker;
 
        if ((generation == GENERATION_OLD && !major->is_parallel) ||
@@ -243,12 +215,12 @@ workers_steal_work (WorkerData *data)
        /* If we're parallel, steal from other workers' private gray queues  */
        g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue));
 
-       current_worker = (int) (data - workers_data);
+       current_worker = (int) (data - context->workers_data);
 
-       for (i = 1; i < active_workers_num && !section; i++) {
-               int steal_worker = (current_worker + i) % active_workers_num;
-               if (state_is_working_or_enqueued (workers_data [steal_worker].state))
-                       section = sgen_gray_object_steal_section (&workers_data [steal_worker].private_gray_queue);
+       for (i = 1; i < context->active_workers_num && !section; i++) {
+               int steal_worker = (current_worker + i) % context->active_workers_num;
+               if (state_is_working_or_enqueued (context->workers_data [steal_worker].state))
+                       section = sgen_gray_object_steal_section (&context->workers_data [steal_worker].private_gray_queue);
        }
 
        if (section) {
@@ -284,64 +256,81 @@ thread_pool_init_func (void *data_untyped)
        SgenMajorCollector *major = sgen_get_major_collector ();
        SgenMinorCollector *minor = sgen_get_minor_collector ();
 
-       sgen_client_thread_register_worker ();
-
        if (!major->is_concurrent && !minor->is_parallel)
                return;
 
        init_private_gray_queue (data);
 
-       if (worker_init_cb)
-               worker_init_cb (data);
+       /* Separate WorkerData instances for the same thread share free_block_lists */
+       if (major->is_parallel || minor->is_parallel)
+               major->init_block_free_lists (&data->free_block_lists);
 }
 
 static gboolean
-continue_idle_func (void *data_untyped)
+sgen_workers_are_working (WorkerContext *context)
 {
-       if (data_untyped) {
-               WorkerData *data = (WorkerData *)data_untyped;
-               return state_is_working_or_enqueued (data->state);
-       } else {
-               /* Return if any of the threads is working */
-               return !sgen_workers_all_done ();
+       int i;
+
+       for (i = 0; i < context->active_workers_num; i++) {
+               if (state_is_working_or_enqueued (context->workers_data [i].state))
+                       return TRUE;
        }
+       return FALSE;
+}
+
+static gboolean
+continue_idle_func (void *data_untyped, int thread_pool_context)
+{
+       if (data_untyped)
+               return state_is_working_or_enqueued (((WorkerData*)data_untyped)->state);
+
+       /* Return whether any of the threads is working in the context */
+       if (worker_contexts [GENERATION_NURSERY].workers_num && worker_contexts [GENERATION_NURSERY].thread_pool_context == thread_pool_context)
+               return sgen_workers_are_working (&worker_contexts [GENERATION_NURSERY]);
+       if (worker_contexts [GENERATION_OLD].workers_num && worker_contexts [GENERATION_OLD].thread_pool_context == thread_pool_context)
+               return sgen_workers_are_working (&worker_contexts [GENERATION_OLD]);
+
+       g_assert_not_reached ();
+       return FALSE;
 }
 
 static gboolean
 should_work_func (void *data_untyped)
 {
        WorkerData *data = (WorkerData*)data_untyped;
-       int current_worker = (int) (data - workers_data);
+       WorkerContext *context = data->context;
+       int current_worker = (int) (data - context->workers_data);
 
-       return started && current_worker < active_workers_num && state_is_working_or_enqueued (data->state);
+       return context->started && current_worker < context->active_workers_num && state_is_working_or_enqueued (data->state);
 }
 
 static void
 marker_idle_func (void *data_untyped)
 {
        WorkerData *data = (WorkerData *)data_untyped;
+       WorkerContext *context = data->context;
 
-       SGEN_ASSERT (0, continue_idle_func (data_untyped), "Why are we called when we're not supposed to work?");
+       SGEN_ASSERT (0, continue_idle_func (data_untyped, context->thread_pool_context), "Why are we called when we're not supposed to work?");
 
        if (data->state == STATE_WORK_ENQUEUED) {
                set_state (data, STATE_WORK_ENQUEUED, STATE_WORKING);
                SGEN_ASSERT (0, data->state != STATE_NOT_WORKING, "How did we get from WORK ENQUEUED to NOT WORKING?");
        }
 
-       if (!forced_stop && (!sgen_gray_object_queue_is_empty (&data->private_gray_queue) || workers_get_work (data) || workers_steal_work (data))) {
-               ScanCopyContext ctx = CONTEXT_FROM_OBJECT_OPERATIONS (idle_func_object_ops, &data->private_gray_queue);
+       if (!context->forced_stop && (!sgen_gray_object_queue_is_empty (&data->private_gray_queue) || workers_get_work (data) || workers_steal_work (data))) {
+               ScanCopyContext ctx = CONTEXT_FROM_OBJECT_OPERATIONS (context->idle_func_object_ops, &data->private_gray_queue);
 
                SGEN_ASSERT (0, !sgen_gray_object_queue_is_empty (&data->private_gray_queue), "How is our gray queue empty if we just got work?");
 
                sgen_drain_gray_stack (ctx);
 
                if (data->private_gray_queue.num_sections >= SGEN_WORKER_MIN_SECTIONS_SIGNAL
-                               && workers_finished && worker_awakenings < active_workers_num) {
+                               && context->workers_finished && context->worker_awakenings < context->active_workers_num) {
                        /* We bound the number of worker awakenings just to be sure */
-                       worker_awakenings++;
-                       mono_os_mutex_lock (&finished_lock);
-                       sgen_workers_ensure_awake ();
-                       mono_os_mutex_unlock (&finished_lock);
+                       context->worker_awakenings++;
+                       mono_os_mutex_lock (&context->finished_lock);
+                       sgen_workers_ensure_awake (context);
+                       mono_os_mutex_unlock (&context->finished_lock);
                }
        } else {
                worker_try_finish (data);
@@ -349,110 +338,122 @@ marker_idle_func (void *data_untyped)
 }
 
 static void
-init_distribute_gray_queue (void)
+init_distribute_gray_queue (WorkerContext *context)
 {
-       sgen_section_gray_queue_init (&workers_distribute_gray_queue, TRUE,
+       sgen_section_gray_queue_init (&context->workers_distribute_gray_queue, TRUE,
                        sgen_get_major_collector ()->is_concurrent ? concurrent_enqueue_check : NULL);
 }
 
 void
-sgen_workers_init (int num_workers, SgenWorkerCallback callback)
+sgen_workers_create_context (int generation, int num_workers)
 {
+       static gboolean stat_inited = FALSE;
        int i;
-       WorkerData **workers_data_ptrs = (WorkerData**)alloca(num_workers * sizeof(WorkerData*));
+       WorkerData **workers_data_ptrs = (WorkerData**)sgen_alloc_internal_dynamic (num_workers * sizeof(WorkerData*), INTERNAL_MEM_WORKER_DATA, TRUE);
+       WorkerContext *context = &worker_contexts [generation];
 
-       mono_os_mutex_init (&finished_lock);
-       //g_print ("initing %d workers\n", num_workers);
+       SGEN_ASSERT (0, !context->workers_num, "We can't init the worker context for a generation twice");
 
-       workers_num = num_workers;
-       active_workers_num = num_workers;
+       mono_os_mutex_init (&context->finished_lock);
 
-       workers_data = (WorkerData *)sgen_alloc_internal_dynamic (sizeof (WorkerData) * num_workers, INTERNAL_MEM_WORKER_DATA, TRUE);
-       memset (workers_data, 0, sizeof (WorkerData) * num_workers);
+       context->generation = generation;
+       context->workers_num = num_workers;
+       context->active_workers_num = num_workers;
 
-       init_distribute_gray_queue ();
+       context->workers_data = (WorkerData *)sgen_alloc_internal_dynamic (sizeof (WorkerData) * num_workers, INTERNAL_MEM_WORKER_DATA, TRUE);
+       memset (context->workers_data, 0, sizeof (WorkerData) * num_workers);
 
-       for (i = 0; i < num_workers; ++i)
-               workers_data_ptrs [i] = &workers_data [i];
+       init_distribute_gray_queue (context);
 
-       worker_init_cb = callback;
+       for (i = 0; i < num_workers; ++i) {
+               workers_data_ptrs [i] = &context->workers_data [i];
+               context->workers_data [i].context = context;
+       }
 
-       pool = &pool_inst;
-       sgen_thread_pool_init (pool, num_workers, thread_pool_init_func, marker_idle_func, continue_idle_func, should_work_func, (SgenThreadPoolData**)workers_data_ptrs);
+       context->thread_pool_context = sgen_thread_pool_create_context (num_workers, thread_pool_init_func, marker_idle_func, continue_idle_func, should_work_func, (void**)workers_data_ptrs);
 
-       mono_counters_register ("# workers finished", MONO_COUNTER_GC | MONO_COUNTER_ULONG, &stat_workers_num_finished);
+       if (!stat_inited) {
+               mono_counters_register ("# workers finished", MONO_COUNTER_GC | MONO_COUNTER_ULONG, &stat_workers_num_finished);
+               stat_inited = TRUE;
+       }
 }
 
 void
-sgen_workers_shutdown (void)
+sgen_workers_stop_all_workers (int generation)
 {
-       if (pool)
-               sgen_thread_pool_shutdown (pool);
-}
+       WorkerContext *context = &worker_contexts [generation];
 
-void
-sgen_workers_stop_all_workers (void)
-{
-       finish_callback = NULL;
+       context->finish_callback = NULL;
        mono_memory_write_barrier ();
-       forced_stop = TRUE;
+       context->forced_stop = TRUE;
 
-       sgen_thread_pool_wait_for_all_jobs (pool);
-       sgen_thread_pool_idle_wait (pool);
-       SGEN_ASSERT (0, sgen_workers_all_done (), "Can only signal enqueue work when in no work state");
+       sgen_thread_pool_wait_for_all_jobs (context->thread_pool_context);
+       sgen_thread_pool_idle_wait (context->thread_pool_context);
+       SGEN_ASSERT (0, !sgen_workers_are_working (context), "Can only signal enqueue work when in no work state");
 
-       started = FALSE;
+       context->started = FALSE;
 }
 
 void
-sgen_workers_set_num_active_workers (int num_workers)
+sgen_workers_set_num_active_workers (int generation, int num_workers)
 {
+       WorkerContext *context = &worker_contexts [generation];
        if (num_workers) {
-               SGEN_ASSERT (0, active_workers_num <= workers_num, "We can't start more workers than we initialized");
-               active_workers_num = num_workers;
+               SGEN_ASSERT (0, num_workers <= context->workers_num, "We can't start more workers than we initialized");
+               context->active_workers_num = num_workers;
        } else {
-               active_workers_num = workers_num;
+               context->active_workers_num = context->workers_num;
        }
 }
 
 void
-sgen_workers_start_all_workers (SgenObjectOperations *object_ops_nopar, SgenObjectOperations *object_ops_par, SgenWorkersFinishCallback callback)
+sgen_workers_start_all_workers (int generation, SgenObjectOperations *object_ops_nopar, SgenObjectOperations *object_ops_par, SgenWorkersFinishCallback callback)
 {
-       SGEN_ASSERT (0, !started, "Why are we starting to work without finishing previous cycle");
-
-       idle_func_object_ops_par = object_ops_par;
-       idle_func_object_ops_nopar = object_ops_nopar;
-       forced_stop = FALSE;
-       finish_callback = callback;
-       worker_awakenings = 0;
-       started = TRUE;
+       WorkerContext *context = &worker_contexts [generation];
+       SGEN_ASSERT (0, !context->started, "Why are we starting to work without finishing previous cycle");
+
+       context->idle_func_object_ops_par = object_ops_par;
+       context->idle_func_object_ops_nopar = object_ops_nopar;
+       context->forced_stop = FALSE;
+       context->finish_callback = callback;
+       context->worker_awakenings = 0;
+       context->started = TRUE;
        mono_memory_write_barrier ();
 
        /*
         * We expect workers to start finishing only after all of them were awaken.
         * Otherwise we might think that we have fewer workers and use wrong context.
         */
-       mono_os_mutex_lock (&finished_lock);
-       sgen_workers_ensure_awake ();
-       mono_os_mutex_unlock (&finished_lock);
+       mono_os_mutex_lock (&context->finished_lock);
+       sgen_workers_ensure_awake (context);
+       mono_os_mutex_unlock (&context->finished_lock);
 }
 
 void
-sgen_workers_join (void)
+sgen_workers_join (int generation)
 {
+       WorkerContext *context = &worker_contexts [generation];
        int i;
 
-       sgen_thread_pool_wait_for_all_jobs (pool);
-       sgen_thread_pool_idle_wait (pool);
-       SGEN_ASSERT (0, sgen_workers_all_done (), "Can only signal enqueue work when in no work state");
+       /*
+        * It might be the case that a worker didn't get to run anything
+        * in this context, because it was stuck working on a long job
+        * in another context. In this case its state is active (STATE_WORK_ENQUEUED)
+        * and we need to wait for it to finish itself.
+        * FIXME Avoid having to wait for the worker to report its own finish.
+        */
+
+       sgen_thread_pool_wait_for_all_jobs (context->thread_pool_context);
+       sgen_thread_pool_idle_wait (context->thread_pool_context);
+       SGEN_ASSERT (0, !sgen_workers_are_working (context), "Can only signal enqueue work when in no work state");
 
        /* At this point all the workers have stopped. */
 
-       SGEN_ASSERT (0, sgen_section_gray_queue_is_empty (&workers_distribute_gray_queue), "Why is there still work left to do?");
-       for (i = 0; i < active_workers_num; ++i)
-               SGEN_ASSERT (0, sgen_gray_object_queue_is_empty (&workers_data [i].private_gray_queue), "Why is there still work left to do?");
+       SGEN_ASSERT (0, sgen_section_gray_queue_is_empty (&context->workers_distribute_gray_queue), "Why is there still work left to do?");
+       for (i = 0; i < context->active_workers_num; ++i)
+               SGEN_ASSERT (0, sgen_gray_object_queue_is_empty (&context->workers_data [i].private_gray_queue), "Why is there still work left to do?");
 
-       started = FALSE;
+       context->started = FALSE;
 }
 
 /*
@@ -460,17 +461,18 @@ sgen_workers_join (void)
  * If we're stopped, there are also no pending jobs.
  */
 gboolean
-sgen_workers_have_idle_work (void)
+sgen_workers_have_idle_work (int generation)
 {
+       WorkerContext *context = &worker_contexts [generation];
        int i;
 
-       SGEN_ASSERT (0, forced_stop && sgen_workers_all_done (), "Checking for idle work should only happen if the workers are stopped.");
+       SGEN_ASSERT (0, context->forced_stop && !sgen_workers_are_working (context), "Checking for idle work should only happen if the workers are stopped.");
 
-       if (!sgen_section_gray_queue_is_empty (&workers_distribute_gray_queue))
+       if (!sgen_section_gray_queue_is_empty (&context->workers_distribute_gray_queue))
                return TRUE;
 
-       for (i = 0; i < active_workers_num; ++i) {
-               if (!sgen_gray_object_queue_is_empty (&workers_data [i].private_gray_queue))
+       for (i = 0; i < context->active_workers_num; ++i) {
+               if (!sgen_gray_object_queue_is_empty (&context->workers_data [i].private_gray_queue))
                        return TRUE;
        }
 
@@ -480,48 +482,42 @@ sgen_workers_have_idle_work (void)
 gboolean
 sgen_workers_all_done (void)
 {
-       int i;
+       if (worker_contexts [GENERATION_NURSERY].workers_num && sgen_workers_are_working (&worker_contexts [GENERATION_NURSERY]))
+               return FALSE;
+       if (worker_contexts [GENERATION_OLD].workers_num && sgen_workers_are_working (&worker_contexts [GENERATION_OLD]))
+               return FALSE;
 
-       for (i = 0; i < active_workers_num; i++) {
-               if (state_is_working_or_enqueued (workers_data [i].state))
-                       return FALSE;
-       }
        return TRUE;
 }
 
-/* Must only be used for debugging */
-gboolean
-sgen_workers_are_working (void)
-{
-       return !sgen_workers_all_done ();
-}
-
 void
-sgen_workers_assert_gray_queue_is_empty (void)
+sgen_workers_assert_gray_queue_is_empty (int generation)
 {
-       SGEN_ASSERT (0, sgen_section_gray_queue_is_empty (&workers_distribute_gray_queue), "Why is the workers gray queue not empty?");
+       SGEN_ASSERT (0, sgen_section_gray_queue_is_empty (&worker_contexts [generation].workers_distribute_gray_queue), "Why is the workers gray queue not empty?");
 }
 
 void
-sgen_workers_take_from_queue (SgenGrayQueue *queue)
+sgen_workers_take_from_queue (int generation, SgenGrayQueue *queue)
 {
-       sgen_gray_object_spread (queue, sgen_workers_get_job_split_count ());
+       WorkerContext *context = &worker_contexts [generation];
+
+       sgen_gray_object_spread (queue, sgen_workers_get_job_split_count (generation));
 
        for (;;) {
                GrayQueueSection *section = sgen_gray_object_dequeue_section (queue);
                if (!section)
                        break;
-               sgen_section_gray_queue_enqueue (&workers_distribute_gray_queue, section);
+               sgen_section_gray_queue_enqueue (&context->workers_distribute_gray_queue, section);
        }
 
-       SGEN_ASSERT (0, !sgen_workers_are_working (), "We should fully populate the distribute gray queue before we start the workers");
+       SGEN_ASSERT (0, !sgen_workers_are_working (context), "We should fully populate the distribute gray queue before we start the workers");
 }
 
 SgenObjectOperations*
-sgen_workers_get_idle_func_object_ops (void)
+sgen_workers_get_idle_func_object_ops (WorkerData *worker)
 {
-       g_assert (idle_func_object_ops);
-       return idle_func_object_ops;
+       g_assert (worker->context->idle_func_object_ops);
+       return worker->context->idle_func_object_ops;
 }
 
 /*
@@ -529,28 +525,29 @@ sgen_workers_get_idle_func_object_ops (void)
  * more than one worker, we split into a larger number of jobs so that, in case
  * the work load is uneven, a worker that finished quickly can take up more jobs
  * than another one.
+ *
+ * We also return 1 if there is no worker context for that generation.
  */
 int
-sgen_workers_get_job_split_count (void)
+sgen_workers_get_job_split_count (int generation)
 {
-       return (active_workers_num > 1) ? active_workers_num * 4 : 1;
+       return (worker_contexts [generation].active_workers_num > 1) ? worker_contexts [generation].active_workers_num * 4 : 1;
 }
 
 void
-sgen_workers_foreach (SgenWorkerCallback callback)
+sgen_workers_foreach (int generation, SgenWorkerCallback callback)
 {
+       WorkerContext *context = &worker_contexts [generation];
        int i;
 
-       for (i = 0; i < workers_num; i++)
-               callback (&workers_data [i]);
+       for (i = 0; i < context->workers_num; i++)
+               callback (&context->workers_data [i]);
 }
 
 gboolean
 sgen_workers_is_worker_thread (MonoNativeThreadId id)
 {
-       if (!pool)
-               return FALSE;
-       return sgen_thread_pool_is_thread_pool_thread (pool, id);
+       return sgen_thread_pool_is_thread_pool_thread (id);
 }
 
 #endif
index 78dea19867d645d76b7fe73dfcf81c04981da7a2..42dc16ac4aef7c6eb18cd5fe0b73dd5f49b6497e 100644 (file)
 #include "mono/sgen/sgen-thread-pool.h"
 
 typedef struct _WorkerData WorkerData;
+typedef struct _WorkerContext WorkerContext;
+
+typedef gint32 State;
+
+typedef void (*SgenWorkersFinishCallback) (void);
+typedef void (*SgenWorkerCallback) (WorkerData *data);
+
 struct _WorkerData {
-       /*
-        * Threadpool threads receive as their starting argument a WorkerData.
-        * tp_data is meant for use inside the sgen thread pool and must be first.
-        */
-       SgenThreadPoolData tp_data;
        gint32 state;
        SgenGrayQueue private_gray_queue; /* only read/written by worker thread */
        /*
@@ -29,29 +31,54 @@ struct _WorkerData {
         * starts.
         */
        gpointer free_block_lists;
+       WorkerContext *context;
 };
 
-typedef void (*SgenWorkersFinishCallback) (void);
-typedef void (*SgenWorkerCallback) (WorkerData *data);
+struct _WorkerContext {
+       int workers_num;
+       int active_workers_num;
+       volatile gboolean started;
+       volatile gboolean forced_stop;
+       WorkerData *workers_data;
+
+       /*
+        * When using multiple workers, we need to have the last worker
+        * enqueue the preclean jobs (if there are any). This lock ensures
+        * that when the last worker takes it, all the other workers have
+        * gracefully finished, so it can restart them.
+        */
+       mono_mutex_t finished_lock;
+       volatile gboolean workers_finished;
+       int worker_awakenings;
+
+       SgenSectionGrayQueue workers_distribute_gray_queue;
+
+       SgenObjectOperations * volatile idle_func_object_ops;
+       SgenObjectOperations *idle_func_object_ops_par, *idle_func_object_ops_nopar;
+
+       /*
+        * finish_callback is called only when the workers finish work normally (when they
+        * are not forced to finish). The callback is used to enqueue preclean jobs.
+        */
+       volatile SgenWorkersFinishCallback finish_callback;
+
+       int generation;
+       int thread_pool_context;
+};
 
-void sgen_workers_init (int num_workers, SgenWorkerCallback callback);
-void sgen_workers_shutdown (void);
-void sgen_workers_stop_all_workers (void);
-void sgen_workers_set_num_active_workers (int num_workers);
-void sgen_workers_start_all_workers (SgenObjectOperations *object_ops_nopar, SgenObjectOperations *object_ops_par, SgenWorkersFinishCallback finish_job);
-void sgen_workers_init_distribute_gray_queue (void);
-void sgen_workers_enqueue_job (SgenThreadPoolJob *job, gboolean enqueue);
-void sgen_workers_distribute_gray_queue_sections (void);
-void sgen_workers_reset_data (void);
-void sgen_workers_join (void);
-gboolean sgen_workers_have_idle_work (void);
+void sgen_workers_create_context (int generation, int num_workers);
+void sgen_workers_stop_all_workers (int generation);
+void sgen_workers_set_num_active_workers (int generation, int num_workers);
+void sgen_workers_start_all_workers (int generation, SgenObjectOperations *object_ops_nopar, SgenObjectOperations *object_ops_par, SgenWorkersFinishCallback finish_job);
+void sgen_workers_enqueue_job (int generation, SgenThreadPoolJob *job, gboolean enqueue);
+void sgen_workers_join (int generation);
+gboolean sgen_workers_have_idle_work (int generation);
 gboolean sgen_workers_all_done (void);
-gboolean sgen_workers_are_working (void);
-void sgen_workers_assert_gray_queue_is_empty (void);
-void sgen_workers_take_from_queue (SgenGrayQueue *queue);
-SgenObjectOperations* sgen_workers_get_idle_func_object_ops (void);
-int sgen_workers_get_job_split_count (void);
-void sgen_workers_foreach (SgenWorkerCallback callback);
+void sgen_workers_assert_gray_queue_is_empty (int generation);
+void sgen_workers_take_from_queue (int generation, SgenGrayQueue *queue);
+SgenObjectOperations* sgen_workers_get_idle_func_object_ops (WorkerData *worker);
+int sgen_workers_get_job_split_count (int generation);
+void sgen_workers_foreach (int generation, SgenWorkerCallback callback);
 gboolean sgen_workers_is_worker_thread (MonoNativeThreadId id);
 
 #endif