[sgen] Split up concurrent sweep from worker logic
authorVlad Brezae <brezaevlad@gmail.com>
Wed, 29 Mar 2017 11:17:02 +0000 (14:17 +0300)
committerVlad Brezae <brezaevlad@gmail.com>
Tue, 4 Apr 2017 13:00:06 +0000 (16:00 +0300)
While this was ok because workers and sweep could not run at the same time, this is no longer the case with parallel nursery collections. In the future, if many threads will be required for different types of jobs, we can consider running them on same native thread, but the jobs will still need to have appropriate contexts (the SgenThreadPool they belong to).

mono/metadata/sgen-mono.c
mono/metadata/sgen-stw.c
mono/sgen/sgen-gc.c
mono/sgen/sgen-gc.h
mono/sgen/sgen-marksweep.c
mono/sgen/sgen-memory-governor.c
mono/sgen/sgen-protocol.c
mono/sgen/sgen-thread-pool.c
mono/sgen/sgen-thread-pool.h
mono/sgen/sgen-workers.c
mono/sgen/sgen-workers.h

index f2958e964be0ddb76c22b4ff4ebd7574d827c121..a67fc067263ca69b8714b0bb056b9c2fe4028e3d 100644 (file)
@@ -17,7 +17,7 @@
 #include "sgen/sgen-client.h"
 #include "sgen/sgen-cardtable.h"
 #include "sgen/sgen-pinning.h"
-#include "sgen/sgen-thread-pool.h"
+#include "sgen/sgen-workers.h"
 #include "metadata/marshal.h"
 #include "metadata/method-builder.h"
 #include "metadata/abi-details.h"
@@ -2069,7 +2069,7 @@ mono_sgen_register_moved_object (void *obj, void *destination)
         * lock-free data structure for the queue as multiple threads will be
         * adding to it at the same time.
         */
