X-Git-Url: http://wien.tomnetworks.com/gitweb/?a=blobdiff_plain;f=mono%2Fmetadata%2Fsgen-workers.c;h=77c7fadcac086f76a2ea8baad3f2328f31733ec0;hb=623e43dcd6f28da4e486ea50e4120925039bae34;hp=819133545e05c07fd5a810e869daed83e5f36b35;hpb=65f7004927482ca6696aaea4cd219130aa8a68db;p=mono.git diff --git a/mono/metadata/sgen-workers.c b/mono/metadata/sgen-workers.c index 819133545e0..77c7fadcac0 100644 --- a/mono/metadata/sgen-workers.c +++ b/mono/metadata/sgen-workers.c @@ -1,25 +1,22 @@ /* + * sgen-workers.c: Worker threads for parallel and concurrent GC. + * * Copyright 2001-2003 Ximian, Inc * Copyright 2003-2010 Novell, Inc. + * Copyright (C) 2012 Xamarin Inc * - * Permission is hereby granted, free of charge, to any person obtaining - * a copy of this software and associated documentation files (the - * "Software"), to deal in the Software without restriction, including - * without limitation the rights to use, copy, modify, merge, publish, - * distribute, sublicense, and/or sell copies of the Software, and to - * permit persons to whom the Software is furnished to do so, subject to - * the following conditions: + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License 2.0 as published by the Free Software Foundation; * - * The above copyright notice and this permission notice shall be - * included in all copies or substantial portions of the Software. + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, - * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF - * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND - * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE - * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION - * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION - * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + * You should have received a copy of the GNU Library General Public + * License 2.0 along with this library; if not, write to the Free + * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ #include "config.h" @@ -31,39 +28,71 @@ static int workers_num; static WorkerData *workers_data; -static WorkerData workers_gc_thread_data; +static void *workers_gc_thread_major_collector_data = NULL; -static SgenGrayQueue workers_distribute_gray_queue; +static SgenSectionGrayQueue workers_distribute_gray_queue; +static gboolean workers_distribute_gray_queue_inited; -static volatile gboolean workers_gc_in_progress = FALSE; static volatile gboolean workers_marking = FALSE; static gboolean workers_started = FALSE; -static volatile int workers_num_waiting = 0; + +typedef union { + gint32 value; + struct { + /* + * Decremented by the main thread and incremented by + * worker threads. + */ + guint32 num_waiting : 8; + /* Set by worker threads and reset by the main thread. */ + guint32 done_posted : 1; + /* Set by the main thread. */ + guint32 gc_in_progress : 1; + } data; +} State; + +static volatile State workers_state; + static MonoSemType workers_waiting_sem; static MonoSemType workers_done_sem; -static volatile int workers_done_posted = 0; static volatile int workers_job_queue_num_entries = 0; static volatile JobQueueEntry *workers_job_queue = NULL; static LOCK_DECLARE (workers_job_queue_mutex); +static int workers_num_jobs_enqueued = 0; +static volatile int workers_num_jobs_finished = 0; static long long stat_workers_stolen_from_self_lock; static long long stat_workers_stolen_from_self_no_lock; static long long stat_workers_stolen_from_others; static long long stat_workers_num_waited; +static gboolean +set_state (State old_state, State new_state) +{ + return InterlockedCompareExchange (&workers_state.value, + new_state.value, old_state.value) == old_state.value; +} + static void workers_wake_up (int max) { int i; for (i = 0; i < max; ++i) { - int num; + State old_state, new_state; do { - num = workers_num_waiting; - if (num == 0) + old_state = new_state = workers_state; + /* + * We must not wake workers up once done has + * been posted. + */ + if (old_state.data.done_posted) + return; + if (old_state.data.num_waiting == 0) return; - } while (InterlockedCompareExchange (&workers_num_waiting, num - 1, num) != num); + --new_state.data.num_waiting; + } while (!set_state (old_state, new_state)); MONO_SEM_POST (&workers_waiting_sem); } } @@ -74,39 +103,59 @@ workers_wake_up_all (void) workers_wake_up (workers_num); } +void +sgen_workers_wake_up_all (void) +{ + g_assert (workers_state.data.gc_in_progress); + workers_wake_up_all (); +} + static void workers_wait (void) { - int num; + State old_state, new_state; ++stat_workers_num_waited; do { - num = workers_num_waiting; - } while (InterlockedCompareExchange (&workers_num_waiting, num + 1, num) != num); - if (num + 1 == workers_num && !workers_gc_in_progress) { - /* Make sure the done semaphore is only posted once. */ - int posted; - do { - posted = workers_done_posted; - if (posted) - break; - } while (InterlockedCompareExchange (&workers_done_posted, 1, 0) != 0); - if (!posted) - MONO_SEM_POST (&workers_done_sem); - } + old_state = new_state = workers_state; + /* + * Only the last worker thread awake can set the done + * posted flag, and since we're awake and haven't set + * it yet, it cannot be set. + */ + g_assert (!old_state.data.done_posted); + ++new_state.data.num_waiting; + /* + * This is the only place where we use + * workers_gc_in_progress in the worker threads. + */ + if (new_state.data.num_waiting == workers_num && !old_state.data.gc_in_progress) + new_state.data.done_posted = 1; + } while (!set_state (old_state, new_state)); + mono_memory_barrier (); + if (new_state.data.done_posted) + MONO_SEM_POST (&workers_done_sem); MONO_SEM_WAIT (&workers_waiting_sem); } +static gboolean +collection_needs_workers (void) +{ + return sgen_collection_is_parallel () || sgen_collection_is_concurrent (); +} + void sgen_workers_enqueue_job (JobFunc func, void *data) { int num_entries; JobQueueEntry *entry; - if (!sgen_collection_is_parallel ()) { + if (!collection_needs_workers ()) { func (NULL, data); return; } + g_assert (workers_state.data.gc_in_progress); + entry = sgen_alloc_internal (INTERNAL_MEM_JOB_QUEUE_ENTRY); entry->func = func; entry->data = data; @@ -115,11 +164,26 @@ sgen_workers_enqueue_job (JobFunc func, void *data) entry->next = workers_job_queue; workers_job_queue = entry; num_entries = ++workers_job_queue_num_entries; + ++workers_num_jobs_enqueued; mono_mutex_unlock (&workers_job_queue_mutex); workers_wake_up (num_entries); } +void +sgen_workers_wait_for_jobs (void) +{ + // FIXME: implement this properly + while (workers_num_jobs_finished < workers_num_jobs_enqueued) { + State state = workers_state; + g_assert (state.data.gc_in_progress); + g_assert (!state.data.done_posted); + if (state.data.num_waiting == workers_num) + workers_wake_up_all (); + g_usleep (1000); + } +} + static gboolean workers_dequeue_and_do_job (WorkerData *data) { @@ -147,10 +211,13 @@ workers_dequeue_and_do_job (WorkerData *data) if (!entry) return FALSE; - g_assert (sgen_collection_is_parallel ()); + g_assert (collection_needs_workers ()); entry->func (data, entry->data); sgen_free_internal (entry, INTERNAL_MEM_JOB_QUEUE_ENTRY); + + SGEN_ATOMIC_ADD (workers_num_jobs_finished, 1); + return TRUE; } @@ -202,6 +269,7 @@ workers_steal (WorkerData *data, WorkerData *victim_data, gboolean lock) static gboolean workers_get_work (WorkerData *data) { + SgenMajorCollector *major; int i; g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue)); @@ -210,11 +278,7 @@ workers_get_work (WorkerData *data) if (workers_steal (data, data, TRUE)) return TRUE; - /* Then from the GC thread's stack. */ - if (workers_steal (data, &workers_gc_thread_data, TRUE)) - return TRUE; - - /* Finally, from another worker. */ + /* From another worker. */ for (i = 0; i < workers_num; ++i) { WorkerData *victim_data = &workers_data [i]; if (data == victim_data) @@ -223,6 +287,19 @@ workers_get_work (WorkerData *data) return TRUE; } + /* + * If we're concurrent or parallel, from the workers + * distribute gray queue. + */ + major = sgen_get_major_collector (); + if (major->is_concurrent || major->is_parallel) { + GrayQueueSection *section = sgen_section_gray_queue_dequeue (&workers_distribute_gray_queue); + if (section) { + sgen_gray_object_enqueue_section (&data->private_gray_queue, section); + return TRUE; + } + } + /* Nobody to steal from */ g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue)); return FALSE; @@ -239,7 +316,7 @@ workers_gray_queue_share_redirect (SgenGrayQueue *queue) * There are still objects in the stealable stack, so * wake up any workers that might be sleeping */ - if (workers_gc_in_progress) + if (workers_state.data.gc_in_progress) workers_wake_up_all (); return; } @@ -264,27 +341,43 @@ workers_gray_queue_share_redirect (SgenGrayQueue *queue) sgen_gray_object_free_queue_section (section); } - if (data != &workers_gc_thread_data && sgen_gray_object_queue_is_empty (queue)) + if (sgen_gray_object_queue_is_empty (queue)) workers_steal (data, data, FALSE); mono_mutex_unlock (&data->stealable_stack_mutex); - if (workers_gc_in_progress) + if (workers_state.data.gc_in_progress) workers_wake_up_all (); } +static void +concurrent_enqueue_check (char *obj) +{ + g_assert (sgen_concurrent_collection_in_progress ()); + g_assert (!sgen_ptr_in_nursery (obj)); + g_assert (SGEN_LOAD_VTABLE (obj)); +} + +static void +init_private_gray_queue (WorkerData *data) +{ + sgen_gray_object_queue_init_with_alloc_prepare (&data->private_gray_queue, + sgen_get_major_collector ()->is_concurrent ? concurrent_enqueue_check : NULL, + workers_gray_queue_share_redirect, data); +} + static mono_native_thread_return_t workers_thread_func (void *data_untyped) { WorkerData *data = data_untyped; + SgenMajorCollector *major = sgen_get_major_collector (); mono_thread_info_register_small_id (); - if (sgen_get_major_collector ()->init_worker_thread) - sgen_get_major_collector ()->init_worker_thread (data->major_collector_data); + if (major->init_worker_thread) + major->init_worker_thread (data->major_collector_data); - sgen_gray_object_queue_init_with_alloc_prepare (&data->private_gray_queue, - workers_gray_queue_share_redirect, data); + init_private_gray_queue (data); for (;;) { gboolean did_work = FALSE; @@ -295,13 +388,18 @@ workers_thread_func (void *data_untyped) } if (workers_marking && (!sgen_gray_object_queue_is_empty (&data->private_gray_queue) || workers_get_work (data))) { + SgenObjectOperations *ops = sgen_concurrent_collection_in_progress () + ? &major->major_concurrent_ops + : &major->major_ops; + ScanCopyContext ctx = { ops->scan_object, NULL, &data->private_gray_queue }; + g_assert (!sgen_gray_object_queue_is_empty (&data->private_gray_queue)); - while (!sgen_drain_gray_stack (&data->private_gray_queue, 32)) + while (!sgen_drain_gray_stack (32, ctx)) workers_gray_queue_share_redirect (&data->private_gray_queue); g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue)); - sgen_gray_object_queue_init (&data->private_gray_queue); + init_private_gray_queue (data); did_work = TRUE; } @@ -314,22 +412,27 @@ workers_thread_func (void *data_untyped) return NULL; } -void -sgen_workers_distribute_gray_queue_sections (void) +static void +init_distribute_gray_queue (gboolean locked) { - if (!sgen_collection_is_parallel ()) + if (workers_distribute_gray_queue_inited) { + g_assert (sgen_section_gray_queue_is_empty (&workers_distribute_gray_queue)); + g_assert (!workers_distribute_gray_queue.locked == !locked); return; + } - workers_gray_queue_share_redirect (&workers_distribute_gray_queue); + sgen_section_gray_queue_init (&workers_distribute_gray_queue, locked, + sgen_get_major_collector ()->is_concurrent ? concurrent_enqueue_check : NULL); + workers_distribute_gray_queue_inited = TRUE; } void sgen_workers_init_distribute_gray_queue (void) { - if (!sgen_collection_is_parallel ()) + if (!collection_needs_workers ()) return; - sgen_gray_object_queue_init (&workers_distribute_gray_queue); + init_distribute_gray_queue (sgen_get_major_collector ()->is_concurrent || sgen_get_major_collector ()->is_parallel); } void @@ -337,30 +440,27 @@ sgen_workers_init (int num_workers) { int i; - if (!sgen_get_major_collector ()->is_parallel) + if (!sgen_get_major_collector ()->is_parallel && !sgen_get_major_collector ()->is_concurrent) return; //g_print ("initing %d workers\n", num_workers); workers_num = num_workers; - workers_data = sgen_alloc_internal_dynamic (sizeof (WorkerData) * num_workers, INTERNAL_MEM_WORKER_DATA); + workers_data = sgen_alloc_internal_dynamic (sizeof (WorkerData) * num_workers, INTERNAL_MEM_WORKER_DATA, TRUE); memset (workers_data, 0, sizeof (WorkerData) * num_workers); MONO_SEM_INIT (&workers_waiting_sem, 0); MONO_SEM_INIT (&workers_done_sem, 0); - sgen_gray_object_queue_init_with_alloc_prepare (&workers_distribute_gray_queue, - workers_gray_queue_share_redirect, &workers_gc_thread_data); - mono_mutex_init (&workers_gc_thread_data.stealable_stack_mutex, NULL); - workers_gc_thread_data.stealable_stack_fill = 0; + init_distribute_gray_queue (sgen_get_major_collector ()->is_concurrent || sgen_get_major_collector ()->is_parallel); if (sgen_get_major_collector ()->alloc_worker_data) - workers_gc_thread_data.major_collector_data = sgen_get_major_collector ()->alloc_worker_data (); + workers_gc_thread_major_collector_data = sgen_get_major_collector ()->alloc_worker_data (); for (i = 0; i < workers_num; ++i) { /* private gray queue is inited by the thread itself */ - mono_mutex_init (&workers_data [i].stealable_stack_mutex, NULL); + mono_mutex_init (&workers_data [i].stealable_stack_mutex); workers_data [i].stealable_stack_fill = 0; if (sgen_get_major_collector ()->alloc_worker_data) @@ -391,39 +491,65 @@ workers_start_worker (int index) void sgen_workers_start_all_workers (void) { + State old_state, new_state; int i; - if (!sgen_collection_is_parallel ()) + if (!collection_needs_workers ()) return; if (sgen_get_major_collector ()->init_worker_thread) - sgen_get_major_collector ()->init_worker_thread (workers_gc_thread_data.major_collector_data); + sgen_get_major_collector ()->init_worker_thread (workers_gc_thread_major_collector_data); + + old_state = new_state = workers_state; + g_assert (!old_state.data.gc_in_progress); + new_state.data.gc_in_progress = TRUE; - g_assert (!workers_gc_in_progress); - workers_gc_in_progress = TRUE; workers_marking = FALSE; - workers_done_posted = 0; + + g_assert (workers_job_queue_num_entries == 0); + workers_num_jobs_enqueued = 0; + workers_num_jobs_finished = 0; if (workers_started) { - if (workers_num_waiting != workers_num) - g_error ("Expecting all %d sgen workers to be parked, but only %d are", workers_num, workers_num_waiting); + g_assert (old_state.data.done_posted); + if (old_state.data.num_waiting != workers_num) { + g_error ("Expecting all %d sgen workers to be parked, but only %d are", + workers_num, old_state.data.num_waiting); + } + + /* Clear the done posted flag */ + new_state.data.done_posted = 0; + if (!set_state (old_state, new_state)) + g_assert_not_reached (); + workers_wake_up_all (); return; } + g_assert (!old_state.data.done_posted); + + if (!set_state (old_state, new_state)) + g_assert_not_reached (); + for (i = 0; i < workers_num; ++i) workers_start_worker (i); workers_started = TRUE; } +gboolean +sgen_workers_have_started (void) +{ + return workers_state.data.gc_in_progress; +} + void sgen_workers_start_marking (void) { - if (!sgen_collection_is_parallel ()) + if (!collection_needs_workers ()) return; - g_assert (workers_started && workers_gc_in_progress); + g_assert (workers_started && workers_state.data.gc_in_progress); g_assert (!workers_marking); workers_marking = TRUE; @@ -434,28 +560,62 @@ sgen_workers_start_marking (void) void sgen_workers_join (void) { + State old_state, new_state; int i; - if (!sgen_collection_is_parallel ()) + if (!collection_needs_workers ()) return; - g_assert (sgen_gray_object_queue_is_empty (&workers_gc_thread_data.private_gray_queue)); - g_assert (sgen_gray_object_queue_is_empty (&workers_distribute_gray_queue)); + do { + old_state = new_state = workers_state; + g_assert (old_state.data.gc_in_progress); + g_assert (!old_state.data.done_posted); + + new_state.data.gc_in_progress = 0; + } while (!set_state (old_state, new_state)); - g_assert (workers_gc_in_progress); - workers_gc_in_progress = FALSE; - if (workers_num_waiting == workers_num) { + if (new_state.data.num_waiting == workers_num) { /* - * All the workers might have shut down at this point - * and posted the done semaphore but we don't know it - * yet. It's not a big deal to wake them up again - - * they'll just do one iteration of their loop trying to - * find something to do and then go back to waiting - * again. + * All the workers have shut down but haven't posted + * the done semaphore yet, or, if we come from below, + * haven't done all their work yet. + * + * It's not a big deal to wake them up again - they'll + * just do one iteration of their loop trying to find + * something to do and then go back to waiting again. */ + reawaken: workers_wake_up_all (); } MONO_SEM_WAIT (&workers_done_sem); + + old_state = new_state = workers_state; + g_assert (old_state.data.num_waiting == workers_num); + g_assert (old_state.data.done_posted); + + if (workers_job_queue_num_entries || !sgen_section_gray_queue_is_empty (&workers_distribute_gray_queue)) { + /* + * There's a small race condition that we avoid here. + * It's possible that a worker thread runs out of + * things to do, so it goes to sleep. Right at that + * moment a new job is enqueued, but the thread is + * still registered as running. Now the threads are + * joined, and we wait for the semaphore. Only at + * this point does the worker go to sleep, and posts + * the semaphore, because workers_gc_in_progress is + * already FALSE. The job is still in the queue, + * though. + * + * Clear the done posted flag. + */ + new_state.data.done_posted = 0; + if (!set_state (old_state, new_state)) + g_assert_not_reached (); + goto reawaken; + } + + /* At this point all the workers have stopped. */ + workers_marking = FALSE; if (sgen_get_major_collector ()->reset_worker_data) { @@ -463,16 +623,27 @@ sgen_workers_join (void) sgen_get_major_collector ()->reset_worker_data (workers_data [i].major_collector_data); } - g_assert (workers_done_posted); - - g_assert (!workers_gc_thread_data.stealable_stack_fill); - g_assert (sgen_gray_object_queue_is_empty (&workers_gc_thread_data.private_gray_queue)); + g_assert (workers_job_queue_num_entries == 0); + g_assert (sgen_section_gray_queue_is_empty (&workers_distribute_gray_queue)); for (i = 0; i < workers_num; ++i) { g_assert (!workers_data [i].stealable_stack_fill); g_assert (sgen_gray_object_queue_is_empty (&workers_data [i].private_gray_queue)); } } +gboolean +sgen_workers_all_done (void) +{ + State state = workers_state; + /* + * Can only be called while the collection is still in + * progress, i.e., before done has been posted. + */ + g_assert (state.data.gc_in_progress); + g_assert (!state.data.done_posted); + return state.data.num_waiting == workers_num; +} + gboolean sgen_is_worker_thread (MonoNativeThreadId thread) { @@ -488,14 +659,8 @@ sgen_is_worker_thread (MonoNativeThreadId thread) return FALSE; } -gboolean -sgen_workers_is_distributed_queue (SgenGrayQueue *queue) -{ - return queue == &workers_distribute_gray_queue; -} - -SgenGrayQueue* -sgen_workers_get_distribute_gray_queue (void) +SgenSectionGrayQueue* +sgen_workers_get_distribute_section_gray_queue (void) { return &workers_distribute_gray_queue; } @@ -504,7 +669,6 @@ void sgen_workers_reset_data (void) { if (sgen_get_major_collector ()->reset_worker_data) - sgen_get_major_collector ()->reset_worker_data (workers_gc_thread_data.major_collector_data); - + sgen_get_major_collector ()->reset_worker_data (workers_gc_thread_major_collector_data); } #endif