-/*
- * threadpool.c: Microsoft threadpool runtime support
+/**
+ * \file
+ * Microsoft threadpool runtime support
*
* Author:
* Ludovic Henry (ludovic.henry@xamarin.com)
/* Number of currently executing jobs */
gint32 threadpool_jobs;
/* Signalled when threadpool_jobs + outstanding_request is 0 */
- /* Protected by threadpool->domains_lock */
+ /* Protected by threadpool.domains_lock */
MonoCoopCond cleanup_cond;
} ThreadPoolDomain;
GPtrArray *domains; // ThreadPoolDomain* []
MonoCoopMutex domains_lock;
- GPtrArray *threads; // MonoInternalThread* []
- MonoCoopMutex threads_lock;
- MonoCoopCond threads_exit_cond;
-
ThreadPoolCounter counters;
gint32 limit_io_min;
gint32 limit_io_max;
-
- MonoThreadPoolWorker *worker;
} ThreadPool;
static mono_lazy_init_t status = MONO_LAZY_INIT_STATUS_NOT_INITIALIZED;
-static ThreadPool* threadpool;
+static ThreadPool threadpool;
/*
 * Atomically apply `block` to the global threadpool counter: snapshot the
 * current value into both `var` and `__old`, run `block` (which mutates
 * `var`), sanity-check the result, then CAS it back. The whole sequence
 * retries if another thread changed the counter concurrently.
 *
 * Fix: the assertions previously referenced a variable literally named
 * `counter`, so the macro only compiled when the caller's local had that
 * exact name. Use `(var)` so any variable name works; existing call sites
 * (which pass `counter`) are unaffected.
 */
#define COUNTER_ATOMIC(var,block) \
	do { \
		ThreadPoolCounter __old; \
		do { \
			(var) = __old = COUNTER_READ (); \
			{ block; } \
			if (!((var)._.starting >= 0)) \
				g_error ("%s: counter._.starting = %d, but should be >= 0", __func__, (var)._.starting); \
			if (!((var)._.working >= 0)) \
				g_error ("%s: counter._.working = %d, but should be >= 0", __func__, (var)._.working); \
		} while (InterlockedCompareExchange (&threadpool.counters.as_gint32, (var).as_gint32, __old.as_gint32) != __old.as_gint32); \
	} while (0)
static inline ThreadPoolCounter
-COUNTER_READ (ThreadPool *threadpool)
+COUNTER_READ (void)
{
ThreadPoolCounter counter;
- counter.as_gint32 = InterlockedRead (&threadpool->counters.as_gint32);
+ counter.as_gint32 = InterlockedRead (&threadpool.counters.as_gint32);
return counter;
}
/* Acquire the lock protecting threadpool.domains and the per-domain job/request counters. */
static inline void
domains_lock (void)
{
	mono_coop_mutex_lock (&threadpool.domains_lock);
}
/* Release the lock taken by domains_lock (). */
static inline void
domains_unlock (void)
{
	mono_coop_mutex_unlock (&threadpool.domains_lock);
}
/*
 * Refcount release callback, registered via mono_refcount_init () in
 * initialize (); runs once the last reference to the global threadpool
 * is dropped. Frees the domains array and its lock. The ThreadPoolDomain
 * entries themselves are freed elsewhere (see tpdomain_free callers), so
 * only the container is released here — TODO confirm against full file.
 */
static void
destroy (gpointer unused)
{
	g_ptr_array_free (threadpool.domains, TRUE);
	mono_coop_mutex_destroy (&threadpool.domains_lock);
}
/* Forward declaration: the worker module invokes this callback to run queued work. */
static void
worker_callback (void);

/*
 * One-time lazy initializer (driven through mono_lazy_initialize on the
 * `status` cell). Sets up the refcount whose release hook is destroy (),
 * the domains array + lock, the I/O thread limits, and finally hands
 * worker_callback to the worker subsystem. Must run before any code
 * touches threadpool.* fields — callers gate on mono_lazy_initialize.
 */
