-/*
- * threadpool.c: Microsoft threadpool runtime support
+/**
+ * \file
+ * Microsoft threadpool runtime support
*
* Author:
* Ludovic Henry (ludovic.henry@xamarin.com)
GPtrArray *domains; // ThreadPoolDomain* []
MonoCoopMutex domains_lock;
- GPtrArray *threads; // MonoInternalThread* []
- MonoCoopMutex threads_lock;
- MonoCoopCond threads_exit_cond;
-
ThreadPoolCounter counters;
gint32 limit_io_min;
gint32 limit_io_max;
-
- MonoThreadPoolWorker *worker;
} ThreadPool;
static mono_lazy_init_t status = MONO_LAZY_INIT_STATUS_NOT_INITIALIZED;
static void
destroy (gpointer unused)
{
-#if 0
g_ptr_array_free (threadpool.domains, TRUE);
mono_coop_mutex_destroy (&threadpool.domains_lock);
-
- g_ptr_array_free (threadpool.threads, TRUE);
- mono_coop_mutex_destroy (&threadpool.threads_lock);
- mono_coop_cond_destroy (&threadpool.threads_exit_cond);
-#endif
}
+static void
+worker_callback (void);
+
static void
initialize (void)
{
threadpool.domains = g_ptr_array_new ();
mono_coop_mutex_init (&threadpool.domains_lock);
- threadpool.threads = g_ptr_array_new ();
- mono_coop_mutex_init (&threadpool.threads_lock);
- mono_coop_cond_init (&threadpool.threads_exit_cond);
-
threadpool.limit_io_min = mono_cpu_count ();
threadpool.limit_io_max = CLAMP (threadpool.limit_io_min * 100, MIN (threadpool.limit_io_min, 200), MAX (threadpool.limit_io_min, 200));
- mono_threadpool_worker_init (&threadpool.worker);
+ mono_threadpool_worker_init (worker_callback);
}
static void
cleanup (void)
{
- guint i;
- MonoInternalThread *current;
-
- /* we make the assumption along the code that we are
- * cleaning up only if the runtime is shutting down */
- g_assert (mono_runtime_is_shutting_down ());
-
- current = mono_thread_internal_current ();
-
- mono_coop_mutex_lock (&threadpool.threads_lock);
-
- /* stop all threadpool.threads */
- for (i = 0; i < threadpool.threads->len; ++i) {
- MonoInternalThread *thread = (MonoInternalThread*) g_ptr_array_index (threadpool.threads, i);
- if (thread != current)
- mono_thread_internal_abort (thread);
- }
-
- mono_coop_mutex_unlock (&threadpool.threads_lock);
-
-#if 0
- /* give a chance to the other threads to exit */
- mono_thread_info_yield ();
-
- mono_coop_mutex_lock (&threadpool.threads_lock);
-
- for (;;) {
- if (threadpool.threads->len == 0)
- break;
-
- if (threadpool.threads->len == 1 && g_ptr_array_index (threadpool.threads, 0) == current) {
- /* We are waiting on ourselves */
- break;
- }
-
- mono_coop_cond_wait (&threadpool.threads_exit_cond, &threadpool.threads_lock);
- }
-
- mono_coop_mutex_unlock (&threadpool.threads_lock);
-#endif
-
- mono_threadpool_worker_cleanup (threadpool.worker);
+ mono_threadpool_worker_cleanup ();
mono_refcount_dec (&threadpool);
}
MonoBoolean f;
gpointer args [2];
- mono_error_init (error);
+ error_init (error);
g_assert (work_item);
if (!threadpool_class)
return TRUE;
}
-/* LOCKING: domains_lock must be held */
-static void
-tpdomain_add (ThreadPoolDomain *tpdomain)
+/* LOCKING: domains_lock must be held. */
+static ThreadPoolDomain *
+tpdomain_create (MonoDomain *domain)
{
- guint i, len;
+ ThreadPoolDomain *tpdomain;
- g_assert (tpdomain);
+ tpdomain = g_new0 (ThreadPoolDomain, 1);
+ tpdomain->domain = domain;
+ mono_coop_cond_init (&tpdomain->cleanup_cond);
- len = threadpool.domains->len;
- for (i = 0; i < len; ++i) {
- if (g_ptr_array_index (threadpool.domains, i) == tpdomain)
- break;
- }
+ g_ptr_array_add (threadpool.domains, tpdomain);
- if (i == len)
- g_ptr_array_add (threadpool.domains, tpdomain);
+ return tpdomain;
}
/* LOCKING: domains_lock must be held. */
/* LOCKING: domains_lock must be held */
static ThreadPoolDomain *
-tpdomain_get (MonoDomain *domain, gboolean create)
+tpdomain_get (MonoDomain *domain)
{
- guint i;
- ThreadPoolDomain *tpdomain;
+ gint i;
g_assert (domain);
return tpdomain;
}
- if (!create)
- return NULL;
-
- tpdomain = g_new0 (ThreadPoolDomain, 1);
- tpdomain->domain = domain;
- mono_coop_cond_init (&tpdomain->cleanup_cond);
-
- tpdomain_add (tpdomain);
-
- return tpdomain;
+ return NULL;
}
static void
tpdomain_get_next (ThreadPoolDomain *current)
{
ThreadPoolDomain *tpdomain = NULL;
- guint len;
+ gint len;
len = threadpool.domains->len;
if (len > 0) {
- guint i, current_idx = -1;
+ gint i, current_idx = -1;
if (current) {
for (i = 0; i < len; ++i) {
if (current == g_ptr_array_index (threadpool.domains, i)) {
break;
}
}
- g_assert (current_idx != (guint)-1);
}
for (i = current_idx + 1; i < len + current_idx + 1; ++i) {
ThreadPoolDomain *tmp = (ThreadPoolDomain *)g_ptr_array_index (threadpool.domains, i % len);
try_invoke_perform_wait_callback (MonoObject** exc, MonoError *error)
{
HANDLE_FUNCTION_ENTER ();
- mono_error_init (error);
+ error_init (error);
MonoObject *res = mono_runtime_try_invoke (mono_defaults.threadpool_perform_wait_callback_method, NULL, NULL, exc, error);
HANDLE_FUNCTION_RETURN_VAL (res);
}
static void
-worker_callback (gpointer unused)
+worker_callback (void)
{
MonoError error;
ThreadPoolDomain *tpdomain, *previous_tpdomain;
ThreadPoolCounter counter;
MonoInternalThread *thread;
+ if (!mono_refcount_tryinc (&threadpool))
+ return;
+
thread = mono_thread_internal_current ();
COUNTER_ATOMIC (counter, {
return;
}
- mono_coop_mutex_lock (&threadpool.threads_lock);
- g_ptr_array_add (threadpool.threads, thread);
- mono_coop_mutex_unlock (&threadpool.threads_lock);
-
/*
* This is needed so there is always an lmf frame in the runtime invoke call below,
* so ThreadAbortExceptions are caught even if the thread is in native code.
while (!mono_runtime_is_shutting_down ()) {
gboolean retire = FALSE;
- if ((thread->state & (ThreadState_AbortRequested | ThreadState_SuspendRequested)) != 0) {
+ if (thread->state & (ThreadState_AbortRequested | ThreadState_SuspendRequested)) {
domains_unlock ();
- mono_thread_interruption_checkpoint ();
+ if (mono_thread_interruption_checkpoint ()) {
+ domains_lock ();
+ continue;
+ }
domains_lock ();
}
domains_unlock ();
- mono_coop_mutex_lock (&threadpool.threads_lock);
-
- g_ptr_array_remove_fast (threadpool.threads, thread);
-
- mono_coop_cond_signal (&threadpool.threads_exit_cond);
-
- mono_coop_mutex_unlock (&threadpool.threads_lock);
-
COUNTER_ATOMIC (counter, {
counter._.working --;
});
if (!async_call_klass)
async_call_klass = mono_class_load_from_name (mono_defaults.corlib, "System", "MonoAsyncCall");
- mono_lazy_initialize (&status, initialize);
-
- mono_error_init (error);
+ error_init (error);
message = mono_method_call_message_new (method, params, mono_get_delegate_invoke (method->klass), (params != NULL) ? (&async_callback) : NULL, (params != NULL) ? (&state) : NULL, error);
return_val_if_nok (error, NULL);
{
MonoAsyncCall *ac;
- mono_error_init (error);
+ error_init (error);
g_assert (exc);
g_assert (out_args);
domains_lock ();
- tpdomain = tpdomain_get (domain, FALSE);
+ tpdomain = tpdomain_get (domain);
if (!tpdomain) {
domains_unlock ();
mono_refcount_dec (&threadpool);
mono_threadpool_suspend (void)
{
if (mono_lazy_is_initialized (&status))
- mono_threadpool_worker_set_suspended (threadpool.worker, TRUE);
+ mono_threadpool_worker_set_suspended (TRUE);
}
void
mono_threadpool_resume (void)
{
if (mono_lazy_is_initialized (&status))
- mono_threadpool_worker_set_suspended (threadpool.worker, FALSE);
+ mono_threadpool_worker_set_suspended (FALSE);
}
void
if (!worker_threads || !completion_port_threads)
return;
- mono_lazy_initialize (&status, initialize);
+ if (!mono_lazy_initialize (&status, initialize) || !mono_refcount_tryinc (&threadpool)) {
+ *worker_threads = 0;
+ *completion_port_threads = 0;
+ return;
+ }
counter = COUNTER_READ ();
- *worker_threads = MAX (0, mono_threadpool_worker_get_max (threadpool.worker) - counter._.working);
+ *worker_threads = MAX (0, mono_threadpool_worker_get_max () - counter._.working);
*completion_port_threads = threadpool.limit_io_max;
+
+ mono_refcount_dec (&threadpool);
}
void
if (!worker_threads || !completion_port_threads)
return;
- mono_lazy_initialize (&status, initialize);
+ if (!mono_lazy_initialize (&status, initialize) || !mono_refcount_tryinc (&threadpool)) {
+ *worker_threads = 0;
+ *completion_port_threads = 0;
+ return;
+ }
- *worker_threads = mono_threadpool_worker_get_min (threadpool.worker);
+ *worker_threads = mono_threadpool_worker_get_min ();
*completion_port_threads = threadpool.limit_io_min;
+
+ mono_refcount_dec (&threadpool);
}
void
if (!worker_threads || !completion_port_threads)
return;
- mono_lazy_initialize (&status, initialize);
+ if (!mono_lazy_initialize (&status, initialize) || !mono_refcount_tryinc (&threadpool)) {
+ *worker_threads = 0;
+ *completion_port_threads = 0;
+ return;
+ }
- *worker_threads = mono_threadpool_worker_get_max (threadpool.worker);
+ *worker_threads = mono_threadpool_worker_get_max ();
*completion_port_threads = threadpool.limit_io_max;
+
+ mono_refcount_dec (&threadpool);
}
MonoBoolean
ves_icall_System_Threading_ThreadPool_SetMinThreadsNative (gint32 worker_threads, gint32 completion_port_threads)
{
- mono_lazy_initialize (&status, initialize);
-
if (completion_port_threads <= 0 || completion_port_threads > threadpool.limit_io_max)
return FALSE;
- if (!mono_threadpool_worker_set_min (threadpool.worker, worker_threads))
+ if (!mono_lazy_initialize (&status, initialize) || !mono_refcount_tryinc (&threadpool))
+ return FALSE;
+
+ if (!mono_threadpool_worker_set_min (worker_threads)) {
+ mono_refcount_dec (&threadpool);
return FALSE;
+ }
threadpool.limit_io_min = completion_port_threads;
+ mono_refcount_dec (&threadpool);
return TRUE;
}
{
gint cpu_count = mono_cpu_count ();
- mono_lazy_initialize (&status, initialize);
-
if (completion_port_threads < threadpool.limit_io_min || completion_port_threads < cpu_count)
return FALSE;
- if (!mono_threadpool_worker_set_max (threadpool.worker, worker_threads))
+ if (!mono_lazy_initialize (&status, initialize) || !mono_refcount_tryinc (&threadpool))
+ return FALSE;
+
+ if (!mono_threadpool_worker_set_max (worker_threads)) {
+ mono_refcount_dec (&threadpool);
return FALSE;
+ }
threadpool.limit_io_max = completion_port_threads;
+ mono_refcount_dec (&threadpool);
return TRUE;
}
if (mono_domain_is_unloading (mono_domain_get ()) || mono_runtime_is_shutting_down ())
return FALSE;
- return mono_threadpool_worker_notify_completed (threadpool.worker);
+ return mono_threadpool_worker_notify_completed ();
}
void
ves_icall_System_Threading_ThreadPool_NotifyWorkItemProgressNative (void)
{
- mono_threadpool_worker_notify_completed (threadpool.worker);
+ mono_threadpool_worker_notify_completed ();
}
void
if (mono_domain_is_unloading (domain))
return FALSE;
- if (!mono_refcount_tryinc (&threadpool)) {
+ if (!mono_lazy_initialize (&status, initialize) || !mono_refcount_tryinc (&threadpool)) {
/* threadpool has been destroyed, we are shutting down */
return FALSE;
}
domains_lock ();
- /* synchronize with mono_threadpool_remove_domain_jobs */
- if (mono_domain_is_unloading (domain)) {
- domains_unlock ();
- mono_refcount_dec (&threadpool);
- return FALSE;
+ tpdomain = tpdomain_get (domain);
+ if (!tpdomain) {
+ /* synchronize with mono_threadpool_remove_domain_jobs */
+ if (mono_domain_is_unloading (domain)) {
+ domains_unlock ();
+ mono_refcount_dec (&threadpool);
+ return FALSE;
+ }
+
+ tpdomain = tpdomain_create (domain);
}
- tpdomain = tpdomain_get (domain, TRUE);
g_assert (tpdomain);
tpdomain->outstanding_request ++;
counter._.starting ++;
});
- mono_threadpool_worker_enqueue (threadpool.worker, worker_callback, NULL);
+ mono_threadpool_worker_request ();
+ mono_refcount_dec (&threadpool);
return TRUE;
}