-       if (sgen_thread_pool_is_thread_pool_thread (mono_native_thread_id_get ())) {
+       if (sgen_workers_is_worker_thread (mono_native_thread_id_get ())) {
                sgen_pointer_queue_add (&moved_objects_queue, obj);
                sgen_pointer_queue_add (&moved_objects_queue, destination);
        } else {
@@ -3023,7 +3023,9 @@ mono_gc_base_init (void)
 void
 mono_gc_base_cleanup (void)
 {
-       sgen_thread_pool_shutdown ();
+       sgen_thread_pool_shutdown (major_collector.get_sweep_pool ());
+
+       sgen_workers_shutdown ();
 
        // We should have consumed any outstanding moves.
        g_assert (sgen_pointer_queue_is_empty (&moved_objects_queue));
index 8c1a234616f499e752b71233db8d038245a2debe..fc1de7c25f87aa9759f19d52f0741c0b911fd71f 100644 (file)
@@ -20,7 +20,7 @@
 #include "sgen/sgen-gc.h"
 #include "sgen/sgen-protocol.h"
 #include "sgen/sgen-memory-governor.h"
-#include "sgen/sgen-thread-pool.h"
+#include "sgen/sgen-workers.h"
 #include "metadata/profiler-private.h"
 #include "sgen/sgen-client.h"
 #include "metadata/sgen-bridge-internals.h"
@@ -227,7 +227,8 @@ sgen_is_thread_in_current_stw (SgenThreadInfo *info, int *reason)
        We can't suspend the workers that will do all the heavy lifting.
        FIXME Use some state bit in SgenThreadInfo for this.
        */
-       if (sgen_thread_pool_is_thread_pool_thread (mono_thread_info_get_tid (info))) {
+       if (sgen_thread_pool_is_thread_pool_thread (major_collector.get_sweep_pool (), mono_thread_info_get_tid (info)) ||
+                       sgen_workers_is_worker_thread (mono_thread_info_get_tid (info))) {
                if (reason)
                        *reason = 4;
                return FALSE;
index ef8739400fcb67666b676c407136ff7d0afadcd9..91ee1a57a94bd657baea30276550653b62e6c140 100644 (file)
@@ -3408,7 +3408,7 @@ sgen_gc_init (void)
        if (major_collector.post_param_init)
                major_collector.post_param_init (&major_collector);
 
-       if (major_collector.needs_thread_pool || sgen_minor_collector.is_parallel) {
+       if (major_collector.is_concurrent || sgen_minor_collector.is_parallel) {
                int num_workers = 1;
                if (major_collector.is_parallel || sgen_minor_collector.is_parallel) {
                        /* FIXME Detect the number of physical cores, instead of logical */
index 4ff60166cca9d66ec79ea4e08ea3c306bf0d10be..3ebe2e4dfadae344d870426a5122248e7ffc983e 100644 (file)
@@ -35,6 +35,7 @@ typedef struct _SgenThreadInfo SgenThreadInfo;
 #include "mono/sgen/sgen-hash-table.h"
 #include "mono/sgen/sgen-protocol.h"
 #include "mono/sgen/gc-internal-agnostic.h"
+#include "mono/sgen/sgen-thread-pool.h"
 
 /* The method used to clear the nursery */
 /* Clearing at nursery collections is the safest, but has bad interactions with caches.
@@ -593,7 +594,7 @@ sgen_update_reference (GCObject **p, GCObject *o, gboolean allow_null)
 {
        if (!allow_null)
                SGEN_ASSERT (0, o, "Cannot update a reference with a NULL pointer");
-       SGEN_ASSERT (0, !sgen_thread_pool_is_thread_pool_thread (mono_native_thread_id_get ()), "Can't update a reference in the worker thread");
+       SGEN_ASSERT (0, !sgen_workers_is_worker_thread (mono_native_thread_id_get ()), "Can't update a reference in the worker thread");
        *p = o;
 }
 
@@ -636,7 +637,6 @@ struct _SgenMajorCollector {
        size_t section_size;
        gboolean is_concurrent;
        gboolean is_parallel;
-       gboolean needs_thread_pool;
        gboolean supports_cardtable;
        gboolean sweeps_lazily;
 
@@ -693,6 +693,7 @@ struct _SgenMajorCollector {
        guint8* (*get_cardtable_mod_union_for_reference) (char *object);
        long long (*get_and_reset_num_major_objects_marked) (void);
        void (*count_cards) (long long *num_total_cards, long long *num_marked_cards);
+       SgenThreadPool* (*get_sweep_pool) (void);
 
        void (*worker_init_cb) (gpointer worker);
 };
index 365f64787812f03981c64145cd51e38b01664d7a..04ab32dab84a0dbc96e64bf00e7293ed6eee2e8d 100644 (file)
@@ -187,6 +187,9 @@ static volatile int sweep_state = SWEEP_STATE_SWEPT;
 static gboolean concurrent_mark;
 static gboolean concurrent_sweep = TRUE;
 
+SgenThreadPool sweep_pool_inst;
+SgenThreadPool *sweep_pool;
+
 #define BLOCK_IS_TAGGED_HAS_REFERENCES(bl)     SGEN_POINTER_IS_TAGGED_1 ((bl))
 #define BLOCK_TAG_HAS_REFERENCES(bl)           SGEN_POINTER_TAG_1 ((bl))
 
@@ -918,7 +921,7 @@ major_finish_sweep_checking (void)
  wait:
        job = sweep_job;
        if (job)
-               sgen_thread_pool_job_wait (job);
+               sgen_thread_pool_job_wait (sweep_pool, job);
        SGEN_ASSERT (0, !sweep_job, "Why did the sweep job not null itself?");
        SGEN_ASSERT (0, sweep_state == SWEEP_STATE_SWEPT, "How is the sweep job done but we're not swept?");
 }
@@ -1806,7 +1809,7 @@ sweep_job_func (void *thread_data_untyped, SgenThreadPoolJob *job)
         */
        if (concurrent_sweep && lazy_sweep) {
                sweep_blocks_job = sgen_thread_pool_job_alloc ("sweep_blocks", sweep_blocks_job_func, sizeof (SgenThreadPoolJob));
-               sgen_thread_pool_job_enqueue (sweep_blocks_job);
+               sgen_thread_pool_job_enqueue (sweep_pool, sweep_blocks_job);
        }
 
        sweep_finish ();
@@ -1855,7 +1858,7 @@ major_sweep (void)
        SGEN_ASSERT (0, !sweep_job, "We haven't finished the last sweep?");
        if (concurrent_sweep) {
                sweep_job = sgen_thread_pool_job_alloc ("sweep", sweep_job_func, sizeof (SgenThreadPoolJob));
-               sgen_thread_pool_job_enqueue (sweep_job);
+               sgen_thread_pool_job_enqueue (sweep_pool, sweep_job);
        } else {
                sweep_job_func (NULL, NULL);
        }
@@ -2068,7 +2071,7 @@ major_start_major_collection (void)
                 */
                SgenThreadPoolJob *job = sweep_blocks_job;
                if (job)
-                       sgen_thread_pool_job_wait (job);
+                       sgen_thread_pool_job_wait (sweep_pool, job);
        }
 
        if (lazy_sweep && !concurrent_sweep)
@@ -2108,6 +2111,12 @@ major_finish_major_collection (ScannedObjectCounts *counts)
 #endif
 }
 
+static SgenThreadPool*
+major_get_sweep_pool (void)
+{
+       return sweep_pool;
+}
+
 static int
 compare_pointers (const void *va, const void *vb) {
        char *a = *(char**)va, *b = *(char**)vb;
@@ -2714,7 +2723,6 @@ static void
 post_param_init (SgenMajorCollector *collector)
 {
        collector->sweeps_lazily = lazy_sweep;
-       collector->needs_thread_pool = concurrent_mark || concurrent_sweep;
 }
 
 /* We are guaranteed to be called by the worker in question */
@@ -2733,6 +2741,12 @@ sgen_worker_init_callback (gpointer worker_untyped)
        mono_native_tls_set_value (worker_block_free_list_key, worker_free_blocks);
 }
 
+static void
+thread_pool_init_func (void *data_untyped)
+{
+       sgen_client_thread_register_worker ();
+}
+
 static void
 sgen_marksweep_init_internal (SgenMajorCollector *collector, gboolean is_concurrent, gboolean is_parallel)
 {
@@ -2788,7 +2802,6 @@ sgen_marksweep_init_internal (SgenMajorCollector *collector, gboolean is_concurr
        concurrent_mark = is_concurrent;
        collector->is_concurrent = is_concurrent;
        collector->is_parallel = is_parallel;
-       collector->needs_thread_pool = is_concurrent || concurrent_sweep;
        collector->get_and_reset_num_major_objects_marked = major_get_and_reset_num_major_objects_marked;
        collector->supports_cardtable = TRUE;
 
@@ -2834,6 +2847,7 @@ sgen_marksweep_init_internal (SgenMajorCollector *collector, gboolean is_concurr
        collector->is_valid_object = major_is_valid_object;
        collector->describe_pointer = major_describe_pointer;
        collector->count_cards = major_count_cards;
+       collector->get_sweep_pool = major_get_sweep_pool;
 
        collector->major_ops_serial.copy_or_mark_object = major_copy_or_mark_object_canonical;
        collector->major_ops_serial.scan_object = major_scan_object_with_evacuation;
@@ -2893,6 +2907,12 @@ sgen_marksweep_init_internal (SgenMajorCollector *collector, gboolean is_concurr
 
        /*cardtable requires major pages to be 8 cards aligned*/
        g_assert ((MS_BLOCK_SIZE % (8 * CARD_SIZE_IN_BYTES)) == 0);
+
+       if (concurrent_sweep) {
+               SgenThreadPool **thread_datas = &sweep_pool;
+               sweep_pool = &sweep_pool_inst;
+               sgen_thread_pool_init (sweep_pool, 1, thread_pool_init_func, NULL, NULL, NULL, (SgenThreadPoolData**)&thread_datas);
+       }
 }
 
 void
index 8dae5a062fd6ddcc54ee582cd161624449ee6a11..3bf90115b854910c743596a6e32c1a061e35d734 100644 (file)
@@ -20,7 +20,7 @@
 
 #include "mono/sgen/sgen-gc.h"
 #include "mono/sgen/sgen-memory-governor.h"
-#include "mono/sgen/sgen-thread-pool.h"
+#include "mono/sgen/sgen-workers.h"
 #include "mono/sgen/sgen-client.h"
 
 #define MIN_MINOR_COLLECTION_ALLOWANCE ((mword)(DEFAULT_NURSERY_SIZE * default_allowance_nursery_size_ratio))
@@ -459,7 +459,7 @@ gboolean
 sgen_memgov_try_alloc_space (mword size, int space)
 {
        if (sgen_memgov_available_free_space () < size) {
-               SGEN_ASSERT (4, !sgen_thread_pool_is_thread_pool_thread (mono_native_thread_id_get ()), "Memory shouldn't run out in worker thread");
+               SGEN_ASSERT (4, !sgen_workers_is_worker_thread (mono_native_thread_id_get ()), "Memory shouldn't run out in worker thread");
                return FALSE;
        }
 
index 419e90a482a50faa961382308edd80844b743a17..f86b05e51c963dd4de1f6c5dba3728732db8b97a 100644 (file)
@@ -16,7 +16,7 @@
 #include "sgen-gc.h"
 #include "sgen-protocol.h"
 #include "sgen-memory-governor.h"
-#include "sgen-thread-pool.h"
+#include "sgen-workers.h"
 #include "sgen-client.h"
 #include "mono/utils/mono-membar.h"
 #include "mono/utils/mono-proclib.h"
@@ -365,11 +365,17 @@ protocol_entry (unsigned char type, gpointer data, int size)
        buffer->buffer [index++] = type;
        /* We should never change the header format */
        if (include_worker_index) {
+               int worker_index;
+               MonoNativeThreadId tid = mono_native_thread_id_get ();
                /*
                 * If the thread is not a worker thread we insert 0, which is interpreted
                 * as gc thread. Worker indexes are 1 based.
                 */
-               buffer->buffer [index++] = (unsigned char) sgen_thread_pool_is_thread_pool_thread (mono_native_thread_id_get ());
+               worker_index = sgen_workers_is_worker_thread (tid);
+               if (!worker_index)
+                       worker_index = sgen_thread_pool_is_thread_pool_thread (major_collector.get_sweep_pool (), tid);
+               /* FIXME Consider using different index bases for different thread pools */
+               buffer->buffer [index++] = (unsigned char) worker_index;
        }
        memcpy (buffer->buffer + index, data, size);
        index += size;
index f2aef3ae02363d6ccb3d107fce8f8a7f39d38cbb..a7abfad24472822f85e7f51632aa998cb913f05c 100644 (file)
 
 #include "mono/sgen/sgen-gc.h"
 #include "mono/sgen/sgen-thread-pool.h"
-#include "mono/sgen/sgen-pointer-queue.h"
 #include "mono/utils/mono-os-mutex.h"
-#ifndef SGEN_WITHOUT_MONO
-#include "mono/utils/mono-threads.h"
-#endif
-
-#define MAX_NUM_THREADS 8
-
-static mono_mutex_t lock;
-static mono_cond_t work_cond;
-static mono_cond_t done_cond;
-
-static int threads_num = 0;
-static MonoNativeThreadId threads [MAX_NUM_THREADS];
-
-/* Only accessed with the lock held. */
-static SgenPointerQueue job_queue;
-
-static SgenThreadPoolThreadInitFunc thread_init_func;
-static SgenThreadPoolIdleJobFunc idle_job_func;
-static SgenThreadPoolContinueIdleJobFunc continue_idle_job_func;
-static SgenThreadPoolShouldWorkFunc should_work_func;
-
-static volatile gboolean threadpool_shutdown;
-static volatile int threads_finished = 0;
 
 enum {
        STATE_WAITING,
@@ -46,10 +22,10 @@ enum {
 
 /* Assumes that the lock is held. */
 static SgenThreadPoolJob*
-get_job_and_set_in_progress (void)
+get_job_and_set_in_progress (SgenThreadPool *pool)
 {
-       for (size_t i = 0; i < job_queue.next_slot; ++i) {
-               SgenThreadPoolJob *job = (SgenThreadPoolJob *)job_queue.data [i];
+       for (size_t i = 0; i < pool->job_queue.next_slot; ++i) {
+               SgenThreadPoolJob *job = (SgenThreadPoolJob *)pool->job_queue.data [i];
                if (job->state == STATE_WAITING) {
                        job->state = STATE_IN_PROGRESS;
                        return job;
@@ -60,10 +36,10 @@ get_job_and_set_in_progress (void)
 
 /* Assumes that the lock is held. */
 static ssize_t
-find_job_in_queue (SgenThreadPoolJob *job)
+find_job_in_queue (SgenThreadPool *pool, SgenThreadPoolJob *job)
 {
-       for (ssize_t i = 0; i < job_queue.next_slot; ++i) {
-               if (job_queue.data [i] == job)
+       for (ssize_t i = 0; i < pool->job_queue.next_slot; ++i) {
+               if (pool->job_queue.data [i] == job)
                        return i;
        }
        return -1;
@@ -71,45 +47,47 @@ find_job_in_queue (SgenThreadPoolJob *job)
 
 /* Assumes that the lock is held. */
 static void
-remove_job (SgenThreadPoolJob *job)
+remove_job (SgenThreadPool *pool, SgenThreadPoolJob *job)
 {
        ssize_t index;
        SGEN_ASSERT (0, job->state == STATE_DONE, "Why are we removing a job that's not done?");
-       index = find_job_in_queue (job);
+       index = find_job_in_queue (pool, job);
        SGEN_ASSERT (0, index >= 0, "Why is the job we're trying to remove not in the queue?");
-       job_queue.data [index] = NULL;
-       sgen_pointer_queue_remove_nulls (&job_queue);
+       pool->job_queue.data [index] = NULL;
+       sgen_pointer_queue_remove_nulls (&pool->job_queue);
        sgen_thread_pool_job_free (job);
 }
 
 static gboolean
-continue_idle_job (void *thread_data)
+continue_idle_job (SgenThreadPool *pool, void *thread_data)
 {
-       if (!continue_idle_job_func)
+       if (!pool->continue_idle_job_func)
                return FALSE;
-       return continue_idle_job_func (thread_data);
+       return pool->continue_idle_job_func (thread_data);
 }
 
 static gboolean
-should_work (void *thread_data)
+should_work (SgenThreadPool *pool, void *thread_data)
 {
-       if (!should_work_func)
+       if (!pool->should_work_func)
                return TRUE;
-       return should_work_func (thread_data);
+       return pool->should_work_func (thread_data);
 }
 
 static mono_native_thread_return_t
-thread_func (void *thread_data)
+thread_func (SgenThreadPoolData *thread_data)
 {
-       thread_init_func (thread_data);
+       SgenThreadPool *pool = thread_data->pool;
+
+       pool->thread_init_func (thread_data);
 
-       mono_os_mutex_lock (&lock);
+       mono_os_mutex_lock (&pool->lock);
        for (;;) {
                gboolean do_idle;
                SgenThreadPoolJob *job;
 
-               if (!should_work (thread_data) && !threadpool_shutdown) {
-                       mono_os_cond_wait (&work_cond, &lock);
+               if (!should_work (pool, thread_data) && !pool->threadpool_shutdown) {
+                       mono_os_cond_wait (&pool->work_cond, &pool->lock);
                        continue;
                }
                /*
@@ -118,51 +96,51 @@ thread_func (void *thread_data)
                 * main thread might then set continue idle and signal us before we can take
                 * the lock, and we'd lose the signal.
                 */
-               do_idle = continue_idle_job (thread_data);
-               job = get_job_and_set_in_progress ();
+               do_idle = continue_idle_job (pool, thread_data);
+               job = get_job_and_set_in_progress (pool);
 
-               if (!job && !do_idle && !threadpool_shutdown) {
+               if (!job && !do_idle && !pool->threadpool_shutdown) {
                        /*
                         * pthread_cond_wait() can return successfully despite the condition
                         * not being signalled, so we have to run this in a loop until we
                         * really have work to do.
                         */
-                       mono_os_cond_wait (&work_cond, &lock);
+                       mono_os_cond_wait (&pool->work_cond, &pool->lock);
                        continue;
                }
 
-               mono_os_mutex_unlock (&lock);
+               mono_os_mutex_unlock (&pool->lock);
 
                if (job) {
                        job->func (thread_data, job);
 
-                       mono_os_mutex_lock (&lock);
+                       mono_os_mutex_lock (&pool->lock);
 
                        SGEN_ASSERT (0, job->state == STATE_IN_PROGRESS, "The job should still be in progress.");
                        job->state = STATE_DONE;
-                       remove_job (job);
+                       remove_job (pool, job);
                        /*
                         * Only the main GC thread will ever wait on the done condition, so we don't
                         * have to broadcast.
                         */
-                       mono_os_cond_signal (&done_cond);
+                       mono_os_cond_signal (&pool->done_cond);
                } else if (do_idle) {
-                       SGEN_ASSERT (0, idle_job_func, "Why do we have idle work when there's no idle job function?");
+                       SGEN_ASSERT (0, pool->idle_job_func, "Why do we have idle work when there's no idle job function?");
                        do {
-                               idle_job_func (thread_data);
-                               do_idle = continue_idle_job (thread_data);
-                       } while (do_idle && !job_queue.next_slot);
+                               pool->idle_job_func (thread_data);
+                               do_idle = continue_idle_job (pool, thread_data);
+                       } while (do_idle && !pool->job_queue.next_slot);
 
-                       mono_os_mutex_lock (&lock);
+                       mono_os_mutex_lock (&pool->lock);
 
                        if (!do_idle)
-                               mono_os_cond_signal (&done_cond);
+                               mono_os_cond_signal (&pool->done_cond);
                } else {
-                       SGEN_ASSERT (0, threadpool_shutdown, "Why did we unlock if no jobs and not shutting down?");
-                       mono_os_mutex_lock (&lock);
-                       threads_finished++;
-                       mono_os_cond_signal (&done_cond);
-                       mono_os_mutex_unlock (&lock);
+                       SGEN_ASSERT (0, pool->threadpool_shutdown, "Why did we unlock if no jobs and not shutting down?");
+                       mono_os_mutex_lock (&pool->lock);
+                       pool->threads_finished++;
+                       mono_os_cond_signal (&pool->done_cond);
+                       mono_os_mutex_unlock (&pool->lock);
                        return 0;
                }
        }
@@ -171,41 +149,49 @@ thread_func (void *thread_data)
 }
 
 void
-sgen_thread_pool_init (int num_threads, SgenThreadPoolThreadInitFunc init_func, SgenThreadPoolIdleJobFunc idle_func, SgenThreadPoolContinueIdleJobFunc continue_idle_func, SgenThreadPoolShouldWorkFunc should_work_func_p, void **thread_datas)
+sgen_thread_pool_init (SgenThreadPool *pool, int num_threads, SgenThreadPoolThreadInitFunc init_func, SgenThreadPoolIdleJobFunc idle_func, SgenThreadPoolContinueIdleJobFunc continue_idle_func, SgenThreadPoolShouldWorkFunc should_work_func_p, SgenThreadPoolData **thread_datas)
 {
        int i;
 
-       threads_num = (num_threads < MAX_NUM_THREADS) ? num_threads : MAX_NUM_THREADS;
+       SGEN_ASSERT (0, num_threads > 0, "Why are we creating a threadpool with no threads?");
+
+       pool->threads_num = (num_threads < MAX_NUM_THREADS) ? num_threads : MAX_NUM_THREADS;
 
-       mono_os_mutex_init (&lock);
-       mono_os_cond_init (&work_cond);
-       mono_os_cond_init (&done_cond);
+       mono_os_mutex_init (&pool->lock);
+       mono_os_cond_init (&pool->work_cond);
+       mono_os_cond_init (&pool->done_cond);
 
-       thread_init_func = init_func;
-       idle_job_func = idle_func;
-       continue_idle_job_func = continue_idle_func;
-       should_work_func = should_work_func_p;
+       pool->thread_init_func = init_func;
+       pool->idle_job_func = idle_func;
+       pool->continue_idle_job_func = continue_idle_func;
+       pool->should_work_func = should_work_func_p;
 
-       for (i = 0; i < threads_num; i++)
-               mono_native_thread_create (&threads [i], thread_func, thread_datas ? thread_datas [i] : NULL);
+       sgen_pointer_queue_init (&pool->job_queue, 0);
+       pool->threads_finished = 0;
+       pool->threadpool_shutdown = FALSE;
+
+       for (i = 0; i < pool->threads_num; i++) {
+               thread_datas [i]->pool = pool;
+               mono_native_thread_create (&pool->threads [i], thread_func, thread_datas [i]);
+       }
 }
 
 void
-sgen_thread_pool_shutdown (void)
+sgen_thread_pool_shutdown (SgenThreadPool *pool)
 {
-       if (!threads_num)
+       if (!pool)
                return;
 
-       mono_os_mutex_lock (&lock);
-       threadpool_shutdown = TRUE;
-       mono_os_cond_broadcast (&work_cond);
-       while (threads_finished < threads_num)
-               mono_os_cond_wait (&done_cond, &lock);
-       mono_os_mutex_unlock (&lock);
+       mono_os_mutex_lock (&pool->lock);
+       pool->threadpool_shutdown = TRUE;
+       mono_os_cond_broadcast (&pool->work_cond);
+       while (pool->threads_finished < pool->threads_num)
+               mono_os_cond_wait (&pool->done_cond, &pool->lock);
+       mono_os_mutex_unlock (&pool->lock);
 
-       mono_os_mutex_destroy (&lock);
-       mono_os_cond_destroy (&work_cond);
-       mono_os_cond_destroy (&done_cond);
+       mono_os_mutex_destroy (&pool->lock);
+       mono_os_cond_destroy (&pool->work_cond);
+       mono_os_cond_destroy (&pool->done_cond);
 }
 
 SgenThreadPoolJob*
@@ -226,74 +212,77 @@ sgen_thread_pool_job_free (SgenThreadPoolJob *job)
 }
 
 void
-sgen_thread_pool_job_enqueue (SgenThreadPoolJob *job)
+sgen_thread_pool_job_enqueue (SgenThreadPool *pool, SgenThreadPoolJob *job)
 {
-       mono_os_mutex_lock (&lock);
+       mono_os_mutex_lock (&pool->lock);
 
-       sgen_pointer_queue_add (&job_queue, job);
-       mono_os_cond_signal (&work_cond);
+       sgen_pointer_queue_add (&pool->job_queue, job);
+       mono_os_cond_signal (&pool->work_cond);
 
-       mono_os_mutex_unlock (&lock);
+       mono_os_mutex_unlock (&pool->lock);
 }
 
 void
-sgen_thread_pool_job_wait (SgenThreadPoolJob *job)
+sgen_thread_pool_job_wait (SgenThreadPool *pool, SgenThreadPoolJob *job)
 {
        SGEN_ASSERT (0, job, "Where's the job?");
 
-       mono_os_mutex_lock (&lock);
+       mono_os_mutex_lock (&pool->lock);
 
-       while (find_job_in_queue (job) >= 0)
-               mono_os_cond_wait (&done_cond, &lock);
+       while (find_job_in_queue (pool, job) >= 0)
+               mono_os_cond_wait (&pool->done_cond, &pool->lock);
 
-       mono_os_mutex_unlock (&lock);
+       mono_os_mutex_unlock (&pool->lock);
 }
 
 void
-sgen_thread_pool_idle_signal (void)
+sgen_thread_pool_idle_signal (SgenThreadPool *pool)
 {
-       SGEN_ASSERT (0, idle_job_func, "Why are we signaling idle without an idle function?");
+       SGEN_ASSERT (0, pool->idle_job_func, "Why are we signaling idle without an idle function?");
 
-       mono_os_mutex_lock (&lock);
+       mono_os_mutex_lock (&pool->lock);
 
-       if (continue_idle_job_func (NULL))
-               mono_os_cond_broadcast (&work_cond);
+       if (pool->continue_idle_job_func (NULL))
+               mono_os_cond_broadcast (&pool->work_cond);
 
-       mono_os_mutex_unlock (&lock);
+       mono_os_mutex_unlock (&pool->lock);
 }
 
 void
-sgen_thread_pool_idle_wait (void)
+sgen_thread_pool_idle_wait (SgenThreadPool *pool)
 {
-       SGEN_ASSERT (0, idle_job_func, "Why are we waiting for idle without an idle function?");
+       SGEN_ASSERT (0, pool->idle_job_func, "Why are we waiting for idle without an idle function?");
 
-       mono_os_mutex_lock (&lock);
+       mono_os_mutex_lock (&pool->lock);
 
-       while (continue_idle_job_func (NULL))
-               mono_os_cond_wait (&done_cond, &lock);
+       while (pool->continue_idle_job_func (NULL))
+               mono_os_cond_wait (&pool->done_cond, &pool->lock);
 
-       mono_os_mutex_unlock (&lock);
+       mono_os_mutex_unlock (&pool->lock);
 }
 
 void
-sgen_thread_pool_wait_for_all_jobs (void)
+sgen_thread_pool_wait_for_all_jobs (SgenThreadPool *pool)
 {
-       mono_os_mutex_lock (&lock);
+       mono_os_mutex_lock (&pool->lock);
 
-       while (!sgen_pointer_queue_is_empty (&job_queue))
-               mono_os_cond_wait (&done_cond, &lock);
+       while (!sgen_pointer_queue_is_empty (&pool->job_queue))
+               mono_os_cond_wait (&pool->done_cond, &pool->lock);
 
-       mono_os_mutex_unlock (&lock);
+       mono_os_mutex_unlock (&pool->lock);
 }
 
 /* Return 0 if is not a thread pool thread or the thread number otherwise */
 int
-sgen_thread_pool_is_thread_pool_thread (MonoNativeThreadId some_thread)
+sgen_thread_pool_is_thread_pool_thread (SgenThreadPool *pool, MonoNativeThreadId some_thread)
 {
        int i;
 
-       for (i = 0; i < threads_num; i++) {
-               if (some_thread == threads [i])
+       if (!pool)
+               return 0;
+
+       for (i = 0; i < pool->threads_num; i++) {
+               if (some_thread == pool->threads [i])
                        return i + 1;
        }
 
index ce4bb11e321ee58668336c2a6989fa7ee71d4e6e..1b48e4433f040137bf37e0368f6b126b17a63c4e 100644 (file)
 #ifndef __MONO_SGEN_THREAD_POOL_H__
 #define __MONO_SGEN_THREAD_POOL_H__
 
+#include "mono/sgen/sgen-pointer-queue.h"
+#include "mono/utils/mono-threads.h"
+
 typedef struct _SgenThreadPoolJob SgenThreadPoolJob;
+typedef struct _SgenThreadPool SgenThreadPool;
+typedef struct _SgenThreadPoolData SgenThreadPoolData;
 
 typedef void (*SgenThreadPoolJobFunc) (void *thread_data, SgenThreadPoolJob *job);
+typedef void (*SgenThreadPoolThreadInitFunc) (void*);
+typedef void (*SgenThreadPoolIdleJobFunc) (void*);
+typedef gboolean (*SgenThreadPoolContinueIdleJobFunc) (void*);
+typedef gboolean (*SgenThreadPoolShouldWorkFunc) (void*);
 
 struct _SgenThreadPoolJob {
        const char *name;
@@ -21,28 +30,49 @@ struct _SgenThreadPoolJob {
        volatile gint32 state;
 };
 
-typedef void (*SgenThreadPoolThreadInitFunc) (void*);
-typedef void (*SgenThreadPoolIdleJobFunc) (void*);
-typedef gboolean (*SgenThreadPoolContinueIdleJobFunc) (void*);
-typedef gboolean (*SgenThreadPoolShouldWorkFunc) (void*);
+#define MAX_NUM_THREADS 8
+
+struct _SgenThreadPool {
+       mono_mutex_t lock;
+       mono_cond_t work_cond;
+       mono_cond_t done_cond;
+
+       int threads_num;
+       MonoNativeThreadId threads [MAX_NUM_THREADS];
+
+       /* Only accessed with the lock held. */
+       SgenPointerQueue job_queue;
+
+       SgenThreadPoolThreadInitFunc thread_init_func;
+       SgenThreadPoolIdleJobFunc idle_job_func;
+       SgenThreadPoolContinueIdleJobFunc continue_idle_job_func;
+       SgenThreadPoolShouldWorkFunc should_work_func;
+
+       volatile gboolean threadpool_shutdown;
+       volatile int threads_finished;
+};
+
+struct _SgenThreadPoolData {
+       SgenThreadPool *pool;
+};
 
-void sgen_thread_pool_init (int num_threads, SgenThreadPoolThreadInitFunc init_func, SgenThreadPoolIdleJobFunc idle_func, SgenThreadPoolContinueIdleJobFunc continue_idle_func, SgenThreadPoolShouldWorkFunc should_work_func, void **thread_datas);
+void sgen_thread_pool_init (SgenThreadPool *pool, int num_threads, SgenThreadPoolThreadInitFunc init_func, SgenThreadPoolIdleJobFunc idle_func, SgenThreadPoolContinueIdleJobFunc continue_idle_func, SgenThreadPoolShouldWorkFunc should_work_func, SgenThreadPoolData **thread_datas);
 
-void sgen_thread_pool_shutdown (void);
+void sgen_thread_pool_shutdown (SgenThreadPool *pool);
 
 SgenThreadPoolJob* sgen_thread_pool_job_alloc (const char *name, SgenThreadPoolJobFunc func, size_t size);
 /* This only needs to be called on jobs that are not enqueued. */
 void sgen_thread_pool_job_free (SgenThreadPoolJob *job);
 
-void sgen_thread_pool_job_enqueue (SgenThreadPoolJob *job);
+void sgen_thread_pool_job_enqueue (SgenThreadPool *pool, SgenThreadPoolJob *job);
 /* This must only be called after the job has been enqueued. */
-void sgen_thread_pool_job_wait (SgenThreadPoolJob *job);
+void sgen_thread_pool_job_wait (SgenThreadPool *pool, SgenThreadPoolJob *job);
 
-void sgen_thread_pool_idle_signal (void);
-void sgen_thread_pool_idle_wait (void);
+void sgen_thread_pool_idle_signal (SgenThreadPool *pool);
+void sgen_thread_pool_idle_wait (SgenThreadPool *pool);
 
-void sgen_thread_pool_wait_for_all_jobs (void);
+void sgen_thread_pool_wait_for_all_jobs (SgenThreadPool *pool);
 
-int sgen_thread_pool_is_thread_pool_thread (MonoNativeThreadId thread);
+int sgen_thread_pool_is_thread_pool_thread (SgenThreadPool *pool, MonoNativeThreadId thread);
 
 #endif
index d5343db50d1c37cb2f3241ebab4b9e0eb760823d..47fb00d9e1f8db39461cc62ff7a29c126ffedf61 100644 (file)
@@ -26,6 +26,9 @@ static volatile gboolean forced_stop;
 static WorkerData *workers_data;
 static SgenWorkerCallback worker_init_cb;
 
+static SgenThreadPool pool_inst;
+static SgenThreadPool *pool; /* null if we're not using workers */
+
 /*
  * When using multiple workers, we need to have the last worker
  * enqueue the preclean jobs (if there are any). This lock ensures
@@ -84,7 +87,7 @@ set_state (WorkerData *data, State old_state, State new_state)
        else if (new_state == STATE_WORKING)
                SGEN_ASSERT (0, old_state == STATE_WORK_ENQUEUED, "We can only transition to WORKING from WORK ENQUEUED");
        if (new_state == STATE_NOT_WORKING || new_state == STATE_WORKING)
-               SGEN_ASSERT (6, sgen_thread_pool_is_thread_pool_thread (mono_native_thread_id_get ()), "Only the worker thread is allowed to transition to NOT_WORKING or WORKING");
+               SGEN_ASSERT (6, sgen_thread_pool_is_thread_pool_thread (pool, mono_native_thread_id_get ()), "Only the worker thread is allowed to transition to NOT_WORKING or WORKING");
 
        return InterlockedCompareExchange (&data->state, new_state, old_state) == old_state;
 }
@@ -128,7 +131,7 @@ sgen_workers_ensure_awake (void)
        }
 
        if (need_signal)
-               sgen_thread_pool_idle_signal ();
+               sgen_thread_pool_idle_signal (pool);
 }
 
 static void
@@ -200,7 +203,7 @@ sgen_workers_enqueue_job (SgenThreadPoolJob *job, gboolean enqueue)
                return;
        }
 
-       sgen_thread_pool_job_enqueue (job);
+       sgen_thread_pool_job_enqueue (pool, job);
 }
 
 static gboolean
@@ -371,12 +374,7 @@ void
 sgen_workers_init (int num_workers, SgenWorkerCallback callback)
 {
        int i;
-       void **workers_data_ptrs = (void **)alloca(num_workers * sizeof(void *));
-
-       if (!sgen_get_major_collector ()->is_concurrent && !sgen_get_minor_collector ()->is_parallel) {
-               sgen_thread_pool_init (num_workers, thread_pool_init_func, NULL, NULL, NULL, NULL);
-               return;
-       }
+       WorkerData **workers_data_ptrs = (WorkerData**)alloca(num_workers * sizeof(WorkerData*));
 
        mono_os_mutex_init (&finished_lock);
        //g_print ("initing %d workers\n", num_workers);
@@ -390,15 +388,23 @@ sgen_workers_init (int num_workers, SgenWorkerCallback callback)
        init_distribute_gray_queue ();
 
        for (i = 0; i < num_workers; ++i)
-               workers_data_ptrs [i] = (void *) &workers_data [i];
+               workers_data_ptrs [i] = &workers_data [i];
 
        worker_init_cb = callback;
 
-       sgen_thread_pool_init (num_workers, thread_pool_init_func, marker_idle_func, continue_idle_func, should_work_func, workers_data_ptrs);
+       pool = &pool_inst;
+       sgen_thread_pool_init (pool, num_workers, thread_pool_init_func, marker_idle_func, continue_idle_func, should_work_func, (SgenThreadPoolData**)workers_data_ptrs);
 
        mono_counters_register ("# workers finished", MONO_COUNTER_GC | MONO_COUNTER_ULONG, &stat_workers_num_finished);
 }
 
+void
+sgen_workers_shutdown (void)
+{
+       if (pool)
+               sgen_thread_pool_shutdown (pool);
+}
+
 void
 sgen_workers_stop_all_workers (void)
 {
@@ -406,8 +412,8 @@ sgen_workers_stop_all_workers (void)
        mono_memory_write_barrier ();
        forced_stop = TRUE;
 
-       sgen_thread_pool_wait_for_all_jobs ();
-       sgen_thread_pool_idle_wait ();
+       sgen_thread_pool_wait_for_all_jobs (pool);
+       sgen_thread_pool_idle_wait (pool);
        SGEN_ASSERT (0, sgen_workers_all_done (), "Can only signal enqueue work when in no work state");
 }
 
@@ -446,8 +452,8 @@ sgen_workers_join (void)
 {
        int i;
 
-       sgen_thread_pool_wait_for_all_jobs ();
-       sgen_thread_pool_idle_wait ();
+       sgen_thread_pool_wait_for_all_jobs (pool);
+       sgen_thread_pool_idle_wait (pool);
        SGEN_ASSERT (0, sgen_workers_all_done (), "Can only signal enqueue work when in no work state");
 
        /* At this point all the workers have stopped. */
@@ -546,4 +552,12 @@ sgen_workers_foreach (SgenWorkerCallback callback)
                callback (&workers_data [i]);
 }
 
+gboolean
+sgen_workers_is_worker_thread (MonoNativeThreadId id)
+{
+       if (!pool)
+               return FALSE;
+       return sgen_thread_pool_is_thread_pool_thread (pool, id);
+}
+
 #endif
index e2f030dc159135210d50d80fd1e60a5799e7291a..78dea19867d645d76b7fe73dfcf81c04981da7a2 100644 (file)
 
 typedef struct _WorkerData WorkerData;
 struct _WorkerData {
+       /*
+        * Threadpool threads receive as their starting argument a WorkerData.
+        * tp_data is meant for use inside the sgen thread pool and must be first.
+        */
+       SgenThreadPoolData tp_data;
        gint32 state;
        SgenGrayQueue private_gray_queue; /* only read/written by worker thread */
        /*
@@ -30,6 +35,7 @@ typedef void (*SgenWorkersFinishCallback) (void);
 typedef void (*SgenWorkerCallback) (WorkerData *data);
 
 void sgen_workers_init (int num_workers, SgenWorkerCallback callback);
+void sgen_workers_shutdown (void);
 void sgen_workers_stop_all_workers (void);
 void sgen_workers_set_num_active_workers (int num_workers);
 void sgen_workers_start_all_workers (SgenObjectOperations *object_ops_nopar, SgenObjectOperations *object_ops_par, SgenWorkersFinishCallback finish_job);
@@ -46,5 +52,6 @@ void sgen_workers_take_from_queue (SgenGrayQueue *queue);
 SgenObjectOperations* sgen_workers_get_idle_func_object_ops (void);
 int sgen_workers_get_job_split_count (void);
 void sgen_workers_foreach (SgenWorkerCallback callback);
+gboolean sgen_workers_is_worker_thread (MonoNativeThreadId id);
 
 #endif