#include <mono/utils/atomic.h>
#include <mono/utils/mono-compiler.h>
#include <mono/utils/mono-complex.h>
+#include <mono/utils/mono-lazy-init.h>
+#include <mono/utils/mono-logger.h>
+#include <mono/utils/mono-logger-internal.h>
#include <mono/utils/mono-proclib.h>
#include <mono/utils/mono-threads.h>
#include <mono/utils/mono-time.h>
#define CPU_USAGE_HIGH 95
#define MONITOR_INTERVAL 100 // ms
+#define MONITOR_MINIMAL_LIFETIME 60 * 1000 // ms
/* The exponent to apply to the gain. 1.0 means to use linear gain,
* higher values will enhance large moves and damp small ones.
TRANSITION_UNDEFINED,
} ThreadPoolHeuristicStateTransition;
+static mono_lazy_init_t status = MONO_LAZY_INIT_STATUS_NOT_INITIALIZED;
+
enum {
MONITOR_STATUS_REQUESTED,
MONITOR_STATUS_WAITING_FOR_REQUEST,
MONITOR_STATUS_NOT_RUNNING,
};
-static gint32 status = STATUS_NOT_INITIALIZED;
static gint32 monitor_status = MONITOR_STATUS_NOT_RUNNING;
static ThreadPool* threadpool;
}
static void
-ensure_initialized (MonoBoolean *enable_worker_tracking)
+initialize (void)
{
ThreadPoolHillClimbing *hc;
const char *threads_per_cpu_env;
gint threads_per_cpu;
gint threads_count;
- if (enable_worker_tracking) {
- // TODO implement some kind of switch to have the possibily to use it
- *enable_worker_tracking = FALSE;
- }
-
- if (status >= STATUS_INITIALIZED)
- return;
- if (status == STATUS_INITIALIZING || InterlockedCompareExchange (&status, STATUS_INITIALIZING, STATUS_NOT_INITIALIZED) != STATUS_NOT_INITIALIZED) {
- while (status == STATUS_INITIALIZING)
- mono_thread_info_yield ();
- g_assert (status >= STATUS_INITIALIZED);
- return;
- }
-
g_assert (!threadpool);
threadpool = g_new0 (ThreadPool, 1);
g_assert (threadpool);
threadpool->cpu_usage_state = g_new0 (MonoCpuUsageState, 1);
threadpool->suspended = FALSE;
-
- status = STATUS_INITIALIZED;
}
static void worker_unpark (ThreadPoolParkedThread *thread);
static void worker_kill (ThreadPoolWorkingThread *thread);
static void
-ensure_cleanedup (void)
+cleanup (void)
{
guint i;
- if (status == STATUS_NOT_INITIALIZED && InterlockedCompareExchange (&status, STATUS_CLEANED_UP, STATUS_NOT_INITIALIZED) == STATUS_NOT_INITIALIZED)
- return;
- if (status == STATUS_INITIALIZING) {
- while (status == STATUS_INITIALIZING)
- mono_thread_info_yield ();
- }
- if (status == STATUS_CLEANED_UP)
- return;
- if (status == STATUS_CLEANING_UP || InterlockedCompareExchange (&status, STATUS_CLEANING_UP, STATUS_INITIALIZED) != STATUS_INITIALIZED) {
- while (status == STATUS_CLEANING_UP)
- mono_thread_info_yield ();
- g_assert (status == STATUS_CLEANED_UP);
- return;
- }
-
/* we make the assumption along the code that we are
* cleaning up only if the runtime is shutting down */
g_assert (mono_runtime_is_shutting_down ());
worker_unpark ((ThreadPoolParkedThread*) g_ptr_array_index (threadpool->parked_threads, i));
mono_mutex_unlock (&threadpool->active_threads_lock);
-
- status = STATUS_CLEANED_UP;
}
void
mono_cond_t cond;
MonoInternalThread *thread = mono_thread_internal_current ();
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] current worker parking", GetCurrentThreadId ());
+
mono_cond_init (&cond, NULL);
mono_gc_set_skip_thread (TRUE);
mono_gc_set_skip_thread (FALSE);
mono_cond_destroy (&cond);
+
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] current worker unparking", GetCurrentThreadId ());
}
static gboolean
gboolean res = FALSE;
guint len;
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try unpark worker", GetCurrentThreadId ());
+
mono_mutex_lock (&threadpool->active_threads_lock);
len = threadpool->parked_threads->len;
if (len > 0) {
res = TRUE;
}
mono_mutex_unlock (&threadpool->active_threads_lock);
+
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try unpark worker, success? %s", GetCurrentThreadId (), res ? "yes" : "no");
+
return res;
}
static void
worker_thread (gpointer data)
{
- static MonoClass *threadpool_wait_callback_class = NULL;
- static MonoMethod *perform_wait_callback_method = NULL;
MonoInternalThread *thread;
ThreadPoolDomain *tpdomain, *previous_tpdomain;
ThreadPoolCounter counter;
gboolean retire = FALSE;
- g_assert (status >= STATUS_INITIALIZED);
-
- if (!threadpool_wait_callback_class)
- threadpool_wait_callback_class = mono_class_from_name (mono_defaults.corlib, "System.Threading", "_ThreadPoolWaitCallback");
- g_assert (threadpool_wait_callback_class);
-
- if (!perform_wait_callback_method)
- perform_wait_callback_method = mono_class_get_method_from_name (threadpool_wait_callback_class, "PerformWaitCallback", 0);
- g_assert (perform_wait_callback_method);
+ mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] worker starting", GetCurrentThreadId ());
g_assert (threadpool);
tpdomain->outstanding_request --;
g_assert (tpdomain->outstanding_request >= 0);
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker running in domain %p",
+ GetCurrentThreadId (), tpdomain->domain, tpdomain->outstanding_request);
+
g_assert (tpdomain->domain);
g_assert (tpdomain->domain->threadpool_jobs >= 0);
tpdomain->domain->threadpool_jobs ++;
mono_thread_push_appdomain_ref (tpdomain->domain);
if (mono_domain_set (tpdomain->domain, FALSE)) {
MonoObject *exc = NULL;
- MonoObject *res = mono_runtime_invoke (perform_wait_callback_method, NULL, NULL, &exc);
+ MonoObject *res = mono_runtime_invoke (mono_defaults.threadpool_perform_wait_callback_method, NULL, NULL, &exc);
if (exc)
mono_thread_internal_unhandled_exception (exc);
else if (res && *(MonoBoolean*) mono_object_unbox (res) == FALSE)
counter._.working--;
counter._.active --;
});
+
+ mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] worker finishing", GetCurrentThreadId ());
}
static gboolean
worker_try_create (void)
{
ThreadPoolCounter counter;
+ MonoInternalThread *thread;
+
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker", GetCurrentThreadId ());
COUNTER_ATOMIC (counter, {
if (counter._.working >= counter._.max_working)
counter._.active ++;
});
- if (mono_thread_create_internal (mono_get_root_domain (), worker_thread, NULL, TRUE, 0) != NULL)
+ if ((thread = mono_thread_create_internal (mono_get_root_domain (), worker_thread, NULL, TRUE, 0)) != NULL) {
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, created %p",
+ GetCurrentThreadId (), thread->tid);
return TRUE;
+ }
+
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, failed", GetCurrentThreadId ());
COUNTER_ATOMIC (counter, {
counter._.working --;
g_assert (tpdomain);
tpdomain->outstanding_request ++;
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, domain = %p, outstanding_request = %d",
+ GetCurrentThreadId (), tpdomain->domain, tpdomain->outstanding_request);
+
mono_mutex_unlock (&threadpool->domains_lock);
if (threadpool->suspended)
monitor_ensure_running ();
- if (worker_try_unpark ())
+ if (worker_try_unpark ()) {
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, unparked", GetCurrentThreadId ());
return TRUE;
+ }
- if (worker_try_create ())
+ if (worker_try_create ()) {
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, created", GetCurrentThreadId ());
return TRUE;
+ }
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, failed", GetCurrentThreadId ());
return FALSE;
}
static gboolean
monitor_should_keep_running (void)
{
+ static gint64 last_should_keep_running = -1;
+
g_assert (monitor_status == MONITOR_STATUS_WAITING_FOR_REQUEST || monitor_status == MONITOR_STATUS_REQUESTED);
if (InterlockedExchange (&monitor_status, MONITOR_STATUS_WAITING_FOR_REQUEST) == MONITOR_STATUS_WAITING_FOR_REQUEST) {
- if (mono_runtime_is_shutting_down () || !domain_any_has_request ()) {
+ gboolean should_keep_running = TRUE, force_should_keep_running = FALSE;
+
+ if (mono_runtime_is_shutting_down ()) {
+ should_keep_running = FALSE;
+ } else {
+ if (!domain_any_has_request ())
+ should_keep_running = FALSE;
+
+ if (!should_keep_running) {
+ if (last_should_keep_running == -1 || mono_100ns_ticks () - last_should_keep_running < MONITOR_MINIMAL_LIFETIME * 1000 * 10) {
+ should_keep_running = force_should_keep_running = TRUE;
+ }
+ }
+ }
+
+ if (should_keep_running) {
+ if (last_should_keep_running == -1 || !force_should_keep_running)
+ last_should_keep_running = mono_100ns_ticks ();
+ } else {
+ last_should_keep_running = -1;
if (InterlockedCompareExchange (&monitor_status, MONITOR_STATUS_NOT_RUNNING, MONITOR_STATUS_WAITING_FOR_REQUEST) == MONITOR_STATUS_WAITING_FOR_REQUEST)
return FALSE;
}
mono_cpu_usage (threadpool->cpu_usage_state);
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, started", GetCurrentThreadId ());
+
do {
MonoInternalThread *thread;
gboolean all_waitsleepjoin = TRUE;
if (mono_runtime_is_shutting_down ())
break;
- if (worker_try_unpark ())
+ if (worker_try_unpark ()) {
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, unparked", GetCurrentThreadId ());
break;
+ }
- if (worker_try_create ())
+ if (worker_try_create ()) {
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, created", GetCurrentThreadId ());
break;
+ }
}
}
} while (monitor_should_keep_running ());
+
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, finished", GetCurrentThreadId ());
}
static void
hc = &threadpool->heuristic_hill_climbing;
+ mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] hill climbing, change max number of threads %d", GetCurrentThreadId (), new_thread_count);
+
hc->last_thread_count = new_thread_count;
hc->current_sample_interval = rand_next (&hc->random_interval_generator, hc->sample_interval_low, hc->sample_interval_high);
hc->elapsed_since_last_change = 0;
#ifndef DISABLE_SOCKETS
mono_threadpool_ms_io_cleanup ();
#endif
- ensure_cleanedup ();
+ mono_lazy_cleanup (&status, cleanup);
}
MonoAsyncResult *
async_call_klass = mono_class_from_name (mono_defaults.corlib, "System", "MonoAsyncCall");
g_assert (async_call_klass);
- ensure_initialized (NULL);
+ mono_lazy_initialize (&status, initialize);
message = mono_method_call_message_new (method, params, mono_get_delegate_invoke (method->klass), (params != NULL) ? (&async_callback) : NULL, (params != NULL) ? (&state) : NULL);
if (!worker_threads || !completion_port_threads)
return;
- ensure_initialized (NULL);
+ mono_lazy_initialize (&status, initialize);
*worker_threads = threadpool->limit_worker_max;
*completion_port_threads = threadpool->limit_io_max;
if (!worker_threads || !completion_port_threads)
return;
- ensure_initialized (NULL);
+ mono_lazy_initialize (&status, initialize);
*worker_threads = threadpool->limit_worker_min;
*completion_port_threads = threadpool->limit_io_min;
if (!worker_threads || !completion_port_threads)
return;
- ensure_initialized (NULL);
+ mono_lazy_initialize (&status, initialize);
*worker_threads = threadpool->limit_worker_max;
*completion_port_threads = threadpool->limit_io_max;
MonoBoolean
ves_icall_System_Threading_ThreadPool_SetMinThreadsNative (gint32 worker_threads, gint32 completion_port_threads)
{
- ensure_initialized (NULL);
+ mono_lazy_initialize (&status, initialize);
if (worker_threads <= 0 || worker_threads > threadpool->limit_worker_max)
return FALSE;
{
gint cpu_count = mono_cpu_count ();
- ensure_initialized (NULL);
+ mono_lazy_initialize (&status, initialize);
if (worker_threads < threadpool->limit_worker_min || worker_threads < cpu_count)
return FALSE;
void
ves_icall_System_Threading_ThreadPool_InitializeVMTp (MonoBoolean *enable_worker_tracking)
{
- ensure_initialized (enable_worker_tracking);
+ if (enable_worker_tracking) {
+ // TODO implement some kind of switch to have the possibily to use it
+ *enable_worker_tracking = FALSE;
+ }
+
+ mono_lazy_initialize (&status, initialize);
}
MonoBoolean