static void
initialize (void)
{
	/* COUNTER_ATOMIC relies on the counter fitting in one atomic gint32. */
	g_assert (sizeof (ThreadPoolCounter) == sizeof (gint32));

	mono_refcount_init (&threadpool, destroy);

	threadpool.domains = g_ptr_array_new ();
	mono_coop_mutex_init (&threadpool.domains_lock);

	/* io min = #cpus; io max clamped around 100x that, bounded by 200. */
	threadpool.limit_io_min = mono_cpu_count ();
	threadpool.limit_io_max = CLAMP (threadpool.limit_io_min * 100, MIN (threadpool.limit_io_min, 200), MAX (threadpool.limit_io_min, 200));

	mono_threadpool_worker_init (worker_callback);
}
/*
 * Shutdown hook: stop the worker subsystem, then drop the reference taken
 * in initialize (). The threadpool struct is static, so dropping the last
 * reference only runs destroy () — there is no free of the struct itself,
 * which avoids use-after-free races with late worker requests.
 */
static void
cleanup (void)
{
	mono_threadpool_worker_cleanup ();

	mono_refcount_dec (&threadpool);
}
gboolean
MonoBoolean f;
gpointer args [2];
- mono_error_init (error);
+ error_init (error);
g_assert (work_item);
if (!threadpool_class)
return TRUE;
}
/* LOCKING: domains_lock must be held. */
/*
 * Allocate a ThreadPoolDomain for `domain`, initialize its cleanup
 * condition variable, and register it in threadpool.domains. The caller
 * is expected to have checked (under the same lock) that no entry for
 * `domain` already exists — this function does not deduplicate.
 */
static ThreadPoolDomain *
tpdomain_create (MonoDomain *domain)
{
	ThreadPoolDomain *tpdomain;

	tpdomain = g_new0 (ThreadPoolDomain, 1);
	tpdomain->domain = domain;
	mono_coop_cond_init (&tpdomain->cleanup_cond);

	g_ptr_array_add (threadpool.domains, tpdomain);

	return tpdomain;
}
/* LOCKING: domains_lock must be held. */
tpdomain_remove (ThreadPoolDomain *tpdomain)
{
g_assert (tpdomain);
- return g_ptr_array_remove (threadpool->domains, tpdomain);
+ return g_ptr_array_remove (threadpool.domains, tpdomain);
}
/* LOCKING: domains_lock must be held */
/*
 * Linear search of threadpool.domains for the entry whose ->domain
 * matches; returns NULL when absent (callers create one via
 * tpdomain_create when needed).
 */
