typedef struct {
MonoDomain *domain;
+ /* Number of outstanding jobs */
gint32 outstanding_request;
+ /* Number of currently executing jobs */
+ int threadpool_jobs;
+ /* Signalled when threadpool_jobs + outstanding_request is 0 */
+ /* Protected by threadpool->domains_lock */
+ MonoCoopCond cleanup_cond;
} ThreadPoolDomain;
typedef MonoInternalThread ThreadPoolWorkingThread;
gboolean suspended;
} ThreadPool;
-typedef struct {
- gint32 ref;
- MonoCoopSem sem;
-} ThreadPoolDomainCleanupSemaphore;
-
typedef enum {
TRANSITION_WARMUP,
TRANSITION_INITIALIZING,
} while (0); \
} while (0)
+static inline void
+domains_lock (void)
+{
+ mono_coop_mutex_lock (&threadpool->domains_lock);
+}
+
+static inline void
+domains_unlock (void)
+{
+ mono_coop_mutex_unlock (&threadpool->domains_lock);
+}
+
static gpointer
rand_create (void)
{
return TRUE;
}
-/* LOCKING: threadpool->domains_lock must be held */
+/* LOCKING: domains_lock must be held */
static void
-domain_add (ThreadPoolDomain *tpdomain)
+tpdomain_add (ThreadPoolDomain *tpdomain)
{
guint i, len;
g_ptr_array_add (threadpool->domains, tpdomain);
}
-/* LOCKING: threadpool->domains_lock must be held */
+/* LOCKING: domains_lock must be held. */
static gboolean
-domain_remove (ThreadPoolDomain *tpdomain)
+tpdomain_remove (ThreadPoolDomain *tpdomain)
{
g_assert (tpdomain);
return g_ptr_array_remove (threadpool->domains, tpdomain);
}
-/* LOCKING: threadpool->domains_lock must be held */
+/* LOCKING: domains_lock must be held */
static ThreadPoolDomain *
-domain_get (MonoDomain *domain, gboolean create)
+tpdomain_get (MonoDomain *domain, gboolean create)
{
- ThreadPoolDomain *tpdomain = NULL;
guint i;
+ ThreadPoolDomain *tpdomain;
g_assert (domain);
for (i = 0; i < threadpool->domains->len; ++i) {
+ ThreadPoolDomain *tpdomain;
+
tpdomain = (ThreadPoolDomain *)g_ptr_array_index (threadpool->domains, i);
if (tpdomain->domain == domain)
return tpdomain;
}
- if (create) {
- ThreadPoolDomainCleanupSemaphore *cleanup_semaphore;
- cleanup_semaphore = g_new0 (ThreadPoolDomainCleanupSemaphore, 1);
- cleanup_semaphore->ref = 2;
- mono_coop_sem_init (&cleanup_semaphore->sem, 0);
+ if (!create)
+ return NULL;
- g_assert(!domain->cleanup_semaphore);
- domain->cleanup_semaphore = cleanup_semaphore;
+ tpdomain = g_new0 (ThreadPoolDomain, 1);
+ tpdomain->domain = domain;
+ mono_coop_cond_init (&tpdomain->cleanup_cond);
- tpdomain = g_new0 (ThreadPoolDomain, 1);
- tpdomain->domain = domain;
- domain_add (tpdomain);
- }
+ tpdomain_add (tpdomain);
return tpdomain;
}
static void
-domain_free (ThreadPoolDomain *tpdomain)
+tpdomain_free (ThreadPoolDomain *tpdomain)
{
g_free (tpdomain);
}
-/* LOCKING: threadpool->domains_lock must be held */
+/* LOCKING: domains_lock must be held */
static gboolean
domain_any_has_request (void)
{
return FALSE;
}
-/* LOCKING: threadpool->domains_lock must be held */
+/* LOCKING: domains_lock must be held */
static ThreadPoolDomain *
-domain_get_next (ThreadPoolDomain *current)
+tpdomain_get_next (ThreadPoolDomain *current)
{
ThreadPoolDomain *tpdomain = NULL;
guint len;
if (thread == mono_thread_internal_current ())
return;
- mono_thread_internal_stop ((MonoInternalThread*) thread);
+ mono_thread_internal_abort ((MonoInternalThread*) thread);
}
static void
previous_tpdomain = NULL;
- mono_coop_mutex_lock (&threadpool->domains_lock);
+ domains_lock ();
while (!mono_runtime_is_shutting_down ()) {
tpdomain = NULL;
- if ((thread->state & (ThreadState_StopRequested | ThreadState_SuspendRequested)) != 0) {
- mono_coop_mutex_unlock (&threadpool->domains_lock);
+ if ((thread->state & (ThreadState_AbortRequested | ThreadState_SuspendRequested)) != 0) {
+ domains_unlock ();
mono_thread_interruption_checkpoint ();
- mono_coop_mutex_lock (&threadpool->domains_lock);
+ domains_lock ();
}
- if (retire || !(tpdomain = domain_get_next (previous_tpdomain))) {
+ if (retire || !(tpdomain = tpdomain_get_next (previous_tpdomain))) {
gboolean timeout;
COUNTER_ATOMIC (counter, {
counter._.parked ++;
});
- mono_coop_mutex_unlock (&threadpool->domains_lock);
+ domains_unlock ();
timeout = worker_park ();
- mono_coop_mutex_lock (&threadpool->domains_lock);
+ domains_lock ();
COUNTER_ATOMIC (counter, {
counter._.working ++;
tpdomain->outstanding_request --;
g_assert (tpdomain->outstanding_request >= 0);
- mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker running in domain %p",
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker running in domain %p (outstanding requests %d) ",
mono_native_thread_id_get (), tpdomain->domain, tpdomain->outstanding_request);
g_assert (tpdomain->domain);
- g_assert (tpdomain->domain->threadpool_jobs >= 0);
- tpdomain->domain->threadpool_jobs ++;
+ g_assert (tpdomain->threadpool_jobs >= 0);
+ tpdomain->threadpool_jobs ++;
+
+ /*
+ * This is needed so there is always an lmf frame in the runtime invoke call below,
+ * so ThreadAbortExceptions are caught even if the thread is in native code.
+ */
+ mono_defaults.threadpool_perform_wait_callback_method->save_lmf = TRUE;
- mono_coop_mutex_unlock (&threadpool->domains_lock);
+ domains_unlock ();
mono_thread_push_appdomain_ref (tpdomain->domain);
if (mono_domain_set (tpdomain->domain, FALSE)) {
}
mono_thread_pop_appdomain_ref ();
- mono_coop_mutex_lock (&threadpool->domains_lock);
+ domains_lock ();
- tpdomain->domain->threadpool_jobs --;
- g_assert (tpdomain->domain->threadpool_jobs >= 0);
+ tpdomain->threadpool_jobs --;
+ g_assert (tpdomain->threadpool_jobs >= 0);
- if (tpdomain->domain->threadpool_jobs == 0 && mono_domain_is_unloading (tpdomain->domain)) {
- ThreadPoolDomainCleanupSemaphore *cleanup_semaphore;
+ if (tpdomain->outstanding_request + tpdomain->threadpool_jobs == 0 && mono_domain_is_unloading (tpdomain->domain)) {
gboolean removed;
- removed = domain_remove(tpdomain);
+ removed = tpdomain_remove (tpdomain);
g_assert (removed);
- cleanup_semaphore = (ThreadPoolDomainCleanupSemaphore*) tpdomain->domain->cleanup_semaphore;
-
- g_assert(cleanup_semaphore);
- mono_coop_sem_post (&cleanup_semaphore->sem);
- if (InterlockedDecrement (&cleanup_semaphore->ref) == 0) {
- mono_coop_sem_destroy (&cleanup_semaphore->sem);
- g_free (cleanup_semaphore);
- tpdomain->domain->cleanup_semaphore = NULL;
- }
-
- domain_free (tpdomain);
+ mono_coop_cond_signal (&tpdomain->cleanup_cond);
tpdomain = NULL;
}
previous_tpdomain = tpdomain;
}
- mono_coop_mutex_unlock (&threadpool->domains_lock);
+ domains_unlock ();
mono_coop_mutex_lock (&threadpool->active_threads_lock);
g_ptr_array_remove_fast (threadpool->working_threads, thread);
if ((thread = mono_thread_create_internal (mono_get_root_domain (), worker_thread, NULL, TRUE, 0, &error)) != NULL) {
threadpool->worker_creation_current_count += 1;
- mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, created %p, now = %d count = %d", mono_native_thread_id_get (), thread->tid, now, threadpool->worker_creation_current_count);
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, created %p, now = %d count = %d", mono_native_thread_id_get (), GUINT_TO_POINTER(thread->tid), now, threadpool->worker_creation_current_count);
mono_coop_mutex_unlock (&threadpool->worker_creation_lock);
return TRUE;
}
if (mono_runtime_is_shutting_down ())
return FALSE;
- mono_coop_mutex_lock (&threadpool->domains_lock);
+ domains_lock ();
/* synchronize check with worker_thread */
if (mono_domain_is_unloading (domain)) {
- mono_coop_mutex_unlock (&threadpool->domains_lock);
+ domains_unlock ();
return FALSE;
}
- tpdomain = domain_get (domain, TRUE);
+ tpdomain = tpdomain_get (domain, TRUE);
g_assert (tpdomain);
tpdomain->outstanding_request ++;
mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, domain = %p, outstanding_request = %d",
mono_native_thread_id_get (), tpdomain->domain, tpdomain->outstanding_request);
- mono_coop_mutex_unlock (&threadpool->domains_lock);
+ domains_unlock ();
if (threadpool->suspended)
return FALSE;
if (mono_runtime_is_shutting_down ()) {
should_keep_running = FALSE;
} else {
- mono_coop_mutex_lock (&threadpool->domains_lock);
+ domains_lock ();
if (!domain_any_has_request ())
should_keep_running = FALSE;
- mono_coop_mutex_unlock (&threadpool->domains_lock);
+ domains_unlock ();
if (!should_keep_running) {
if (last_should_keep_running == -1 || mono_100ns_ticks () - last_should_keep_running < MONITOR_MINIMAL_LIFETIME * 1000 * 10) {
if (mono_runtime_is_shutting_down ())
continue;
- mono_coop_mutex_lock (&threadpool->domains_lock);
+ domains_lock ();
if (!domain_any_has_request ()) {
- mono_coop_mutex_unlock (&threadpool->domains_lock);
+ domains_unlock ();
continue;
}
- mono_coop_mutex_unlock (&threadpool->domains_lock);
+ domains_unlock ();
threadpool->cpu_usage = mono_cpu_usage (threadpool->cpu_usage_state);
void
mono_threadpool_ms_cleanup (void)
{
- #ifndef DISABLE_SOCKETS
- mono_threadpool_ms_io_cleanup ();
- #endif
+#ifndef DISABLE_SOCKETS
+ mono_threadpool_ms_io_cleanup ();
+#endif
mono_lazy_cleanup (&status, cleanup);
}
}
mono_monitor_exit ((MonoObject*) ares);
MONO_ENTER_GC_SAFE;
+#ifdef HOST_WIN32
WaitForSingleObjectEx (wait_event, INFINITE, TRUE);
+#else
+ mono_w32handle_wait_one (wait_event, MONO_INFINITE_WAIT, TRUE);
+#endif
MONO_EXIT_GC_SAFE;
}
gboolean
mono_threadpool_ms_remove_domain_jobs (MonoDomain *domain, int timeout)
{
- gint res;
- gint64 now, end;
+ gint64 end;
ThreadPoolDomain *tpdomain;
- ThreadPoolDomainCleanupSemaphore *cleanup_semaphore;
+ gboolean ret;
g_assert (domain);
g_assert (timeout >= -1);
#endif
/*
- * There might be some threads out that could be about to execute stuff from the given domain.
- * We avoid that by waiting on a semaphore to be pulsed by the thread that reaches zero.
- * The semaphore is only created for domains which queued threadpool jobs.
- * We always wait on the semaphore rather than ensuring domain->threadpool_jobs is 0.
- * There may be pending outstanding requests which will create new jobs.
- * The semaphore is signaled the threadpool domain has been removed from list
- * and we know no more jobs for the domain will be processed.
- */
-
- mono_lazy_initialize(&status, initialize);
- mono_coop_mutex_lock(&threadpool->domains_lock);
-
- tpdomain = domain_get (domain, FALSE);
- if (!tpdomain || tpdomain->outstanding_request == 0) {
- mono_coop_mutex_unlock(&threadpool->domains_lock);
+ * Wait for all threads which execute jobs in the domain to exit.
+ * The is_unloading () check in worker_request () ensures that
+ * no new jobs are added after we enter the lock below.
+ */
+ mono_lazy_initialize (&status, initialize);
+ domains_lock ();
+
+ tpdomain = tpdomain_get (domain, FALSE);
+ if (!tpdomain) {
+ domains_unlock ();
return TRUE;
}
- mono_coop_mutex_unlock(&threadpool->domains_lock);
+ ret = TRUE;
- g_assert (domain->cleanup_semaphore);
+ while (tpdomain->outstanding_request + tpdomain->threadpool_jobs > 0) {
+ if (timeout == -1) {
+ mono_coop_cond_wait (&tpdomain->cleanup_cond, &threadpool->domains_lock);
+ } else {
+ gint64 now;
+ gint res;
- cleanup_semaphore = (ThreadPoolDomainCleanupSemaphore*) domain->cleanup_semaphore;
+ now = mono_msec_ticks();
+ if (now > end) {
+ ret = FALSE;
+ break;
+ }
- if (timeout == -1) {
- res = mono_coop_sem_wait (&cleanup_semaphore->sem, MONO_SEM_FLAGS_NONE);
- g_assert (res == MONO_SEM_TIMEDWAIT_RET_SUCCESS);
- } else {
- now = mono_msec_ticks();
- if (now > end)
- return FALSE;
- res = mono_coop_sem_timedwait (&cleanup_semaphore->sem, end - now, MONO_SEM_FLAGS_NONE);
+ res = mono_coop_cond_timedwait (&tpdomain->cleanup_cond, &threadpool->domains_lock, end - now);
+ if (res != 0) {
+ ret = FALSE;
+ break;
+ }
+ }
}
- if (InterlockedDecrement (&cleanup_semaphore->ref) == 0) {
- mono_coop_sem_destroy (&cleanup_semaphore->sem);
- g_free (cleanup_semaphore);
- domain->cleanup_semaphore = NULL;
- }
+ /* Remove from the list the worker threads look at */
+ tpdomain_remove (tpdomain);
+
+ domains_unlock ();
+
+ mono_coop_cond_destroy (&tpdomain->cleanup_cond);
+ tpdomain_free (tpdomain);
- return res == MONO_SEM_TIMEDWAIT_RET_SUCCESS;
+ return ret;
}
void