// Ported from C++ to C and adjusted to Mono runtime
#include <stdlib.h>
+#define _USE_MATH_DEFINES // needed by MSVC to define math constants
#include <math.h>
#include <config.h>
#include <glib.h>
-#if !defined (HAVE_COMPLEX_H)
-#include <../../support/libm/complex.h>
-#else
-#include <complex.h>
-#endif
-
#include <mono/metadata/class-internals.h>
#include <mono/metadata/exception.h>
#include <mono/metadata/gc-internal.h>
#include <mono/metadata/object-internals.h>
#include <mono/metadata/threadpool-ms.h>
#include <mono/metadata/threadpool-ms-io.h>
-#include <mono/metadata/threadpool-internals.h>
#include <mono/utils/atomic.h>
#include <mono/utils/mono-compiler.h>
+#include <mono/utils/mono-complex.h>
+#include <mono/utils/mono-lazy-init.h>
+#include <mono/utils/mono-logger.h>
+#include <mono/utils/mono-logger-internal.h>
#include <mono/utils/mono-proclib.h>
#include <mono/utils/mono-threads.h>
#include <mono/utils/mono-time.h>
#define CPU_USAGE_HIGH 95
#define MONITOR_INTERVAL 100 // ms
+#define MONITOR_MINIMAL_LIFETIME 60 * 1000 // ms
/* The exponent to apply to the gain. 1.0 means to use linear gain,
* higher values will enhance large moves and damp small ones.
TRANSITION_UNDEFINED,
} ThreadPoolHeuristicStateTransition;
+static mono_lazy_init_t status = MONO_LAZY_INIT_STATUS_NOT_INITIALIZED;
+
enum {
MONITOR_STATUS_REQUESTED,
MONITOR_STATUS_WAITING_FOR_REQUEST,
MONITOR_STATUS_NOT_RUNNING,
};
-static gint32 status = STATUS_NOT_INITIALIZED;
static gint32 monitor_status = MONITOR_STATUS_NOT_RUNNING;
static ThreadPool* threadpool;
}
static void
-ensure_initialized (MonoBoolean *enable_worker_tracking)
+initialize (void)
{
ThreadPoolHillClimbing *hc;
const char *threads_per_cpu_env;
gint threads_per_cpu;
gint threads_count;
- if (enable_worker_tracking) {
- // TODO implement some kind of switch to have the possibily to use it
- *enable_worker_tracking = FALSE;
- }
-
- if (status >= STATUS_INITIALIZED)
- return;
- if (status == STATUS_INITIALIZING || InterlockedCompareExchange (&status, STATUS_INITIALIZING, STATUS_NOT_INITIALIZED) != STATUS_NOT_INITIALIZED) {
- while (status == STATUS_INITIALIZING)
- mono_thread_info_yield ();
- g_assert (status >= STATUS_INITIALIZED);
- return;
- }
-
g_assert (!threadpool);
threadpool = g_new0 (ThreadPool, 1);
g_assert (threadpool);
threadpool->cpu_usage_state = g_new0 (MonoCpuUsageState, 1);
threadpool->suspended = FALSE;
-
- status = STATUS_INITIALIZED;
}
static void worker_unpark (ThreadPoolParkedThread *thread);
static void worker_kill (ThreadPoolWorkingThread *thread);
static void
-ensure_cleanedup (void)
+cleanup (void)
{
guint i;
- if (status == STATUS_NOT_INITIALIZED && InterlockedCompareExchange (&status, STATUS_CLEANED_UP, STATUS_NOT_INITIALIZED) == STATUS_NOT_INITIALIZED)
- return;
- if (status == STATUS_INITIALIZING) {
- while (status == STATUS_INITIALIZING)
- mono_thread_info_yield ();
- }
- if (status == STATUS_CLEANED_UP)
- return;
- if (status == STATUS_CLEANING_UP || InterlockedCompareExchange (&status, STATUS_CLEANING_UP, STATUS_INITIALIZED) != STATUS_INITIALIZED) {
- while (status == STATUS_CLEANING_UP)
- mono_thread_info_yield ();
- g_assert (status == STATUS_CLEANED_UP);
- return;
- }
-
/* we make the assumption along the code that we are
* cleaning up only if the runtime is shutting down */
g_assert (mono_runtime_is_shutting_down ());
while (monitor_status != MONITOR_STATUS_NOT_RUNNING)
- usleep (1000);
+ g_usleep (1000);
mono_mutex_lock (&threadpool->active_threads_lock);
worker_unpark ((ThreadPoolParkedThread*) g_ptr_array_index (threadpool->parked_threads, i));
mono_mutex_unlock (&threadpool->active_threads_lock);
-
- status = STATUS_CLEANED_UP;
}
void
mono_cond_t cond;
MonoInternalThread *thread = mono_thread_internal_current ();
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] current worker parking", GetCurrentThreadId ());
+
mono_cond_init (&cond, NULL);
mono_gc_set_skip_thread (TRUE);
mono_gc_set_skip_thread (FALSE);
mono_cond_destroy (&cond);
+
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] current worker unparking", GetCurrentThreadId ());
}
static gboolean
gboolean res = FALSE;
guint len;
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try unpark worker", GetCurrentThreadId ());
+
mono_mutex_lock (&threadpool->active_threads_lock);
len = threadpool->parked_threads->len;
if (len > 0) {
res = TRUE;
}
mono_mutex_unlock (&threadpool->active_threads_lock);
+
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try unpark worker, success? %s", GetCurrentThreadId (), res ? "yes" : "no");
+
return res;
}
static void
worker_thread (gpointer data)
{
- static MonoClass *threadpool_wait_callback_class = NULL;
- static MonoMethod *perform_wait_callback_method = NULL;
MonoInternalThread *thread;
ThreadPoolDomain *tpdomain, *previous_tpdomain;
ThreadPoolCounter counter;
gboolean retire = FALSE;
- g_assert (status >= STATUS_INITIALIZED);
-
- if (!threadpool_wait_callback_class)
- threadpool_wait_callback_class = mono_class_from_name (mono_defaults.corlib, "System.Threading.Microsoft", "_ThreadPoolWaitCallback");
- g_assert (threadpool_wait_callback_class);
-
- if (!perform_wait_callback_method)
- perform_wait_callback_method = mono_class_get_method_from_name (threadpool_wait_callback_class, "PerformWaitCallback", 0);
- g_assert (perform_wait_callback_method);
+ mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] worker starting", GetCurrentThreadId ());
g_assert (threadpool);
tpdomain->outstanding_request --;
g_assert (tpdomain->outstanding_request >= 0);
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker running in domain %p",
+ GetCurrentThreadId (), tpdomain->domain, tpdomain->outstanding_request);
+
g_assert (tpdomain->domain);
g_assert (tpdomain->domain->threadpool_jobs >= 0);
tpdomain->domain->threadpool_jobs ++;
mono_thread_push_appdomain_ref (tpdomain->domain);
if (mono_domain_set (tpdomain->domain, FALSE)) {
MonoObject *exc = NULL;
- MonoObject *res = mono_runtime_invoke (perform_wait_callback_method, NULL, NULL, &exc);
+ MonoObject *res = mono_runtime_invoke (mono_defaults.threadpool_perform_wait_callback_method, NULL, NULL, &exc);
if (exc)
- mono_internal_thread_unhandled_exception (exc);
+ mono_thread_internal_unhandled_exception (exc);
else if (res && *(MonoBoolean*) mono_object_unbox (res) == FALSE)
retire = TRUE;
counter._.working--;
counter._.active --;
});
+
+ mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] worker finishing", GetCurrentThreadId ());
}
static gboolean
worker_try_create (void)
{
ThreadPoolCounter counter;
+ MonoInternalThread *thread;
+
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker", GetCurrentThreadId ());
COUNTER_ATOMIC (counter, {
if (counter._.working >= counter._.max_working)
counter._.active ++;
});
- if (mono_thread_create_internal (mono_get_root_domain (), worker_thread, NULL, TRUE, 0) != NULL)
+ if ((thread = mono_thread_create_internal (mono_get_root_domain (), worker_thread, NULL, TRUE, 0)) != NULL) {
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, created %p",
+ GetCurrentThreadId (), thread->tid);
return TRUE;
+ }
+
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, failed", GetCurrentThreadId ());
COUNTER_ATOMIC (counter, {
counter._.working --;
g_assert (tpdomain);
tpdomain->outstanding_request ++;
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, domain = %p, outstanding_request = %d",
+ GetCurrentThreadId (), tpdomain->domain, tpdomain->outstanding_request);
+
mono_mutex_unlock (&threadpool->domains_lock);
if (threadpool->suspended)
monitor_ensure_running ();
- if (worker_try_unpark ())
+ if (worker_try_unpark ()) {
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, unparked", GetCurrentThreadId ());
return TRUE;
+ }
- if (worker_try_create ())
+ if (worker_try_create ()) {
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, created", GetCurrentThreadId ());
return TRUE;
+ }
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, failed", GetCurrentThreadId ());
return FALSE;
}
static gboolean
monitor_should_keep_running (void)
{
+ static gint64 last_should_keep_running = -1;
+
g_assert (monitor_status == MONITOR_STATUS_WAITING_FOR_REQUEST || monitor_status == MONITOR_STATUS_REQUESTED);
if (InterlockedExchange (&monitor_status, MONITOR_STATUS_WAITING_FOR_REQUEST) == MONITOR_STATUS_WAITING_FOR_REQUEST) {
- if (mono_runtime_is_shutting_down () || !domain_any_has_request ()) {
+ gboolean should_keep_running = TRUE, force_should_keep_running = FALSE;
+
+ if (mono_runtime_is_shutting_down ()) {
+ should_keep_running = FALSE;
+ } else {
+ if (!domain_any_has_request ())
+ should_keep_running = FALSE;
+
+ if (!should_keep_running) {
+ if (last_should_keep_running == -1 || mono_100ns_ticks () - last_should_keep_running < MONITOR_MINIMAL_LIFETIME * 1000 * 10) {
+ should_keep_running = force_should_keep_running = TRUE;
+ }
+ }
+ }
+
+ if (should_keep_running) {
+ if (last_should_keep_running == -1 || !force_should_keep_running)
+ last_should_keep_running = mono_100ns_ticks ();
+ } else {
+ last_should_keep_running = -1;
if (InterlockedCompareExchange (&monitor_status, MONITOR_STATUS_NOT_RUNNING, MONITOR_STATUS_WAITING_FOR_REQUEST) == MONITOR_STATUS_WAITING_FOR_REQUEST)
return FALSE;
}
mono_cpu_usage (threadpool->cpu_usage_state);
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, started", GetCurrentThreadId ());
+
do {
MonoInternalThread *thread;
gboolean all_waitsleepjoin = TRUE;
if (mono_runtime_is_shutting_down ())
break;
- if (worker_try_unpark ())
+ if (worker_try_unpark ()) {
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, unparked", GetCurrentThreadId ());
break;
+ }
- if (worker_try_create ())
+ if (worker_try_create ()) {
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, created", GetCurrentThreadId ());
break;
+ }
}
}
} while (monitor_should_keep_running ());
+
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, finished", GetCurrentThreadId ());
}
static void
hc = &threadpool->heuristic_hill_climbing;
+ mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] hill climbing, change max number of threads %d", GetCurrentThreadId (), new_thread_count);
+
hc->last_thread_count = new_thread_count;
hc->current_sample_interval = rand_next (&hc->random_interval_generator, hc->sample_interval_low, hc->sample_interval_high);
hc->elapsed_since_last_change = 0;
}
}
-static double complex
+static double_complex
hill_climbing_get_wave_component (gdouble *samples, guint sample_count, gdouble period)
{
ThreadPoolHillClimbing *hc;
q1 = q0;
}
- return ((q1 - q2 * cosine) + (q2 * sine) * I) / ((gdouble) sample_count);
+ return mono_double_complex_scalar_div (mono_double_complex_make (q1 - q2 * cosine, (q2 * sine)), ((gdouble)sample_count));
}
static gint16
gint sample_count;
gint new_thread_wave_magnitude;
gint new_thread_count;
- double complex thread_wave_component;
- double complex throughput_wave_component;
- double complex ratio;
+ double_complex thread_wave_component;
+ double_complex throughput_wave_component;
+ double_complex ratio;
g_assert (threadpool);
g_assert (adjustment_interval);
hc->total_samples ++;
/* Set up defaults for our metrics. */
- thread_wave_component = 0;
- throughput_wave_component = 0;
+ thread_wave_component = mono_double_complex_make(0, 0);
+ throughput_wave_component = mono_double_complex_make(0, 0);
throughput_error_estimate = 0;
- ratio = 0;
+ ratio = mono_double_complex_make(0, 0);
confidence = 0;
transition = TRANSITION_WARMUP;
/* Get the the three different frequency components of the throughput (scaled by average
* throughput). Our "error" estimate (the amount of noise that might be present in the
* frequency band we're really interested in) is the average of the adjacent bands. */
- throughput_wave_component = hill_climbing_get_wave_component (hc->samples, sample_count, hc->wave_period) / average_throughput;
- throughput_error_estimate = cabs (hill_climbing_get_wave_component (hc->samples, sample_count, adjacent_period_1) / average_throughput);
+ throughput_wave_component = mono_double_complex_scalar_div (hill_climbing_get_wave_component (hc->samples, sample_count, hc->wave_period), average_throughput);
+ throughput_error_estimate = cabs (mono_double_complex_scalar_div (hill_climbing_get_wave_component (hc->samples, sample_count, adjacent_period_1), average_throughput));
if (adjacent_period_2 <= sample_count) {
- throughput_error_estimate = MAX (throughput_error_estimate, cabs (hill_climbing_get_wave_component (
- hc->samples, sample_count, adjacent_period_2) / average_throughput));
+ throughput_error_estimate = MAX (throughput_error_estimate, cabs (mono_double_complex_scalar_div (hill_climbing_get_wave_component (
+ hc->samples, sample_count, adjacent_period_2), average_throughput)));
}
/* Do the same for the thread counts, so we have something to compare to. We don't
* measure thread count noise, because there is none; these are exact measurements. */
- thread_wave_component = hill_climbing_get_wave_component (hc->thread_counts, sample_count, hc->wave_period) / average_thread_count;
+ thread_wave_component = mono_double_complex_scalar_div (hill_climbing_get_wave_component (hc->thread_counts, sample_count, hc->wave_period), average_thread_count);
/* Update our moving average of the throughput noise. We'll use this
* later as feedback to determine the new size of the thread wave. */
if (cabs (thread_wave_component) > 0) {
/* Adjust the throughput wave so it's centered around the target wave,
* and then calculate the adjusted throughput/thread ratio. */
- ratio = (throughput_wave_component - (hc->target_throughput_ratio * thread_wave_component)) / thread_wave_component;
+ ratio = mono_double_complex_div (mono_double_complex_sub (throughput_wave_component, mono_double_complex_scalar_mul(thread_wave_component, hc->target_throughput_ratio)), thread_wave_component);
transition = TRANSITION_CLIMBING_MOVE;
} else {
- ratio = 0;
+ ratio = mono_double_complex_make (0, 0);
transition = TRANSITION_STABILIZING;
}
#ifndef DISABLE_SOCKETS
mono_threadpool_ms_io_cleanup ();
#endif
- ensure_cleanedup ();
+ mono_lazy_cleanup (&status, cleanup);
}
MonoAsyncResult *
async_call_klass = mono_class_from_name (mono_defaults.corlib, "System", "MonoAsyncCall");
g_assert (async_call_klass);
- ensure_initialized (NULL);
+ mono_lazy_initialize (&status, initialize);
message = mono_method_call_message_new (method, params, mono_get_delegate_invoke (method->klass), (params != NULL) ? (&async_callback) : NULL, (params != NULL) ? (&state) : NULL);
void
mono_threadpool_ms_suspend (void)
{
- threadpool->suspended = TRUE;
+ if (threadpool)
+ threadpool->suspended = TRUE;
}
void
mono_threadpool_ms_resume (void)
{
- threadpool->suspended = FALSE;
+ if (threadpool)
+ threadpool->suspended = FALSE;
}
void
-ves_icall_System_Threading_Microsoft_ThreadPool_GetAvailableThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads)
+ves_icall_System_Threading_ThreadPool_GetAvailableThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads)
{
if (!worker_threads || !completion_port_threads)
return;
- ensure_initialized (NULL);
+ mono_lazy_initialize (&status, initialize);
*worker_threads = threadpool->limit_worker_max;
*completion_port_threads = threadpool->limit_io_max;
}
void
-ves_icall_System_Threading_Microsoft_ThreadPool_GetMinThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads)
+ves_icall_System_Threading_ThreadPool_GetMinThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads)
{
if (!worker_threads || !completion_port_threads)
return;
- ensure_initialized (NULL);
+ mono_lazy_initialize (&status, initialize);
*worker_threads = threadpool->limit_worker_min;
*completion_port_threads = threadpool->limit_io_min;
}
void
-ves_icall_System_Threading_Microsoft_ThreadPool_GetMaxThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads)
+ves_icall_System_Threading_ThreadPool_GetMaxThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads)
{
if (!worker_threads || !completion_port_threads)
return;
- ensure_initialized (NULL);
+ mono_lazy_initialize (&status, initialize);
*worker_threads = threadpool->limit_worker_max;
*completion_port_threads = threadpool->limit_io_max;
}
MonoBoolean
-ves_icall_System_Threading_Microsoft_ThreadPool_SetMinThreadsNative (gint32 worker_threads, gint32 completion_port_threads)
+ves_icall_System_Threading_ThreadPool_SetMinThreadsNative (gint32 worker_threads, gint32 completion_port_threads)
{
- ensure_initialized (NULL);
+ mono_lazy_initialize (&status, initialize);
if (worker_threads <= 0 || worker_threads > threadpool->limit_worker_max)
return FALSE;
}
MonoBoolean
-ves_icall_System_Threading_Microsoft_ThreadPool_SetMaxThreadsNative (gint32 worker_threads, gint32 completion_port_threads)
+ves_icall_System_Threading_ThreadPool_SetMaxThreadsNative (gint32 worker_threads, gint32 completion_port_threads)
{
gint cpu_count = mono_cpu_count ();
- ensure_initialized (NULL);
+ mono_lazy_initialize (&status, initialize);
if (worker_threads < threadpool->limit_worker_min || worker_threads < cpu_count)
return FALSE;
}
void
-ves_icall_System_Threading_Microsoft_ThreadPool_InitializeVMTp (MonoBoolean *enable_worker_tracking)
+ves_icall_System_Threading_ThreadPool_InitializeVMTp (MonoBoolean *enable_worker_tracking)
{
- ensure_initialized (enable_worker_tracking);
+ if (enable_worker_tracking) {
+ // TODO implement some kind of switch to have the possibily to use it
+ *enable_worker_tracking = FALSE;
+ }
+
+ mono_lazy_initialize (&status, initialize);
}
MonoBoolean
-ves_icall_System_Threading_Microsoft_ThreadPool_NotifyWorkItemComplete (void)
+ves_icall_System_Threading_ThreadPool_NotifyWorkItemComplete (void)
{
ThreadPoolCounter counter;
}
void
-ves_icall_System_Threading_Microsoft_ThreadPool_NotifyWorkItemProgressNative (void)
+ves_icall_System_Threading_ThreadPool_NotifyWorkItemProgressNative (void)
{
heuristic_notify_work_completed ();
}
void
-ves_icall_System_Threading_Microsoft_ThreadPool_ReportThreadStatus (MonoBoolean is_working)
+ves_icall_System_Threading_ThreadPool_ReportThreadStatus (MonoBoolean is_working)
{
// TODO
mono_raise_exception (mono_get_exception_not_implemented (NULL));
}
MonoBoolean
-ves_icall_System_Threading_Microsoft_ThreadPool_RequestWorkerThread (void)
+ves_icall_System_Threading_ThreadPool_RequestWorkerThread (void)
{
return worker_request (mono_domain_get ());
}
MonoBoolean G_GNUC_UNUSED
-ves_icall_System_Threading_Microsoft_ThreadPool_PostQueuedCompletionStatus (MonoNativeOverlapped *native_overlapped)
+ves_icall_System_Threading_ThreadPool_PostQueuedCompletionStatus (MonoNativeOverlapped *native_overlapped)
{
/* This copy the behavior of the current Mono implementation */
mono_raise_exception (mono_get_exception_not_implemented (NULL));
}
MonoBoolean G_GNUC_UNUSED
-ves_icall_System_Threading_Microsoft_ThreadPool_BindIOCompletionCallbackNative (gpointer file_handle)
+ves_icall_System_Threading_ThreadPool_BindIOCompletionCallbackNative (gpointer file_handle)
{
/* This copy the behavior of the current Mono implementation */
return TRUE;
}
MonoBoolean G_GNUC_UNUSED
-ves_icall_System_Threading_Microsoft_ThreadPool_IsThreadPoolHosted (void)
+ves_icall_System_Threading_ThreadPool_IsThreadPoolHosted (void)
{
return FALSE;
}