static ThreadPoolDomain *
tpdomain_get (MonoDomain *domain)
{
	/* gint (not guint) to match the sibling iteration code; len is unsigned,
	 * so this comparison mixes signedness — benign for i >= 0. */
	gint i;

	g_assert (domain);

	for (i = 0; i < threadpool.domains->len; ++i) {
		ThreadPoolDomain *tpdomain;

		tpdomain = (ThreadPoolDomain *)g_ptr_array_index (threadpool.domains, i);
		if (tpdomain->domain == domain)
			return tpdomain;
	}

	return NULL;
}
static void
tpdomain_get_next (ThreadPoolDomain *current)
{
ThreadPoolDomain *tpdomain = NULL;
- guint len;
+ gint len;
- len = threadpool->domains->len;
+ len = threadpool.domains->len;
if (len > 0) {
- guint i, current_idx = -1;
+ gint i, current_idx = -1;
if (current) {
for (i = 0; i < len; ++i) {
- if (current == g_ptr_array_index (threadpool->domains, i)) {
+ if (current == g_ptr_array_index (threadpool.domains, i)) {
current_idx = i;
break;
}
}
- g_assert (current_idx != (guint)-1);
}
for (i = current_idx + 1; i < len + current_idx + 1; ++i) {
- ThreadPoolDomain *tmp = (ThreadPoolDomain *)g_ptr_array_index (threadpool->domains, i % len);
+ ThreadPoolDomain *tmp = (ThreadPoolDomain *)g_ptr_array_index (threadpool.domains, i % len);
if (tmp->outstanding_request > 0) {
tpdomain = tmp;
break;
try_invoke_perform_wait_callback (MonoObject** exc, MonoError *error)
{
HANDLE_FUNCTION_ENTER ();
- mono_error_init (error);
+ error_init (error);
MonoObject *res = mono_runtime_try_invoke (mono_defaults.threadpool_perform_wait_callback_method, NULL, NULL, exc, error);
HANDLE_FUNCTION_RETURN_VAL (res);
}
static void
-worker_callback (gpointer unused)
+worker_callback (void)
{
MonoError error;
ThreadPoolDomain *tpdomain, *previous_tpdomain;
ThreadPoolCounter counter;
MonoInternalThread *thread;
+ if (!mono_refcount_tryinc (&threadpool))
+ return;
+
thread = mono_thread_internal_current ();
- COUNTER_ATOMIC (threadpool, counter, {
+ COUNTER_ATOMIC (counter, {
if (!(counter._.working < 32767 /* G_MAXINT16 */))
g_error ("%s: counter._.working = %d, but should be < 32767", __func__, counter._.working);
});
if (mono_runtime_is_shutting_down ()) {
- COUNTER_ATOMIC (threadpool, counter, {
+ COUNTER_ATOMIC (counter, {
counter._.working --;
});
- mono_refcount_dec (threadpool);
+ mono_refcount_dec (&threadpool);
return;
}
- mono_coop_mutex_lock (&threadpool->threads_lock);
- g_ptr_array_add (threadpool->threads, thread);
- mono_coop_mutex_unlock (&threadpool->threads_lock);
-
/*
* This is needed so there is always an lmf frame in the runtime invoke call below,
* so ThreadAbortExceptions are caught even if the thread is in native code.
while (!mono_runtime_is_shutting_down ()) {
gboolean retire = FALSE;
- if ((thread->state & (ThreadState_AbortRequested | ThreadState_SuspendRequested)) != 0) {
+ if (thread->state & (ThreadState_AbortRequested | ThreadState_SuspendRequested)) {
domains_unlock ();
- mono_thread_interruption_checkpoint ();
+ if (mono_thread_interruption_checkpoint ()) {
+ domains_lock ();
+ continue;
+ }
domains_lock ();
}
g_assert (tpdomain->outstanding_request >= 0);
mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker running in domain %p (outstanding requests %d)",
- mono_native_thread_id_get (), tpdomain->domain, tpdomain->outstanding_request);
+ GUINT_TO_POINTER (MONO_NATIVE_THREAD_ID_TO_UINT (mono_native_thread_id_get ())), tpdomain->domain, tpdomain->outstanding_request);
g_assert (tpdomain->threadpool_jobs >= 0);
tpdomain->threadpool_jobs ++;
domains_unlock ();
- mono_thread_set_name_internal (thread, mono_string_new (mono_get_root_domain (), "Threadpool worker"), FALSE, TRUE, &error);
+ MonoString *thread_name = mono_string_new_checked (mono_get_root_domain (), "Threadpool worker", &error);
+ mono_error_assert_ok (&error);
+ mono_thread_set_name_internal (thread, thread_name, FALSE, TRUE, &error);
mono_error_assert_ok (&error);
mono_thread_clr_state (thread, (MonoThreadState)~ThreadState_Background);
domains_unlock ();
- mono_coop_mutex_lock (&threadpool->threads_lock);
-
- g_ptr_array_remove_fast (threadpool->threads, thread);
-
- mono_coop_cond_signal (&threadpool->threads_exit_cond);
-
- mono_coop_mutex_unlock (&threadpool->threads_lock);
-
- COUNTER_ATOMIC (threadpool, counter, {
+ COUNTER_ATOMIC (counter, {
counter._.working --;
});
- mono_refcount_dec (threadpool);
+ mono_refcount_dec (&threadpool);
}
void
if (!async_call_klass)
async_call_klass = mono_class_load_from_name (mono_defaults.corlib, "System", "MonoAsyncCall");
- mono_lazy_initialize (&status, initialize);
-
- mono_error_init (error);
+ error_init (error);
message = mono_method_call_message_new (method, params, mono_get_delegate_invoke (method->klass), (params != NULL) ? (&async_callback) : NULL, (params != NULL) ? (&state) : NULL, error);
return_val_if_nok (error, NULL);
{
MonoAsyncCall *ac;
- mono_error_init (error);
+ error_init (error);
g_assert (exc);
g_assert (out_args);
if (!mono_lazy_is_initialized (&status))
return TRUE;
- mono_refcount_inc (threadpool);
+ mono_refcount_inc (&threadpool);
domains_lock ();
- tpdomain = tpdomain_get (domain, FALSE);
+ tpdomain = tpdomain_get (domain);
if (!tpdomain) {
domains_unlock ();
- mono_refcount_dec (threadpool);
+ mono_refcount_dec (&threadpool);
return TRUE;
}
while (tpdomain->outstanding_request + tpdomain->threadpool_jobs > 0) {
if (timeout == -1) {
- mono_coop_cond_wait (&tpdomain->cleanup_cond, &threadpool->domains_lock);
+ mono_coop_cond_wait (&tpdomain->cleanup_cond, &threadpool.domains_lock);
} else {
gint64 now;
gint res;
break;
}
- res = mono_coop_cond_timedwait (&tpdomain->cleanup_cond, &threadpool->domains_lock, end - now);
+ res = mono_coop_cond_timedwait (&tpdomain->cleanup_cond, &threadpool.domains_lock, end - now);
if (res != 0) {
ret = FALSE;
break;
mono_coop_cond_destroy (&tpdomain->cleanup_cond);
tpdomain_free (tpdomain);
- mono_refcount_dec (threadpool);
+ mono_refcount_dec (&threadpool);
return ret;
}
/*
 * Suspend the threadpool worker; a no-op when the pool was never lazily
 * initialized. NOTE(review): unlike other entry points this takes no
 * refcount before touching the worker — presumably the worker module is
 * safe to call during shutdown; confirm.
 */
void
mono_threadpool_suspend (void)
{
	if (mono_lazy_is_initialized (&status))
		mono_threadpool_worker_set_suspended (TRUE);
}
/*
 * Resume a worker previously suspended via mono_threadpool_suspend ();
 * a no-op when the pool was never lazily initialized.
 */
void
mono_threadpool_resume (void)
{
	if (mono_lazy_is_initialized (&status))
		mono_threadpool_worker_set_suspended (FALSE);
}
void
if (!worker_threads || !completion_port_threads)
return;
- mono_lazy_initialize (&status, initialize);
+ if (!mono_lazy_initialize (&status, initialize) || !mono_refcount_tryinc (&threadpool)) {
+ *worker_threads = 0;
+ *completion_port_threads = 0;
+ return;
+ }
- counter = COUNTER_READ (threadpool);
+ counter = COUNTER_READ ();
- *worker_threads = MAX (0, mono_threadpool_worker_get_max (threadpool->worker) - counter._.working);
- *completion_port_threads = threadpool->limit_io_max;
+ *worker_threads = MAX (0, mono_threadpool_worker_get_max () - counter._.working);
+ *completion_port_threads = threadpool.limit_io_max;
+
+ mono_refcount_dec (&threadpool);
}
void
if (!worker_threads || !completion_port_threads)
return;
- mono_lazy_initialize (&status, initialize);
+ if (!mono_lazy_initialize (&status, initialize) || !mono_refcount_tryinc (&threadpool)) {
+ *worker_threads = 0;
+ *completion_port_threads = 0;
+ return;
+ }
+
+ *worker_threads = mono_threadpool_worker_get_min ();
+ *completion_port_threads = threadpool.limit_io_min;
- *worker_threads = mono_threadpool_worker_get_min (threadpool->worker);
- *completion_port_threads = threadpool->limit_io_min;
+ mono_refcount_dec (&threadpool);
}
void
if (!worker_threads || !completion_port_threads)
return;
- mono_lazy_initialize (&status, initialize);
+ if (!mono_lazy_initialize (&status, initialize) || !mono_refcount_tryinc (&threadpool)) {
+ *worker_threads = 0;
+ *completion_port_threads = 0;
+ return;
+ }
+
+ *worker_threads = mono_threadpool_worker_get_max ();
+ *completion_port_threads = threadpool.limit_io_max;
- *worker_threads = mono_threadpool_worker_get_max (threadpool->worker);
- *completion_port_threads = threadpool->limit_io_max;
+ mono_refcount_dec (&threadpool);
}
/*
 * Icall: set the minimum worker-thread count and the minimum I/O
 * (completion-port) thread count. Returns FALSE when the requested I/O
 * minimum is out of range [1, limit_io_max], when the pool is shutting
 * down, or when the worker rejects the new minimum.
 *
 * Fix: `threadpool.limit_io_max` was read before mono_lazy_initialize ()
 * had run; `threadpool` is a zero-initialized static, so on the first
 * call limit_io_max was 0 and every positive completion_port_threads was
 * rejected. Initialize (and take a reference) first, then validate.
 */
MonoBoolean
ves_icall_System_Threading_ThreadPool_SetMinThreadsNative (gint32 worker_threads, gint32 completion_port_threads)
{
	if (completion_port_threads <= 0)
		return FALSE;

	if (!mono_lazy_initialize (&status, initialize) || !mono_refcount_tryinc (&threadpool))
		return FALSE;

	/* limit_io_max is only meaningful after initialize () has run. */
	if (completion_port_threads > threadpool.limit_io_max) {
		mono_refcount_dec (&threadpool);
		return FALSE;
	}

	if (!mono_threadpool_worker_set_min (worker_threads)) {
		mono_refcount_dec (&threadpool);
		return FALSE;
	}

	threadpool.limit_io_min = completion_port_threads;

	mono_refcount_dec (&threadpool);
	return TRUE;
}
{
gint cpu_count = mono_cpu_count ();
- mono_lazy_initialize (&status, initialize);
+ if (completion_port_threads < threadpool.limit_io_min || completion_port_threads < cpu_count)
+ return FALSE;
- if (completion_port_threads < threadpool->limit_io_min || completion_port_threads < cpu_count)
+ if (!mono_lazy_initialize (&status, initialize) || !mono_refcount_tryinc (&threadpool))
return FALSE;
- if (!mono_threadpool_worker_set_max (threadpool->worker, worker_threads))
+ if (!mono_threadpool_worker_set_max (worker_threads)) {
+ mono_refcount_dec (&threadpool);
return FALSE;
+ }
- threadpool->limit_io_max = completion_port_threads;
+ threadpool.limit_io_max = completion_port_threads;
+ mono_refcount_dec (&threadpool);
return TRUE;
}
if (mono_domain_is_unloading (mono_domain_get ()) || mono_runtime_is_shutting_down ())
return FALSE;
- return mono_threadpool_worker_notify_completed (threadpool->worker);
+ return mono_threadpool_worker_notify_completed ();
}
/*
 * Icall: tell the worker subsystem that the current work item made
 * progress/completed, so it can rebalance threads. Return value of
 * mono_threadpool_worker_notify_completed () is deliberately ignored here
 * (the Boolean-returning variant lives in a sibling icall).
 */
void
ves_icall_System_Threading_ThreadPool_NotifyWorkItemProgressNative (void)
{
	mono_threadpool_worker_notify_completed ();
}
void
if (mono_domain_is_unloading (domain))
return FALSE;
- if (!mono_refcount_tryinc (threadpool)) {
+ if (!mono_lazy_initialize (&status, initialize) || !mono_refcount_tryinc (&threadpool)) {
/* threadpool has been destroyed, we are shutting down */
return FALSE;
}
domains_lock ();
- /* synchronize with mono_threadpool_remove_domain_jobs */
- if (mono_domain_is_unloading (domain)) {
- domains_unlock ();
- mono_refcount_dec (threadpool);
- return FALSE;
+ tpdomain = tpdomain_get (domain);
+ if (!tpdomain) {
+ /* synchronize with mono_threadpool_remove_domain_jobs */
+ if (mono_domain_is_unloading (domain)) {
+ domains_unlock ();
+ mono_refcount_dec (&threadpool);
+ return FALSE;
+ }
+
+ tpdomain = tpdomain_create (domain);
}
- tpdomain = tpdomain_get (domain, TRUE);
g_assert (tpdomain);
tpdomain->outstanding_request ++;
domains_unlock ();
- COUNTER_ATOMIC (threadpool, counter, {
+ COUNTER_ATOMIC (counter, {
if (counter._.starting == 16) {
- mono_refcount_dec (threadpool);
+ mono_refcount_dec (&threadpool);
return TRUE;
}
counter._.starting ++;
});
- mono_threadpool_worker_enqueue (threadpool->worker, worker_callback, NULL);
+ mono_threadpool_worker_request ();
+ mono_refcount_dec (&threadpool);
return TRUE;
}