Fixed warning in MSVC: 'different types for formal and actual parameter'.
diff --git a/mono/metadata/sgen-workers.c b/mono/metadata/sgen-workers.c
index 819133545e05c07fd5a810e869daed83e5f36b35..77c7fadcac086f76a2ea8baad3f2328f31733ec0 100644
--- a/mono/metadata/sgen-workers.c
+++ b/mono/metadata/sgen-workers.c
@@ -1,25 +1,22 @@
 /*
+ * sgen-workers.c: Worker threads for parallel and concurrent GC.
+ *
  * Copyright 2001-2003 Ximian, Inc
  * Copyright 2003-2010 Novell, Inc.
+ * Copyright (C) 2012 Xamarin Inc
  *
- * Permission is hereby granted, free of charge, to any person obtaining
- * a copy of this software and associated documentation files (the
- * "Software"), to deal in the Software without restriction, including
- * without limitation the rights to use, copy, modify, merge, publish,
- * distribute, sublicense, and/or sell copies of the Software, and to
- * permit persons to whom the Software is furnished to do so, subject to
- * the following conditions:
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License 2.0 as published by the Free Software Foundation;
  *
- * The above copyright notice and this permission notice shall be
- * included in all copies or substantial portions of the Software.
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
  *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
- * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
- * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
- * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
- * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
- * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
- * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ * You should have received a copy of the GNU Library General Public
+ * License 2.0 along with this library; if not, write to the Free
+ * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
  */
 
 #include "config.h"
 
 static int workers_num;
 static WorkerData *workers_data;
-static WorkerData workers_gc_thread_data;
+static void *workers_gc_thread_major_collector_data = NULL;
 
-static SgenGrayQueue workers_distribute_gray_queue;
+static SgenSectionGrayQueue workers_distribute_gray_queue;
+static gboolean workers_distribute_gray_queue_inited;
 
-static volatile gboolean workers_gc_in_progress = FALSE;
 static volatile gboolean workers_marking = FALSE;
 static gboolean workers_started = FALSE;
-static volatile int workers_num_waiting = 0;
+
+typedef union {
+       gint32 value;
+       struct {
+               /*
+                * Decremented by the main thread and incremented by
+                * worker threads.
+                */
+               guint32 num_waiting : 8;
+               /* Set by worker threads and reset by the main thread. */
+               guint32 done_posted : 1;
+               /* Set by the main thread. */
+               guint32 gc_in_progress : 1;
+       } data;
+} State;
+
+static volatile State workers_state;
+
 static MonoSemType workers_waiting_sem;
 static MonoSemType workers_done_sem;
-static volatile int workers_done_posted = 0;
 
 static volatile int workers_job_queue_num_entries = 0;
 static volatile JobQueueEntry *workers_job_queue = NULL;
 static LOCK_DECLARE (workers_job_queue_mutex);
+static int workers_num_jobs_enqueued = 0;
+static volatile int workers_num_jobs_finished = 0;
 
 static long long stat_workers_stolen_from_self_lock;
 static long long stat_workers_stolen_from_self_no_lock;
 static long long stat_workers_stolen_from_others;
 static long long stat_workers_num_waited;
 
+static gboolean
+set_state (State old_state, State new_state)
+{
+       return InterlockedCompareExchange (&workers_state.value,
+                       new_state.value, old_state.value) == old_state.value;
+}
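
The State union above packs all of the worker-coordination flags into a single gint32 precisely so that any transition can be attempted with one compare-and-swap, which is what set_state () does. The stand-alone sketch below shows the same packed-word CAS pattern, using GCC's __atomic builtins in place of mono's InterlockedCompareExchange; it is illustrative only and not part of this patch.

#include <stdint.h>
#include <stdio.h>

typedef union {
        int32_t value;
        struct {
                uint32_t num_waiting : 8;
                uint32_t done_posted : 1;
                uint32_t gc_in_progress : 1;
        } data;
} State;

static State workers_state;

/* Attempt the old_state -> new_state transition atomically; returns non-zero
 * on success, zero if another thread changed the state in the meantime. */
static int
set_state (State old_state, State new_state)
{
        int32_t expected = old_state.value;
        return __atomic_compare_exchange_n (&workers_state.value, &expected,
                        new_state.value, 0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
}

int
main (void)
{
        State old_state, new_state;

        /* A worker parking itself: retry the CAS until it goes through. */
        do {
                old_state = new_state = workers_state;
                ++new_state.data.num_waiting;
        } while (!set_state (old_state, new_state));

        printf ("num_waiting = %u, packed word = 0x%08x\n",
                        (unsigned) workers_state.data.num_waiting,
                        (unsigned) workers_state.value);
        return 0;
}

Because num_waiting, done_posted and gc_in_progress live in one word, a worker can park and test the termination condition in a single atomic step; that is what makes it possible to post the done semaphore exactly once without a lock.
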
+
 static void
 workers_wake_up (int max)
 {
        int i;
 
        for (i = 0; i < max; ++i) {
-               int num;
+               State old_state, new_state;
                do {
-                       num = workers_num_waiting;
-                       if (num == 0)
+                       old_state = new_state = workers_state;
+                       /*
+                        * We must not wake workers up once done has
+                        * been posted.
+                        */
+                       if (old_state.data.done_posted)
+                               return;
+                       if (old_state.data.num_waiting == 0)
                                return;
-               } while (InterlockedCompareExchange (&workers_num_waiting, num - 1, num) != num);
+                       --new_state.data.num_waiting;
+               } while (!set_state (old_state, new_state));
                MONO_SEM_POST (&workers_waiting_sem);
        }
 }
@@ -74,39 +103,59 @@ workers_wake_up_all (void)
        workers_wake_up (workers_num);
 }
 
+void
+sgen_workers_wake_up_all (void)
+{
+       g_assert (workers_state.data.gc_in_progress);
+       workers_wake_up_all ();
+}
+
 static void
 workers_wait (void)
 {
-       int num;
+       State old_state, new_state;
        ++stat_workers_num_waited;
        do {
-               num = workers_num_waiting;
-       } while (InterlockedCompareExchange (&workers_num_waiting, num + 1, num) != num);
-       if (num + 1 == workers_num && !workers_gc_in_progress) {
-               /* Make sure the done semaphore is only posted once. */
-               int posted;
-               do {
-                       posted = workers_done_posted;
-                       if (posted)
-                               break;
-               } while (InterlockedCompareExchange (&workers_done_posted, 1, 0) != 0);
-               if (!posted)
-                       MONO_SEM_POST (&workers_done_sem);
-       }
+               old_state = new_state = workers_state;
+               /*
+                * Only the last worker thread awake can set the done
+                * posted flag, and since we're awake and haven't set
+                * it yet, it cannot be set.
+                */
+               g_assert (!old_state.data.done_posted);
+               ++new_state.data.num_waiting;
+               /*
+                * This is the only place where the worker threads
+                * look at the gc_in_progress flag.
+                */
+               if (new_state.data.num_waiting == workers_num && !old_state.data.gc_in_progress)
+                       new_state.data.done_posted = 1;
+       } while (!set_state (old_state, new_state));
+       mono_memory_barrier ();
+       if (new_state.data.done_posted)
+               MONO_SEM_POST (&workers_done_sem);
        MONO_SEM_WAIT (&workers_waiting_sem);
 }
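
In prose, the parking protocol implemented by workers_wait () and workers_wake_up () is: a worker that runs out of work increments num_waiting and sleeps on workers_waiting_sem; the main thread wakes workers by decrementing num_waiting before each post; and the last worker to park after gc_in_progress has been cleared posts workers_done_sem exactly once. The stand-alone sketch below models that handshake with plain pthreads and POSIX semaphores, with a mutex standing in for the CAS word; every name in it is made up for illustration and none of it is mono code.

#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <unistd.h>

#define NUM_WORKERS 4

static sem_t waiting_sem;               /* workers park here */
static sem_t done_sem;                  /* posted once when the last worker parks */
static pthread_mutex_t state_lock = PTHREAD_MUTEX_INITIALIZER;
static int num_waiting;
static int done_posted;
static int gc_in_progress = 1;

static void *
worker (void *arg)
{
        (void) arg;
        for (;;) {
                int post_done = 0, in_progress;

                /* A real worker would drain its gray queue here. */

                pthread_mutex_lock (&state_lock);
                ++num_waiting;
                in_progress = gc_in_progress;
                if (num_waiting == NUM_WORKERS && !in_progress && !done_posted)
                        post_done = done_posted = 1;
                pthread_mutex_unlock (&state_lock);

                if (post_done)
                        sem_post (&done_sem);   /* we were the last one to park */
                if (!in_progress)
                        return NULL;            /* sketch only: "stay parked" by exiting */
                sem_wait (&waiting_sem);        /* park until the main thread wakes us */
        }
}

int
main (void)
{
        pthread_t threads [NUM_WORKERS];
        int i, parked;

        sem_init (&waiting_sem, 0, 0);
        sem_init (&done_sem, 0, 0);
        for (i = 0; i < NUM_WORKERS; ++i)
                pthread_create (&threads [i], NULL, worker, NULL);

        /* Crude polling, in the spirit of sgen_workers_wait_for_jobs () below:
         * wait until every worker has parked once. */
        do {
                pthread_mutex_lock (&state_lock);
                parked = num_waiting;
                pthread_mutex_unlock (&state_lock);
                usleep (1000);
        } while (parked < NUM_WORKERS);

        /* "Join": end the phase, wake everyone, then wait for the last worker
         * to park again and post the done semaphore. */
        pthread_mutex_lock (&state_lock);
        gc_in_progress = 0;
        pthread_mutex_unlock (&state_lock);
        for (i = 0; i < NUM_WORKERS; ++i) {
                pthread_mutex_lock (&state_lock);
                if (num_waiting > 0)
                        --num_waiting;          /* each wake-up consumes one parked slot */
                pthread_mutex_unlock (&state_lock);
                sem_post (&waiting_sem);
        }
        sem_wait (&done_sem);
        printf ("all %d workers are parked\n", NUM_WORKERS);

        for (i = 0; i < NUM_WORKERS; ++i)
                pthread_join (threads [i], NULL);
        return 0;
}

The real code replaces the mutex with the single CAS word shown earlier and, unlike the sketch, keeps the workers alive for the next collection; it also refuses to wake anybody once done_posted has been set.
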
 
+static gboolean
+collection_needs_workers (void)
+{
+       return sgen_collection_is_parallel () || sgen_collection_is_concurrent ();
+}
+
 void
 sgen_workers_enqueue_job (JobFunc func, void *data)
 {
        int num_entries;
        JobQueueEntry *entry;
 
-       if (!sgen_collection_is_parallel ()) {
+       if (!collection_needs_workers ()) {
                func (NULL, data);
                return;
        }
 
+       g_assert (workers_state.data.gc_in_progress);
+
        entry = sgen_alloc_internal (INTERNAL_MEM_JOB_QUEUE_ENTRY);
        entry->func = func;
        entry->data = data;
@@ -115,11 +164,26 @@ sgen_workers_enqueue_job (JobFunc func, void *data)
        entry->next = workers_job_queue;
        workers_job_queue = entry;
        num_entries = ++workers_job_queue_num_entries;
+       ++workers_num_jobs_enqueued;
        mono_mutex_unlock (&workers_job_queue_mutex);
 
        workers_wake_up (num_entries);
 }
 
+void
+sgen_workers_wait_for_jobs (void)
+{
+       // FIXME: implement this properly
+       while (workers_num_jobs_finished < workers_num_jobs_enqueued) {
+               State state = workers_state;
+               g_assert (state.data.gc_in_progress);
+               g_assert (!state.data.done_posted);
+               if (state.data.num_waiting == workers_num)
+                       workers_wake_up_all ();
+               g_usleep (1000);
+       }
+}
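
A hypothetical caller-side sketch of the two entry points above; the job function and its argument are made-up names, and the fragment is meant to run on the main GC thread inside SGen, so it is not a complete program:

/* Hypothetical job and caller, for illustration only. */
static void
scan_job_func (WorkerData *worker_data, void *job_data_untyped)
{
        /* ... scan whatever job_data_untyped describes ... */
}

static void
enqueue_and_wait_example (void *job_data)
{
        sgen_workers_enqueue_job (scan_job_func, job_data);
        /* Blocks until a worker has executed the job (or returns immediately
         * if the job was run inline because no workers are needed). */
        sgen_workers_wait_for_jobs ();
}

Note that sgen_workers_enqueue_job runs the job inline on the calling thread when the current collection needs no workers, so callers do not have to special-case the serial collectors.
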
+
 static gboolean
 workers_dequeue_and_do_job (WorkerData *data)
 {
@@ -147,10 +211,13 @@ workers_dequeue_and_do_job (WorkerData *data)
        if (!entry)
                return FALSE;
 
-       g_assert (sgen_collection_is_parallel ());
+       g_assert (collection_needs_workers ());
 
        entry->func (data, entry->data);
        sgen_free_internal (entry, INTERNAL_MEM_JOB_QUEUE_ENTRY);
+
+       SGEN_ATOMIC_ADD (workers_num_jobs_finished, 1);
+
        return TRUE;
 }
 
@@ -202,6 +269,7 @@ workers_steal (WorkerData *data, WorkerData *victim_data, gboolean lock)
 static gboolean
 workers_get_work (WorkerData *data)
 {
+       SgenMajorCollector *major;
        int i;
 
        g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue));
@@ -210,11 +278,7 @@ workers_get_work (WorkerData *data)
        if (workers_steal (data, data, TRUE))
                return TRUE;
 
-       /* Then from the GC thread's stack. */
-       if (workers_steal (data, &workers_gc_thread_data, TRUE))
-               return TRUE;
-
-       /* Finally, from another worker. */
+       /* From another worker. */
        for (i = 0; i < workers_num; ++i) {
                WorkerData *victim_data = &workers_data [i];
                if (data == victim_data)
@@ -223,6 +287,19 @@ workers_get_work (WorkerData *data)
                        return TRUE;
        }
 
+       /*
+        * If we're concurrent or parallel, from the workers
+        * distribute gray queue.
+        */
+       major = sgen_get_major_collector ();
+       if (major->is_concurrent || major->is_parallel) {
+               GrayQueueSection *section = sgen_section_gray_queue_dequeue (&workers_distribute_gray_queue);
+               if (section) {
+                       sgen_gray_object_enqueue_section (&data->private_gray_queue, section);
+                       return TRUE;
+               }
+       }
+
        /* Nobody to steal from */
        g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue));
        return FALSE;
@@ -239,7 +316,7 @@ workers_gray_queue_share_redirect (SgenGrayQueue *queue)
                 * There are still objects in the stealable stack, so
                 * wake up any workers that might be sleeping
                 */
-               if (workers_gc_in_progress)
+               if (workers_state.data.gc_in_progress)
                        workers_wake_up_all ();
                return;
        }
@@ -264,27 +341,43 @@ workers_gray_queue_share_redirect (SgenGrayQueue *queue)
                        sgen_gray_object_free_queue_section (section);
        }
 
-       if (data != &workers_gc_thread_data && sgen_gray_object_queue_is_empty (queue))
+       if (sgen_gray_object_queue_is_empty (queue))
                workers_steal (data, data, FALSE);
 
        mono_mutex_unlock (&data->stealable_stack_mutex);
 
-       if (workers_gc_in_progress)
+       if (workers_state.data.gc_in_progress)
                workers_wake_up_all ();
 }
 
+static void
+concurrent_enqueue_check (char *obj)
+{
+       g_assert (sgen_concurrent_collection_in_progress ());
+       g_assert (!sgen_ptr_in_nursery (obj));
+       g_assert (SGEN_LOAD_VTABLE (obj));
+}
+
+static void
+init_private_gray_queue (WorkerData *data)
+{
+       sgen_gray_object_queue_init_with_alloc_prepare (&data->private_gray_queue,
+                       sgen_get_major_collector ()->is_concurrent ? concurrent_enqueue_check : NULL,
+                       workers_gray_queue_share_redirect, data);
+}
+
 static mono_native_thread_return_t
 workers_thread_func (void *data_untyped)
 {
        WorkerData *data = data_untyped;
+       SgenMajorCollector *major = sgen_get_major_collector ();
 
        mono_thread_info_register_small_id ();
 
-       if (sgen_get_major_collector ()->init_worker_thread)
-               sgen_get_major_collector ()->init_worker_thread (data->major_collector_data);
+       if (major->init_worker_thread)
+               major->init_worker_thread (data->major_collector_data);
 
-       sgen_gray_object_queue_init_with_alloc_prepare (&data->private_gray_queue,
-                       workers_gray_queue_share_redirect, data);
+       init_private_gray_queue (data);
 
        for (;;) {
                gboolean did_work = FALSE;
@@ -295,13 +388,18 @@ workers_thread_func (void *data_untyped)
                }
 
                if (workers_marking && (!sgen_gray_object_queue_is_empty (&data->private_gray_queue) || workers_get_work (data))) {
+                       SgenObjectOperations *ops = sgen_concurrent_collection_in_progress ()
+                               ? &major->major_concurrent_ops
+                               : &major->major_ops;
+                       ScanCopyContext ctx = { ops->scan_object, NULL, &data->private_gray_queue };
+
                        g_assert (!sgen_gray_object_queue_is_empty (&data->private_gray_queue));
 
-                       while (!sgen_drain_gray_stack (&data->private_gray_queue, 32))
+                       while (!sgen_drain_gray_stack (32, ctx))
                                workers_gray_queue_share_redirect (&data->private_gray_queue);
                        g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue));
 
-                       sgen_gray_object_queue_init (&data->private_gray_queue);
+                       init_private_gray_queue (data);
 
                        did_work = TRUE;
                }
@@ -314,22 +412,27 @@ workers_thread_func (void *data_untyped)
        return NULL;
 }
 
-void
-sgen_workers_distribute_gray_queue_sections (void)
+static void
+init_distribute_gray_queue (gboolean locked)
 {
-       if (!sgen_collection_is_parallel ())
+       if (workers_distribute_gray_queue_inited) {
+               g_assert (sgen_section_gray_queue_is_empty (&workers_distribute_gray_queue));
+               g_assert (!workers_distribute_gray_queue.locked == !locked);
                return;
+       }
 
-       workers_gray_queue_share_redirect (&workers_distribute_gray_queue);
+       sgen_section_gray_queue_init (&workers_distribute_gray_queue, locked,
+                       sgen_get_major_collector ()->is_concurrent ? concurrent_enqueue_check : NULL);
+       workers_distribute_gray_queue_inited = TRUE;
 }
 
 void
 sgen_workers_init_distribute_gray_queue (void)
 {
-       if (!sgen_collection_is_parallel ())
+       if (!collection_needs_workers ())
                return;
 
-       sgen_gray_object_queue_init (&workers_distribute_gray_queue);
+       init_distribute_gray_queue (sgen_get_major_collector ()->is_concurrent || sgen_get_major_collector ()->is_parallel);
 }
 
 void
@@ -337,30 +440,27 @@ sgen_workers_init (int num_workers)
 {
        int i;
 
-       if (!sgen_get_major_collector ()->is_parallel)
+       if (!sgen_get_major_collector ()->is_parallel && !sgen_get_major_collector ()->is_concurrent)
                return;
 
        //g_print ("initing %d workers\n", num_workers);
 
        workers_num = num_workers;
 
-       workers_data = sgen_alloc_internal_dynamic (sizeof (WorkerData) * num_workers, INTERNAL_MEM_WORKER_DATA);
+       workers_data = sgen_alloc_internal_dynamic (sizeof (WorkerData) * num_workers, INTERNAL_MEM_WORKER_DATA, TRUE);
        memset (workers_data, 0, sizeof (WorkerData) * num_workers);
 
        MONO_SEM_INIT (&workers_waiting_sem, 0);
        MONO_SEM_INIT (&workers_done_sem, 0);
 
-       sgen_gray_object_queue_init_with_alloc_prepare (&workers_distribute_gray_queue,
-                       workers_gray_queue_share_redirect, &workers_gc_thread_data);
-       mono_mutex_init (&workers_gc_thread_data.stealable_stack_mutex, NULL);
-       workers_gc_thread_data.stealable_stack_fill = 0;
+       init_distribute_gray_queue (sgen_get_major_collector ()->is_concurrent || sgen_get_major_collector ()->is_parallel);
 
        if (sgen_get_major_collector ()->alloc_worker_data)
-               workers_gc_thread_data.major_collector_data = sgen_get_major_collector ()->alloc_worker_data ();
+               workers_gc_thread_major_collector_data = sgen_get_major_collector ()->alloc_worker_data ();
 
        for (i = 0; i < workers_num; ++i) {
                /* private gray queue is inited by the thread itself */
-               mono_mutex_init (&workers_data [i].stealable_stack_mutex, NULL);
+               mono_mutex_init (&workers_data [i].stealable_stack_mutex);
                workers_data [i].stealable_stack_fill = 0;
 
                if (sgen_get_major_collector ()->alloc_worker_data)
@@ -391,39 +491,65 @@ workers_start_worker (int index)
 void
 sgen_workers_start_all_workers (void)
 {
+       State old_state, new_state;
        int i;
 
-       if (!sgen_collection_is_parallel ())
+       if (!collection_needs_workers ())
                return;
 
        if (sgen_get_major_collector ()->init_worker_thread)
-               sgen_get_major_collector ()->init_worker_thread (workers_gc_thread_data.major_collector_data);
+               sgen_get_major_collector ()->init_worker_thread (workers_gc_thread_major_collector_data);
+
+       old_state = new_state = workers_state;
+       g_assert (!old_state.data.gc_in_progress);
+       new_state.data.gc_in_progress = TRUE;
 
-       g_assert (!workers_gc_in_progress);
-       workers_gc_in_progress = TRUE;
        workers_marking = FALSE;
-       workers_done_posted = 0;
+
+       g_assert (workers_job_queue_num_entries == 0);
+       workers_num_jobs_enqueued = 0;
+       workers_num_jobs_finished = 0;
 
        if (workers_started) {
-               if (workers_num_waiting != workers_num)
-                       g_error ("Expecting all %d sgen workers to be parked, but only %d are", workers_num, workers_num_waiting);
+               g_assert (old_state.data.done_posted);
+               if (old_state.data.num_waiting != workers_num) {
+                       g_error ("Expecting all %d sgen workers to be parked, but only %d are",
+                                       workers_num, old_state.data.num_waiting);
+               }
+
+               /* Clear the done posted flag */
+               new_state.data.done_posted = 0;
+               if (!set_state (old_state, new_state))
+                       g_assert_not_reached ();
+
                workers_wake_up_all ();
                return;
        }
 
+       g_assert (!old_state.data.done_posted);
+
+       if (!set_state (old_state, new_state))
+               g_assert_not_reached ();
+
        for (i = 0; i < workers_num; ++i)
                workers_start_worker (i);
 
        workers_started = TRUE;
 }
 
+gboolean
+sgen_workers_have_started (void)
+{
+       return workers_state.data.gc_in_progress;
+}
+
 void
 sgen_workers_start_marking (void)
 {
-       if (!sgen_collection_is_parallel ())
+       if (!collection_needs_workers ())
                return;
 
-       g_assert (workers_started && workers_gc_in_progress);
+       g_assert (workers_started && workers_state.data.gc_in_progress);
        g_assert (!workers_marking);
 
        workers_marking = TRUE;
@@ -434,28 +560,62 @@ sgen_workers_start_marking (void)
 void
 sgen_workers_join (void)
 {
+       State old_state, new_state;
        int i;
 
-       if (!sgen_collection_is_parallel ())
+       if (!collection_needs_workers ())
                return;
 
-       g_assert (sgen_gray_object_queue_is_empty (&workers_gc_thread_data.private_gray_queue));
-       g_assert (sgen_gray_object_queue_is_empty (&workers_distribute_gray_queue));
+       do {
+               old_state = new_state = workers_state;
+               g_assert (old_state.data.gc_in_progress);
+               g_assert (!old_state.data.done_posted);
+
+               new_state.data.gc_in_progress = 0;
+       } while (!set_state (old_state, new_state));
 
-       g_assert (workers_gc_in_progress);
-       workers_gc_in_progress = FALSE;
-       if (workers_num_waiting == workers_num) {
+       if (new_state.data.num_waiting == workers_num) {
                /*
-                * All the workers might have shut down at this point
-                * and posted the done semaphore but we don't know it
-                * yet.  It's not a big deal to wake them up again -
-                * they'll just do one iteration of their loop trying to
-                * find something to do and then go back to waiting
-                * again.
+                * All the workers have parked but haven't posted the
+                * done semaphore yet, or, if we got here through the
+                * goto below, haven't finished all their work yet.
+                *
+                * It's not a big deal to wake them up again - they'll
+                * just do one iteration of their loop trying to find
+                * something to do and then go back to waiting again.
                 */
+       reawaken:
                workers_wake_up_all ();
        }
        MONO_SEM_WAIT (&workers_done_sem);
+
+       old_state = new_state = workers_state;
+       g_assert (old_state.data.num_waiting == workers_num);
+       g_assert (old_state.data.done_posted);
+
+       if (workers_job_queue_num_entries || !sgen_section_gray_queue_is_empty (&workers_distribute_gray_queue)) {
+               /*
+                * There's a small race condition that we handle here.
+                * It's possible that a worker thread runs out of
+                * things to do and is about to go to sleep.  Right at
+                * that moment a new job is enqueued, but the thread is
+                * still registered as running.  The main thread then
+                * joins and waits on the done semaphore.  Only now does
+                * the worker actually park and post the semaphore,
+                * because gc_in_progress is already FALSE.  The job is
+                * still in the queue, though.
+                *
+                * Clear the done posted flag.
+                */
+               new_state.data.done_posted = 0;
+               if (!set_state (old_state, new_state))
+                       g_assert_not_reached ();
+               goto reawaken;
+       }
+
+       /* At this point all the workers have stopped. */
+
        workers_marking = FALSE;
 
        if (sgen_get_major_collector ()->reset_worker_data) {
@@ -463,16 +623,27 @@ sgen_workers_join (void)
                        sgen_get_major_collector ()->reset_worker_data (workers_data [i].major_collector_data);
        }
 
-       g_assert (workers_done_posted);
-
-       g_assert (!workers_gc_thread_data.stealable_stack_fill);
-       g_assert (sgen_gray_object_queue_is_empty (&workers_gc_thread_data.private_gray_queue));
+       g_assert (workers_job_queue_num_entries == 0);
+       g_assert (sgen_section_gray_queue_is_empty (&workers_distribute_gray_queue));
        for (i = 0; i < workers_num; ++i) {
                g_assert (!workers_data [i].stealable_stack_fill);
                g_assert (sgen_gray_object_queue_is_empty (&workers_data [i].private_gray_queue));
        }
 }
 
+gboolean
+sgen_workers_all_done (void)
+{
+       State state = workers_state;
+       /*
+        * Can only be called while the collection is still in
+        * progress, i.e., before done has been posted.
+        */
+       g_assert (state.data.gc_in_progress);
+       g_assert (!state.data.done_posted);
+       return state.data.num_waiting == workers_num;
+}
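
A hypothetical use of the predicate above, not part of this patch: while a concurrent collection is running, the main GC thread could poll it to find out whether the workers have drained everything they were given, along the lines of:

/* Hypothetical polling loop, for illustration only. */
while (!sgen_workers_all_done ())
        g_usleep (1000);
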
+
 gboolean
 sgen_is_worker_thread (MonoNativeThreadId thread)
 {
@@ -488,14 +659,8 @@ sgen_is_worker_thread (MonoNativeThreadId thread)
        return FALSE;
 }
 
-gboolean
-sgen_workers_is_distributed_queue (SgenGrayQueue *queue)
-{
-       return queue == &workers_distribute_gray_queue;
-}
-
-SgenGrayQueue*
-sgen_workers_get_distribute_gray_queue (void)
+SgenSectionGrayQueue*
+sgen_workers_get_distribute_section_gray_queue (void)
 {
        return &workers_distribute_gray_queue;
 }
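
The accessor above replaces the old per-object distribute gray queue with a queue of whole gray sections. A hypothetical producer-side sketch, assuming sgen_section_gray_queue_enqueue is the counterpart of the sgen_section_gray_queue_dequeue call used in workers_get_work above:

/* Hypothetical sketch: the GC thread hands a filled section to the workers. */
static void
hand_section_to_workers (GrayQueueSection *section)
{
        SgenSectionGrayQueue *queue = sgen_workers_get_distribute_section_gray_queue ();
        sgen_section_gray_queue_enqueue (queue, section);
        sgen_workers_wake_up_all ();    /* let an idle worker pick it up */
}
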
@@ -504,7 +669,6 @@ void
 sgen_workers_reset_data (void)
 {
        if (sgen_get_major_collector ()->reset_worker_data)
-               sgen_get_major_collector ()->reset_worker_data (workers_gc_thread_data.major_collector_data);
-
+               sgen_get_major_collector ()->reset_worker_data (workers_gc_thread_major_collector_data);
 }
 #endif