/*
+ * sgen-workers.c: Worker threads for parallel and concurrent GC.
+ *
* Copyright 2001-2003 Ximian, Inc
* Copyright 2003-2010 Novell, Inc.
+ * Copyright (C) 2012 Xamarin Inc
*
- * Permission is hereby granted, free of charge, to any person obtaining
- * a copy of this software and associated documentation files (the
- * "Software"), to deal in the Software without restriction, including
- * without limitation the rights to use, copy, modify, merge, publish,
- * distribute, sublicense, and/or sell copies of the Software, and to
- * permit persons to whom the Software is furnished to do so, subject to
- * the following conditions:
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License 2.0 as published by the Free Software Foundation;
*
- * The above copyright notice and this permission notice shall be
- * included in all copies or substantial portions of the Software.
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
*
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
- * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
- * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
- * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
- * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
- * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
- * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ * You should have received a copy of the GNU Library General Public
+ * License 2.0 along with this library; if not, write to the Free
+ * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/
#include "config.h"
static int workers_num;
static WorkerData *workers_data;
-static WorkerData workers_gc_thread_data;
+static void *workers_gc_thread_major_collector_data = NULL;
-static SgenGrayQueue workers_distribute_gray_queue;
+static SgenSectionGrayQueue workers_distribute_gray_queue;
+static gboolean workers_distribute_gray_queue_inited;
-static volatile gboolean workers_gc_in_progress = FALSE;
static volatile gboolean workers_marking = FALSE;
static gboolean workers_started = FALSE;
-static volatile int workers_num_waiting = 0;
+
+typedef union {
+ gint32 value;
+ struct {
+ /*
+ * Decremented by the main thread and incremented by
+ * worker threads.
+ */
+ guint32 num_waiting : 8;
+ /* Set by worker threads and reset by the main thread. */
+ guint32 done_posted : 1;
+ /* Set by the main thread. */
+ guint32 gc_in_progress : 1;
+ } data;
+} State;
+
+static volatile State workers_state;
+
static MonoSemType workers_waiting_sem;
static MonoSemType workers_done_sem;
-static volatile int workers_done_posted = 0;
static volatile int workers_job_queue_num_entries = 0;
static volatile JobQueueEntry *workers_job_queue = NULL;
static LOCK_DECLARE (workers_job_queue_mutex);
+static int workers_num_jobs_enqueued = 0;
+static volatile int workers_num_jobs_finished = 0;
static long long stat_workers_stolen_from_self_lock;
static long long stat_workers_stolen_from_self_no_lock;
static long long stat_workers_stolen_from_others;
static long long stat_workers_num_waited;
+static gboolean
+set_state (State old_state, State new_state)
+{
+ return InterlockedCompareExchange (&workers_state.value,
+ new_state.value, old_state.value) == old_state.value;
+}
+
static void
workers_wake_up (int max)
{
int i;
for (i = 0; i < max; ++i) {
- int num;
+ State old_state, new_state;
do {
- num = workers_num_waiting;
- if (num == 0)
+ old_state = new_state = workers_state;
+ /*
+ * We must not wake workers up once done has
+ * been posted.
+ */
+ if (old_state.data.done_posted)
+ return;
+ if (old_state.data.num_waiting == 0)
return;
- } while (InterlockedCompareExchange (&workers_num_waiting, num - 1, num) != num);
+ --new_state.data.num_waiting;
+ } while (!set_state (old_state, new_state));
MONO_SEM_POST (&workers_waiting_sem);
}
}
workers_wake_up (workers_num);
}
+void
+sgen_workers_wake_up_all (void)
+{
+ g_assert (workers_state.data.gc_in_progress);
+ workers_wake_up_all ();
+}
+
static void
workers_wait (void)
{
- int num;
+ State old_state, new_state;
++stat_workers_num_waited;
do {
- num = workers_num_waiting;
- } while (InterlockedCompareExchange (&workers_num_waiting, num + 1, num) != num);
- if (num + 1 == workers_num && !workers_gc_in_progress) {
- /* Make sure the done semaphore is only posted once. */
- int posted;
- do {
- posted = workers_done_posted;
- if (posted)
- break;
- } while (InterlockedCompareExchange (&workers_done_posted, 1, 0) != 0);
- if (!posted)
- MONO_SEM_POST (&workers_done_sem);
- }
+ old_state = new_state = workers_state;
+ /*
+ * Only the last worker thread awake can set the done
+ * posted flag, and since we're awake and haven't set
+ * it yet, it cannot be set.
+ */
+ g_assert (!old_state.data.done_posted);
+ ++new_state.data.num_waiting;
+ /*
+ * This is the only place where we use
+ * workers_gc_in_progress in the worker threads.
+ */
+ if (new_state.data.num_waiting == workers_num && !old_state.data.gc_in_progress)
+ new_state.data.done_posted = 1;
+ } while (!set_state (old_state, new_state));
+ mono_memory_barrier ();
+ if (new_state.data.done_posted)
+ MONO_SEM_POST (&workers_done_sem);
MONO_SEM_WAIT (&workers_waiting_sem);
}
+static gboolean
+collection_needs_workers (void)
+{
+ return sgen_collection_is_parallel () || sgen_collection_is_concurrent ();
+}
+
void
sgen_workers_enqueue_job (JobFunc func, void *data)
{
int num_entries;
JobQueueEntry *entry;
- if (!sgen_collection_is_parallel ()) {
+ if (!collection_needs_workers ()) {
func (NULL, data);
return;
}
+ g_assert (workers_state.data.gc_in_progress);
+
entry = sgen_alloc_internal (INTERNAL_MEM_JOB_QUEUE_ENTRY);
entry->func = func;
entry->data = data;
entry->next = workers_job_queue;
workers_job_queue = entry;
num_entries = ++workers_job_queue_num_entries;
+ ++workers_num_jobs_enqueued;
mono_mutex_unlock (&workers_job_queue_mutex);
workers_wake_up (num_entries);
}
+void
+sgen_workers_wait_for_jobs (void)
+{
+ // FIXME: implement this properly
+ while (workers_num_jobs_finished < workers_num_jobs_enqueued) {
+ State state = workers_state;
+ g_assert (state.data.gc_in_progress);
+ g_assert (!state.data.done_posted);
+ if (state.data.num_waiting == workers_num)
+ workers_wake_up_all ();
+ g_usleep (1000);
+ }
+}
+
static gboolean
workers_dequeue_and_do_job (WorkerData *data)
{
if (!entry)
return FALSE;
- g_assert (sgen_collection_is_parallel ());
+ g_assert (collection_needs_workers ());
entry->func (data, entry->data);
sgen_free_internal (entry, INTERNAL_MEM_JOB_QUEUE_ENTRY);
+
+ SGEN_ATOMIC_ADD (workers_num_jobs_finished, 1);
+
return TRUE;
}
static gboolean
workers_get_work (WorkerData *data)
{
+ SgenMajorCollector *major;
int i;
g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue));
if (workers_steal (data, data, TRUE))
return TRUE;
- /* Then from the GC thread's stack. */
- if (workers_steal (data, &workers_gc_thread_data, TRUE))
- return TRUE;
-
- /* Finally, from another worker. */
+ /* From another worker. */
for (i = 0; i < workers_num; ++i) {
WorkerData *victim_data = &workers_data [i];
if (data == victim_data)
return TRUE;
}
+ /*
+ * If we're concurrent or parallel, from the workers
+ * distribute gray queue.
+ */
+ major = sgen_get_major_collector ();
+ if (major->is_concurrent || major->is_parallel) {
+ GrayQueueSection *section = sgen_section_gray_queue_dequeue (&workers_distribute_gray_queue);
+ if (section) {
+ sgen_gray_object_enqueue_section (&data->private_gray_queue, section);
+ return TRUE;
+ }
+ }
+
/* Nobody to steal from */
g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue));
return FALSE;
* There are still objects in the stealable stack, so
* wake up any workers that might be sleeping
*/
- if (workers_gc_in_progress)
+ if (workers_state.data.gc_in_progress)
workers_wake_up_all ();
return;
}
sgen_gray_object_free_queue_section (section);
}
- if (data != &workers_gc_thread_data && sgen_gray_object_queue_is_empty (queue))
+ if (sgen_gray_object_queue_is_empty (queue))
workers_steal (data, data, FALSE);
mono_mutex_unlock (&data->stealable_stack_mutex);
- if (workers_gc_in_progress)
+ if (workers_state.data.gc_in_progress)
workers_wake_up_all ();
}
+static void
+concurrent_enqueue_check (char *obj)
+{
+ g_assert (sgen_concurrent_collection_in_progress ());
+ g_assert (!sgen_ptr_in_nursery (obj));
+ g_assert (SGEN_LOAD_VTABLE (obj));
+}
+
+static void
+init_private_gray_queue (WorkerData *data)
+{
+ sgen_gray_object_queue_init_with_alloc_prepare (&data->private_gray_queue,
+ sgen_get_major_collector ()->is_concurrent ? concurrent_enqueue_check : NULL,
+ workers_gray_queue_share_redirect, data);
+}
+
static mono_native_thread_return_t
workers_thread_func (void *data_untyped)
{
WorkerData *data = data_untyped;
+ SgenMajorCollector *major = sgen_get_major_collector ();
mono_thread_info_register_small_id ();
- if (sgen_get_major_collector ()->init_worker_thread)
- sgen_get_major_collector ()->init_worker_thread (data->major_collector_data);
+ if (major->init_worker_thread)
+ major->init_worker_thread (data->major_collector_data);
- sgen_gray_object_queue_init_with_alloc_prepare (&data->private_gray_queue,
- workers_gray_queue_share_redirect, data);
+ init_private_gray_queue (data);
for (;;) {
gboolean did_work = FALSE;
}
if (workers_marking && (!sgen_gray_object_queue_is_empty (&data->private_gray_queue) || workers_get_work (data))) {
+ SgenObjectOperations *ops = sgen_concurrent_collection_in_progress ()
+ ? &major->major_concurrent_ops
+ : &major->major_ops;
+ ScanCopyContext ctx = { ops->scan_object, NULL, &data->private_gray_queue };
+
g_assert (!sgen_gray_object_queue_is_empty (&data->private_gray_queue));
- while (!sgen_drain_gray_stack (&data->private_gray_queue, 32))
+ while (!sgen_drain_gray_stack (32, ctx))
workers_gray_queue_share_redirect (&data->private_gray_queue);
g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue));
- sgen_gray_object_queue_init (&data->private_gray_queue);
+ init_private_gray_queue (data);
did_work = TRUE;
}
return NULL;
}
-void
-sgen_workers_distribute_gray_queue_sections (void)
+static void
+init_distribute_gray_queue (gboolean locked)
{
- if (!sgen_collection_is_parallel ())
+ if (workers_distribute_gray_queue_inited) {
+ g_assert (sgen_section_gray_queue_is_empty (&workers_distribute_gray_queue));
+ g_assert (!workers_distribute_gray_queue.locked == !locked);
return;
+ }
- workers_gray_queue_share_redirect (&workers_distribute_gray_queue);
+ sgen_section_gray_queue_init (&workers_distribute_gray_queue, locked,
+ sgen_get_major_collector ()->is_concurrent ? concurrent_enqueue_check : NULL);
+ workers_distribute_gray_queue_inited = TRUE;
}
void
sgen_workers_init_distribute_gray_queue (void)
{
- if (!sgen_collection_is_parallel ())
+ if (!collection_needs_workers ())
return;
- sgen_gray_object_queue_init (&workers_distribute_gray_queue);
+ init_distribute_gray_queue (sgen_get_major_collector ()->is_concurrent || sgen_get_major_collector ()->is_parallel);
}
void
{
int i;
- if (!sgen_get_major_collector ()->is_parallel)
+ if (!sgen_get_major_collector ()->is_parallel && !sgen_get_major_collector ()->is_concurrent)
return;
//g_print ("initing %d workers\n", num_workers);
workers_num = num_workers;
- workers_data = sgen_alloc_internal_dynamic (sizeof (WorkerData) * num_workers, INTERNAL_MEM_WORKER_DATA);
+ workers_data = sgen_alloc_internal_dynamic (sizeof (WorkerData) * num_workers, INTERNAL_MEM_WORKER_DATA, TRUE);
memset (workers_data, 0, sizeof (WorkerData) * num_workers);
MONO_SEM_INIT (&workers_waiting_sem, 0);
MONO_SEM_INIT (&workers_done_sem, 0);
- sgen_gray_object_queue_init_with_alloc_prepare (&workers_distribute_gray_queue,
- workers_gray_queue_share_redirect, &workers_gc_thread_data);
- mono_mutex_init (&workers_gc_thread_data.stealable_stack_mutex, NULL);
- workers_gc_thread_data.stealable_stack_fill = 0;
+ init_distribute_gray_queue (sgen_get_major_collector ()->is_concurrent || sgen_get_major_collector ()->is_parallel);
if (sgen_get_major_collector ()->alloc_worker_data)
- workers_gc_thread_data.major_collector_data = sgen_get_major_collector ()->alloc_worker_data ();
+ workers_gc_thread_major_collector_data = sgen_get_major_collector ()->alloc_worker_data ();
for (i = 0; i < workers_num; ++i) {
/* private gray queue is inited by the thread itself */
- mono_mutex_init (&workers_data [i].stealable_stack_mutex, NULL);
+ mono_mutex_init (&workers_data [i].stealable_stack_mutex);
workers_data [i].stealable_stack_fill = 0;
if (sgen_get_major_collector ()->alloc_worker_data)
void
sgen_workers_start_all_workers (void)
{
+ State old_state, new_state;
int i;
- if (!sgen_collection_is_parallel ())
+ if (!collection_needs_workers ())
return;
if (sgen_get_major_collector ()->init_worker_thread)
- sgen_get_major_collector ()->init_worker_thread (workers_gc_thread_data.major_collector_data);
+ sgen_get_major_collector ()->init_worker_thread (workers_gc_thread_major_collector_data);
+
+ old_state = new_state = workers_state;
+ g_assert (!old_state.data.gc_in_progress);
+ new_state.data.gc_in_progress = TRUE;
- g_assert (!workers_gc_in_progress);
- workers_gc_in_progress = TRUE;
workers_marking = FALSE;
- workers_done_posted = 0;
+
+ g_assert (workers_job_queue_num_entries == 0);
+ workers_num_jobs_enqueued = 0;
+ workers_num_jobs_finished = 0;
if (workers_started) {
- if (workers_num_waiting != workers_num)
- g_error ("Expecting all %d sgen workers to be parked, but only %d are", workers_num, workers_num_waiting);
+ g_assert (old_state.data.done_posted);
+ if (old_state.data.num_waiting != workers_num) {
+ g_error ("Expecting all %d sgen workers to be parked, but only %d are",
+ workers_num, old_state.data.num_waiting);
+ }
+
+ /* Clear the done posted flag */
+ new_state.data.done_posted = 0;
+ if (!set_state (old_state, new_state))
+ g_assert_not_reached ();
+
workers_wake_up_all ();
return;
}
+ g_assert (!old_state.data.done_posted);
+
+ if (!set_state (old_state, new_state))
+ g_assert_not_reached ();
+
for (i = 0; i < workers_num; ++i)
workers_start_worker (i);
workers_started = TRUE;
}
+gboolean
+sgen_workers_have_started (void)
+{
+ return workers_state.data.gc_in_progress;
+}
+
void
sgen_workers_start_marking (void)
{
- if (!sgen_collection_is_parallel ())
+ if (!collection_needs_workers ())
return;
- g_assert (workers_started && workers_gc_in_progress);
+ g_assert (workers_started && workers_state.data.gc_in_progress);
g_assert (!workers_marking);
workers_marking = TRUE;
void
sgen_workers_join (void)
{
+ State old_state, new_state;
int i;
- if (!sgen_collection_is_parallel ())
+ if (!collection_needs_workers ())
return;
- g_assert (sgen_gray_object_queue_is_empty (&workers_gc_thread_data.private_gray_queue));
- g_assert (sgen_gray_object_queue_is_empty (&workers_distribute_gray_queue));
+ do {
+ old_state = new_state = workers_state;
+ g_assert (old_state.data.gc_in_progress);
+ g_assert (!old_state.data.done_posted);
+
+ new_state.data.gc_in_progress = 0;
+ } while (!set_state (old_state, new_state));
- g_assert (workers_gc_in_progress);
- workers_gc_in_progress = FALSE;
- if (workers_num_waiting == workers_num) {
+ if (new_state.data.num_waiting == workers_num) {
/*
- * All the workers might have shut down at this point
- * and posted the done semaphore but we don't know it
- * yet. It's not a big deal to wake them up again -
- * they'll just do one iteration of their loop trying to
- * find something to do and then go back to waiting
- * again.
+ * All the workers have shut down but haven't posted
+ * the done semaphore yet, or, if we come from below,
+ * haven't done all their work yet.
+ *
+ * It's not a big deal to wake them up again - they'll
+ * just do one iteration of their loop trying to find
+ * something to do and then go back to waiting again.
*/
+ reawaken:
workers_wake_up_all ();
}
MONO_SEM_WAIT (&workers_done_sem);
+
+ old_state = new_state = workers_state;
+ g_assert (old_state.data.num_waiting == workers_num);
+ g_assert (old_state.data.done_posted);
+
+ if (workers_job_queue_num_entries || !sgen_section_gray_queue_is_empty (&workers_distribute_gray_queue)) {
+ /*
+ * There's a small race condition that we avoid here.
+ * It's possible that a worker thread runs out of
+ * things to do, so it goes to sleep. Right at that
+ * moment a new job is enqueued, but the thread is
+ * still registered as running. Now the threads are
+ * joined, and we wait for the semaphore. Only at
+ * this point does the worker go to sleep, and posts
+ * the semaphore, because workers_gc_in_progress is
+ * already FALSE. The job is still in the queue,
+ * though.
+ *
+ * Clear the done posted flag.
+ */
+ new_state.data.done_posted = 0;
+ if (!set_state (old_state, new_state))
+ g_assert_not_reached ();
+ goto reawaken;
+ }
+
+ /* At this point all the workers have stopped. */
+
workers_marking = FALSE;
if (sgen_get_major_collector ()->reset_worker_data) {
sgen_get_major_collector ()->reset_worker_data (workers_data [i].major_collector_data);
}
- g_assert (workers_done_posted);
-
- g_assert (!workers_gc_thread_data.stealable_stack_fill);
- g_assert (sgen_gray_object_queue_is_empty (&workers_gc_thread_data.private_gray_queue));
+ g_assert (workers_job_queue_num_entries == 0);
+ g_assert (sgen_section_gray_queue_is_empty (&workers_distribute_gray_queue));
for (i = 0; i < workers_num; ++i) {
g_assert (!workers_data [i].stealable_stack_fill);
g_assert (sgen_gray_object_queue_is_empty (&workers_data [i].private_gray_queue));
}
}
+gboolean
+sgen_workers_all_done (void)
+{
+ State state = workers_state;
+ /*
+ * Can only be called while the collection is still in
+ * progress, i.e., before done has been posted.
+ */
+ g_assert (state.data.gc_in_progress);
+ g_assert (!state.data.done_posted);
+ return state.data.num_waiting == workers_num;
+}
+
gboolean
sgen_is_worker_thread (MonoNativeThreadId thread)
{
return FALSE;
}
-gboolean
-sgen_workers_is_distributed_queue (SgenGrayQueue *queue)
-{
- return queue == &workers_distribute_gray_queue;
-}
-
-SgenGrayQueue*
-sgen_workers_get_distribute_gray_queue (void)
+SgenSectionGrayQueue*
+sgen_workers_get_distribute_section_gray_queue (void)
{
return &workers_distribute_gray_queue;
}
sgen_workers_reset_data (void)
{
if (sgen_get_major_collector ()->reset_worker_data)
- sgen_get_major_collector ()->reset_worker_data (workers_gc_thread_data.major_collector_data);
-
+ sgen_get_major_collector ()->reset_worker_data (workers_gc_thread_major_collector_data);
}
#endif