// Ported from C++ to C and adjusted to Mono runtime
#include <stdlib.h>
+#define _USE_MATH_DEFINES // needed by MSVC to define math constants
#include <math.h>
#include <config.h>
#include <glib.h>
-#if !defined (HAVE_COMPLEX_H)
-#include <../../support/libm/complex.h>
-#else
-#include <complex.h>
-#endif
-
#include <mono/metadata/class-internals.h>
#include <mono/metadata/exception.h>
#include <mono/metadata/gc-internal.h>
#include <mono/metadata/object-internals.h>
#include <mono/metadata/threadpool-ms.h>
#include <mono/metadata/threadpool-ms-io.h>
-#include <mono/metadata/threadpool-internals.h>
#include <mono/utils/atomic.h>
#include <mono/utils/mono-compiler.h>
+#include <mono/utils/mono-complex.h>
+#include <mono/utils/mono-lazy-init.h>
+#include <mono/utils/mono-logger.h>
+#include <mono/utils/mono-logger-internal.h>
#include <mono/utils/mono-proclib.h>
#include <mono/utils/mono-threads.h>
#include <mono/utils/mono-time.h>
#define CPU_USAGE_HIGH 95
#define MONITOR_INTERVAL 100 // ms
+#define MONITOR_MINIMAL_LIFETIME 60 * 1000 // ms
/* The exponent to apply to the gain. 1.0 means to use linear gain,
* higher values will enhance large moves and damp small ones.
typedef union {
struct {
gint16 max_working; /* determined by heuristic */
- gint16 active; /* working or waiting on thread_work_sem; warm threads */
- gint16 working;
- gint16 parked;
+ gint16 active; /* executing worker_thread */
+ gint16 working; /* actively executing worker_thread, not parked */
+ gint16 parked; /* parked */
} _;
gint64 as_gint64;
} ThreadPoolCounter;
gint32 outstanding_request;
} ThreadPoolDomain;
+typedef MonoInternalThread ThreadPoolWorkingThread;
+typedef mono_cond_t ThreadPoolParkedThread;
+
typedef struct {
gint32 wave_period;
gint32 samples_to_measure;
GPtrArray *domains; // ThreadPoolDomain* []
mono_mutex_t domains_lock;
- GPtrArray *working_threads; // MonoInternalThread* []
- mono_mutex_t working_threads_lock;
-
- GPtrArray *parked_threads; // mono_cond_t* []
- mono_mutex_t parked_threads_lock;
+ GPtrArray *working_threads; // ThreadPoolWorkingThread* []
+ GPtrArray *parked_threads; // ThreadPoolParkedThread* []
+ mono_mutex_t active_threads_lock; /* protect access to working_threads and parked_threads */
gint32 heuristic_completions;
guint32 heuristic_sample_start;
TRANSITION_UNDEFINED,
} ThreadPoolHeuristicStateTransition;
+static mono_lazy_init_t status = MONO_LAZY_INIT_STATUS_NOT_INITIALIZED;
+
enum {
MONITOR_STATUS_REQUESTED,
MONITOR_STATUS_WAITING_FOR_REQUEST,
MONITOR_STATUS_NOT_RUNNING,
};
-static gint32 status = STATUS_NOT_INITIALIZED;
static gint32 monitor_status = MONITOR_STATUS_NOT_RUNNING;
static ThreadPool* threadpool;
#define COUNTER_CHECK(counter) \
do { \
g_assert (counter._.max_working > 0); \
+ g_assert (counter._.working >= 0); \
g_assert (counter._.active >= 0); \
} while (0)
-#define COUNTER_READ() ((ThreadPoolCounter) InterlockedRead64 (&threadpool->counters.as_gint64))
+#define COUNTER_READ() (InterlockedRead64 (&threadpool->counters.as_gint64))
#define COUNTER_ATOMIC(var,block) \
do { \
ThreadPoolCounter __old; \
do { \
g_assert (threadpool); \
- (var) = __old = COUNTER_READ (); \
+ __old.as_gint64 = COUNTER_READ (); \
+ (var) = __old; \
{ block; } \
COUNTER_CHECK (var); \
} while (InterlockedCompareExchange64 (&threadpool->counters.as_gint64, (var).as_gint64, __old.as_gint64) != __old.as_gint64); \
ThreadPoolCounter __old; \
do { \
g_assert (threadpool); \
- (var) = __old = COUNTER_READ (); \
+ __old.as_gint64 = COUNTER_READ (); \
+ (var) = __old; \
(res) = FALSE; \
{ block; } \
COUNTER_CHECK (var); \
}
static void
-ensure_initialized (MonoBoolean *enable_worker_tracking)
+initialize (void)
{
ThreadPoolHillClimbing *hc;
const char *threads_per_cpu_env;
gint threads_per_cpu;
gint threads_count;
- if (enable_worker_tracking) {
- // TODO implement some kind of switch to have the possibily to use it
- *enable_worker_tracking = FALSE;
- }
-
- if (status >= STATUS_INITIALIZED)
- return;
- if (status == STATUS_INITIALIZING || InterlockedCompareExchange (&status, STATUS_INITIALIZING, STATUS_NOT_INITIALIZED) != STATUS_NOT_INITIALIZED) {
- while (status == STATUS_INITIALIZING)
- mono_thread_info_yield ();
- g_assert (status >= STATUS_INITIALIZED);
- return;
- }
-
g_assert (!threadpool);
threadpool = g_new0 (ThreadPool, 1);
g_assert (threadpool);
mono_mutex_init_recursive (&threadpool->domains_lock);
threadpool->parked_threads = g_ptr_array_new ();
- mono_mutex_init (&threadpool->parked_threads_lock);
-
threadpool->working_threads = g_ptr_array_new ();
- mono_mutex_init (&threadpool->working_threads_lock);
+ mono_mutex_init (&threadpool->active_threads_lock);
threadpool->heuristic_adjustment_interval = 10;
mono_mutex_init (&threadpool->heuristic_lock);
threadpool->cpu_usage_state = g_new0 (MonoCpuUsageState, 1);
threadpool->suspended = FALSE;
-
- status = STATUS_INITIALIZED;
}
+static void worker_unpark (ThreadPoolParkedThread *thread);
+static void worker_kill (ThreadPoolWorkingThread *thread);
+
static void
-ensure_cleanedup (void)
+cleanup (void)
{
- if (status == STATUS_NOT_INITIALIZED && InterlockedCompareExchange (&status, STATUS_CLEANED_UP, STATUS_NOT_INITIALIZED) == STATUS_NOT_INITIALIZED)
- return;
- if (status == STATUS_INITIALIZING) {
- while (status == STATUS_INITIALIZING)
- mono_thread_info_yield ();
- }
- if (status == STATUS_CLEANED_UP)
- return;
- if (status == STATUS_CLEANING_UP || InterlockedCompareExchange (&status, STATUS_CLEANING_UP, STATUS_INITIALIZED) != STATUS_INITIALIZED) {
- while (status == STATUS_CLEANING_UP)
- mono_thread_info_yield ();
- g_assert (status == STATUS_CLEANED_UP);
- return;
- }
+ guint i;
/* we make the assumption along the code that we are
* cleaning up only if the runtime is shutting down */
g_assert (mono_runtime_is_shutting_down ());
- /* Unpark all worker threads */
- mono_mutex_lock (&threadpool->parked_threads_lock);
- for (;;) {
- guint i;
- ThreadPoolCounter counter = COUNTER_READ ();
- if (counter._.active == 0 && counter._.parked == 0)
- break;
- if (counter._.active == 1) {
- MonoInternalThread *thread = mono_thread_internal_current ();
- if (thread->threadpool_thread) {
- /* if there is only one active thread
- * left and it's the current one */
- break;
- }
- }
- for (i = 0; i < threadpool->parked_threads->len; ++i) {
- mono_cond_t *cond = (mono_cond_t*) g_ptr_array_index (threadpool->parked_threads, i);
- mono_cond_signal (cond);
- }
- mono_mutex_unlock (&threadpool->parked_threads_lock);
- usleep (1000);
- mono_mutex_lock (&threadpool->parked_threads_lock);
- }
- mono_mutex_unlock (&threadpool->parked_threads_lock);
-
while (monitor_status != MONITOR_STATUS_NOT_RUNNING)
- usleep (1000);
-
- g_ptr_array_free (threadpool->domains, TRUE);
- mono_mutex_destroy (&threadpool->domains_lock);
+ g_usleep (1000);
- g_ptr_array_free (threadpool->parked_threads, TRUE);
- mono_mutex_destroy (&threadpool->parked_threads_lock);
+ mono_mutex_lock (&threadpool->active_threads_lock);
- g_ptr_array_free (threadpool->working_threads, TRUE);
- mono_mutex_destroy (&threadpool->working_threads_lock);
+ /* stop all threadpool->working_threads */
+ for (i = 0; i < threadpool->working_threads->len; ++i)
+ worker_kill ((ThreadPoolWorkingThread*) g_ptr_array_index (threadpool->working_threads, i));
- mono_mutex_destroy (&threadpool->heuristic_lock);
- g_free (threadpool->heuristic_hill_climbing.samples);
- g_free (threadpool->heuristic_hill_climbing.thread_counts);
- rand_free (threadpool->heuristic_hill_climbing.random_interval_generator);
+ /* unpark all threadpool->parked_threads */
+ for (i = 0; i < threadpool->parked_threads->len; ++i)
+ worker_unpark ((ThreadPoolParkedThread*) g_ptr_array_index (threadpool->parked_threads, i));
- g_free (threadpool->cpu_usage_state);
-
- g_assert (threadpool);
- g_free (threadpool);
- threadpool = NULL;
- g_assert (!threadpool);
-
- status = STATUS_CLEANED_UP;
+ mono_mutex_unlock (&threadpool->active_threads_lock);
}
void
}
static ThreadPoolDomain *
-domain_get_or_create (MonoDomain *domain)
+domain_get (MonoDomain *domain, gboolean create)
{
ThreadPoolDomain *tpdomain = NULL;
guint i;
break;
}
}
- if (!tpdomain) {
+ if (!tpdomain && create) {
tpdomain = g_new0 (ThreadPoolDomain, 1);
tpdomain->domain = domain;
domain_add (tpdomain);
return tpdomain;
}
+static void
+domain_free (ThreadPoolDomain *tpdomain)
+{
+ g_free (tpdomain);
+}
+
static gboolean
domain_any_has_request (void)
{
ThreadPoolDomain *tmp = g_ptr_array_index (threadpool->domains, i % len);
if (tmp->outstanding_request > 0) {
tpdomain = tmp;
- tpdomain->outstanding_request --;
- g_assert (tpdomain->outstanding_request >= 0);
break;
}
}
worker_park (void)
{
mono_cond_t cond;
+ MonoInternalThread *thread = mono_thread_internal_current ();
+
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] current worker parking", GetCurrentThreadId ());
+
mono_cond_init (&cond, NULL);
- mono_mutex_lock (&threadpool->parked_threads_lock);
- g_ptr_array_add (threadpool->parked_threads, &cond);
- mono_cond_wait (&cond, &threadpool->parked_threads_lock);
- g_ptr_array_remove (threadpool->parked_threads, &cond);
- mono_mutex_unlock (&threadpool->parked_threads_lock);
+ mono_gc_set_skip_thread (TRUE);
+
+ mono_mutex_lock (&threadpool->active_threads_lock);
+
+ if (!mono_runtime_is_shutting_down ()) {
+ g_ptr_array_add (threadpool->parked_threads, &cond);
+ g_ptr_array_remove_fast (threadpool->working_threads, thread);
+
+ mono_cond_wait (&cond, &threadpool->active_threads_lock);
+
+ g_ptr_array_add (threadpool->working_threads, thread);
+ g_ptr_array_remove (threadpool->parked_threads, &cond);
+ }
+
+ mono_mutex_unlock (&threadpool->active_threads_lock);
+
+ mono_gc_set_skip_thread (FALSE);
mono_cond_destroy (&cond);
+
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] current worker unparking", GetCurrentThreadId ());
}
static gboolean
gboolean res = FALSE;
guint len;
- mono_mutex_lock (&threadpool->parked_threads_lock);
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try unpark worker", GetCurrentThreadId ());
+
+ mono_mutex_lock (&threadpool->active_threads_lock);
len = threadpool->parked_threads->len;
if (len > 0) {
mono_cond_t *cond = (mono_cond_t*) g_ptr_array_index (threadpool->parked_threads, len - 1);
mono_cond_signal (cond);
res = TRUE;
}
- mono_mutex_unlock (&threadpool->parked_threads_lock);
+ mono_mutex_unlock (&threadpool->active_threads_lock);
+
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try unpark worker, success? %s", GetCurrentThreadId (), res ? "yes" : "no");
+
return res;
}
+static void
+worker_unpark (ThreadPoolParkedThread *thread)
+{
+ mono_cond_signal ((mono_cond_t*) thread);
+}
+
+static void
+worker_kill (ThreadPoolWorkingThread *thread)
+{
+ if (thread == mono_thread_internal_current ())
+ return;
+
+ mono_thread_internal_stop ((MonoInternalThread*) thread);
+}
+
static void
worker_thread (gpointer data)
{
- static MonoClass *threadpool_wait_callback_class = NULL;
- static MonoMethod *perform_wait_callback_method = NULL;
MonoInternalThread *thread;
- ThreadPoolDomain *tpdomain;
+ ThreadPoolDomain *tpdomain, *previous_tpdomain;
ThreadPoolCounter counter;
gboolean retire = FALSE;
- g_assert (status >= STATUS_INITIALIZED);
-
- tpdomain = data;
- g_assert (tpdomain);
- g_assert (tpdomain->domain);
-
- if (mono_runtime_is_shutting_down () || mono_domain_is_unloading (tpdomain->domain)) {
- COUNTER_ATOMIC (counter, { counter._.active --; });
- return;
- }
-
- if (!threadpool_wait_callback_class)
- threadpool_wait_callback_class = mono_class_from_name (mono_defaults.corlib, "System.Threading.Microsoft", "_ThreadPoolWaitCallback");
- g_assert (threadpool_wait_callback_class);
-
- if (!perform_wait_callback_method)
- perform_wait_callback_method = mono_class_get_method_from_name (threadpool_wait_callback_class, "PerformWaitCallback", 0);
- g_assert (perform_wait_callback_method);
+ mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] worker starting", GetCurrentThreadId ());
g_assert (threadpool);
thread = mono_thread_internal_current ();
g_assert (thread);
+ mono_thread_set_name_internal (thread, mono_string_new (mono_domain_get (), "Threadpool worker"), FALSE);
+
+ mono_mutex_lock (&threadpool->active_threads_lock);
+ g_ptr_array_add (threadpool->working_threads, thread);
+ mono_mutex_unlock (&threadpool->active_threads_lock);
+
+ previous_tpdomain = NULL;
+
mono_mutex_lock (&threadpool->domains_lock);
- do {
- guint i, c;
+ while (!mono_runtime_is_shutting_down ()) {
+ tpdomain = NULL;
- g_assert (tpdomain);
- g_assert (tpdomain->domain);
+ if ((thread->state & (ThreadState_StopRequested | ThreadState_SuspendRequested)) != 0) {
+ mono_mutex_unlock (&threadpool->domains_lock);
+ mono_thread_interruption_checkpoint ();
+ mono_mutex_lock (&threadpool->domains_lock);
+ }
- tpdomain->domain->threadpool_jobs ++;
+ if (retire || !(tpdomain = domain_get_next (previous_tpdomain))) {
+ COUNTER_ATOMIC (counter, {
+ counter._.working --;
+ counter._.parked ++;
+ });
- mono_mutex_unlock (&threadpool->domains_lock);
+ mono_mutex_unlock (&threadpool->domains_lock);
+ worker_park ();
+ mono_mutex_lock (&threadpool->domains_lock);
- mono_mutex_lock (&threadpool->working_threads_lock);
- g_ptr_array_add (threadpool->working_threads, thread);
- mono_mutex_unlock (&threadpool->working_threads_lock);
+ COUNTER_ATOMIC (counter, {
+ counter._.working ++;
+ counter._.parked --;
+ });
+
+ if (retire)
+ retire = FALSE;
+
+ continue;
+ }
- COUNTER_ATOMIC (counter, { counter._.working ++; });
+ tpdomain->outstanding_request --;
+ g_assert (tpdomain->outstanding_request >= 0);
+
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker running in domain %p",
+ GetCurrentThreadId (), tpdomain->domain, tpdomain->outstanding_request);
+
+ g_assert (tpdomain->domain);
+ g_assert (tpdomain->domain->threadpool_jobs >= 0);
+ tpdomain->domain->threadpool_jobs ++;
+
+ mono_mutex_unlock (&threadpool->domains_lock);
mono_thread_push_appdomain_ref (tpdomain->domain);
if (mono_domain_set (tpdomain->domain, FALSE)) {
MonoObject *exc = NULL;
- MonoObject *res = mono_runtime_invoke (perform_wait_callback_method, NULL, NULL, &exc);
+ MonoObject *res = mono_runtime_invoke (mono_defaults.threadpool_perform_wait_callback_method, NULL, NULL, &exc);
if (exc)
- mono_internal_thread_unhandled_exception (exc);
+ mono_thread_internal_unhandled_exception (exc);
else if (res && *(MonoBoolean*) mono_object_unbox (res) == FALSE)
retire = TRUE;
mono_thread_clr_state (thread , ~ThreadState_Background);
if (!mono_thread_test_state (thread , ThreadState_Background))
ves_icall_System_Threading_Thread_SetState (thread, ThreadState_Background);
+
+ mono_domain_set (mono_get_root_domain (), TRUE);
}
mono_thread_pop_appdomain_ref ();
- COUNTER_ATOMIC (counter, { counter._.working --; });
-
- mono_mutex_lock (&threadpool->working_threads_lock);
- g_ptr_array_remove_fast (threadpool->working_threads, thread);
- mono_mutex_unlock (&threadpool->working_threads_lock);
-
mono_mutex_lock (&threadpool->domains_lock);
tpdomain->domain->threadpool_jobs --;
g_assert (removed);
if (tpdomain->domain->cleanup_semaphore)
ReleaseSemaphore (tpdomain->domain->cleanup_semaphore, 1, NULL);
- g_free (tpdomain);
+ domain_free (tpdomain);
tpdomain = NULL;
}
- for (i = 0, c = 5; i < c; ++i) {
- if (mono_runtime_is_shutting_down ())
- break;
-
- if (!retire) {
- tpdomain = domain_get_next (tpdomain);
- if (tpdomain)
- break;
- }
+ previous_tpdomain = tpdomain;
+ }
- if (i < c - 1) {
- gboolean park = TRUE;
-
- COUNTER_ATOMIC (counter, {
- if (counter._.active <= counter._.max_working) {
- park = FALSE;
- break;
- }
- counter._.active --;
- counter._.parked ++;
- });
-
- if (park) {
- mono_mutex_unlock (&threadpool->domains_lock);
- mono_gc_set_skip_thread (TRUE);
- worker_park ();
- mono_gc_set_skip_thread (FALSE);
- mono_mutex_lock (&threadpool->domains_lock);
-
- COUNTER_ATOMIC (counter, {
- counter._.active ++;
- counter._.parked --;
- });
- }
- }
+ mono_mutex_unlock (&threadpool->domains_lock);
- retire = FALSE;
- }
- } while (tpdomain && !mono_runtime_is_shutting_down ());
+ mono_mutex_lock (&threadpool->active_threads_lock);
+ g_ptr_array_remove_fast (threadpool->working_threads, thread);
+ mono_mutex_unlock (&threadpool->active_threads_lock);
- mono_mutex_unlock (&threadpool->domains_lock);
+ COUNTER_ATOMIC (counter, {
+ counter._.working--;
+ counter._.active --;
+ });
- COUNTER_ATOMIC (counter, { counter._.active --; });
+ mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] worker finishing", GetCurrentThreadId ());
}
static gboolean
-worker_try_create (ThreadPoolDomain *tpdomain)
+worker_try_create (void)
{
- g_assert (tpdomain);
- g_assert (tpdomain->domain);
+ ThreadPoolCounter counter;
+ MonoInternalThread *thread;
+
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker", GetCurrentThreadId ());
+
+ COUNTER_ATOMIC (counter, {
+ if (counter._.working >= counter._.max_working)
+ return FALSE;
+ counter._.working ++;
+ counter._.active ++;
+ });
- return mono_thread_create_internal (tpdomain->domain, worker_thread, tpdomain, TRUE, 0) != NULL;
+ if ((thread = mono_thread_create_internal (mono_get_root_domain (), worker_thread, NULL, TRUE, 0)) != NULL) {
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, created %p",
+ GetCurrentThreadId (), thread->tid);
+ return TRUE;
+ }
+
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, failed", GetCurrentThreadId ());
+
+ COUNTER_ATOMIC (counter, {
+ counter._.working --;
+ counter._.active --;
+ });
+
+ return FALSE;
}
static void monitor_ensure_running (void);
worker_request (MonoDomain *domain)
{
ThreadPoolDomain *tpdomain;
- ThreadPoolCounter counter;
g_assert (domain);
g_assert (threadpool);
- if (mono_runtime_is_shutting_down () || mono_domain_is_unloading (domain))
+ if (mono_runtime_is_shutting_down ())
return FALSE;
mono_mutex_lock (&threadpool->domains_lock);
- tpdomain = domain_get_or_create (domain);
+
+ /* synchronize check with worker_thread */
+ if (mono_domain_is_unloading (domain)) {
+ mono_mutex_unlock (&threadpool->domains_lock);
+ return FALSE;
+ }
+
+ tpdomain = domain_get (domain, TRUE);
g_assert (tpdomain);
tpdomain->outstanding_request ++;
+
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, domain = %p, outstanding_request = %d",
+ GetCurrentThreadId (), tpdomain->domain, tpdomain->outstanding_request);
+
mono_mutex_unlock (&threadpool->domains_lock);
if (threadpool->suspended)
monitor_ensure_running ();
- if (worker_try_unpark ())
+ if (worker_try_unpark ()) {
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, unparked", GetCurrentThreadId ());
return TRUE;
+ }
- COUNTER_ATOMIC (counter, {
- if (counter._.active >= counter._.max_working)
- return FALSE;
- counter._.active ++;
- });
-
- if (worker_try_create (tpdomain))
+ if (worker_try_create ()) {
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, created", GetCurrentThreadId ());
return TRUE;
+ }
- COUNTER_ATOMIC (counter, { counter._.active --; });
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, failed", GetCurrentThreadId ());
return FALSE;
}
static gboolean
monitor_should_keep_running (void)
{
+ static gint64 last_should_keep_running = -1;
+
g_assert (monitor_status == MONITOR_STATUS_WAITING_FOR_REQUEST || monitor_status == MONITOR_STATUS_REQUESTED);
if (InterlockedExchange (&monitor_status, MONITOR_STATUS_WAITING_FOR_REQUEST) == MONITOR_STATUS_WAITING_FOR_REQUEST) {
- if (mono_runtime_is_shutting_down () || !domain_any_has_request ()) {
+ gboolean should_keep_running = TRUE, force_should_keep_running = FALSE;
+
+ if (mono_runtime_is_shutting_down ()) {
+ should_keep_running = FALSE;
+ } else {
+ if (!domain_any_has_request ())
+ should_keep_running = FALSE;
+
+ if (!should_keep_running) {
+ if (last_should_keep_running == -1 || mono_100ns_ticks () - last_should_keep_running < MONITOR_MINIMAL_LIFETIME * 1000 * 10) {
+ should_keep_running = force_should_keep_running = TRUE;
+ }
+ }
+ }
+
+ if (should_keep_running) {
+ if (last_should_keep_running == -1 || !force_should_keep_running)
+ last_should_keep_running = mono_100ns_ticks ();
+ } else {
+ last_should_keep_running = -1;
if (InterlockedCompareExchange (&monitor_status, MONITOR_STATUS_NOT_RUNNING, MONITOR_STATUS_WAITING_FOR_REQUEST) == MONITOR_STATUS_WAITING_FOR_REQUEST)
return FALSE;
}
if (threadpool->cpu_usage < CPU_USAGE_LOW) {
threshold = MONITOR_INTERVAL;
} else {
- ThreadPoolCounter counter = COUNTER_READ ();
+ ThreadPoolCounter counter;
+ counter.as_gint64 = COUNTER_READ();
threshold = counter._.max_working * MONITOR_INTERVAL * 2;
}
mono_cpu_usage (threadpool->cpu_usage_state);
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, started", GetCurrentThreadId ());
+
do {
MonoInternalThread *thread;
gboolean all_waitsleepjoin = TRUE;
break;
interval_left -= mono_msec_ticks () - ts;
+ mono_gc_set_skip_thread (FALSE);
if ((current_thread->state & (ThreadState_StopRequested | ThreadState_SuspendRequested)) != 0)
mono_thread_interruption_checkpoint ();
+ mono_gc_set_skip_thread (TRUE);
} while (interval_left > 0 && ++awake < 10);
mono_gc_set_skip_thread (FALSE);
if (mono_runtime_is_shutting_down () || !domain_any_has_request ())
continue;
- mono_mutex_lock (&threadpool->working_threads_lock);
+ mono_mutex_lock (&threadpool->active_threads_lock);
for (i = 0; i < threadpool->working_threads->len; ++i) {
thread = g_ptr_array_index (threadpool->working_threads, i);
if ((thread->state & ThreadState_WaitSleepJoin) == 0) {
break;
}
}
- mono_mutex_unlock (&threadpool->working_threads_lock);
+ mono_mutex_unlock (&threadpool->active_threads_lock);
if (all_waitsleepjoin) {
ThreadPoolCounter counter;
if (monitor_sufficient_delay_since_last_dequeue ()) {
for (i = 0; i < 5; ++i) {
- ThreadPoolDomain *tpdomain;
- ThreadPoolCounter counter;
- gboolean success;
-
if (mono_runtime_is_shutting_down ())
break;
- if (worker_try_unpark ())
+ if (worker_try_unpark ()) {
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, unparked", GetCurrentThreadId ());
break;
+ }
- COUNTER_TRY_ATOMIC (success, counter, {
- if (counter._.active >= counter._.max_working)
- break;
- counter._.active ++;
- });
-
- if (!success)
- continue;
-
- tpdomain = domain_get_next (NULL);
- if (tpdomain && worker_try_create (tpdomain))
+ if (worker_try_create ()) {
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, created", GetCurrentThreadId ());
break;
-
- COUNTER_ATOMIC (counter, { counter._.active --; });
+ }
}
}
} while (monitor_should_keep_running ());
+
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, finished", GetCurrentThreadId ());
}
static void
InterlockedCompareExchange (&monitor_status, MONITOR_STATUS_REQUESTED, MONITOR_STATUS_WAITING_FOR_REQUEST);
break;
case MONITOR_STATUS_NOT_RUNNING:
+ if (mono_runtime_is_shutting_down ())
+ return;
if (InterlockedCompareExchange (&monitor_status, MONITOR_STATUS_REQUESTED, MONITOR_STATUS_NOT_RUNNING) == MONITOR_STATUS_NOT_RUNNING) {
if (!mono_thread_create_internal (mono_get_root_domain (), monitor_thread, NULL, TRUE, SMALL_STACK))
monitor_status = MONITOR_STATUS_NOT_RUNNING;
hc = &threadpool->heuristic_hill_climbing;
+ mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] hill climbing, change max number of threads %d", GetCurrentThreadId (), new_thread_count);
+
hc->last_thread_count = new_thread_count;
hc->current_sample_interval = rand_next (&hc->random_interval_generator, hc->sample_interval_low, hc->sample_interval_high);
hc->elapsed_since_last_change = 0;
}
}
-static double complex
+static double_complex
hill_climbing_get_wave_component (gdouble *samples, guint sample_count, gdouble period)
{
ThreadPoolHillClimbing *hc;
q1 = q0;
}
- return ((q1 - q2 * cosine) + (q2 * sine) * I) / ((gdouble) sample_count);
+ return mono_double_complex_scalar_div (mono_double_complex_make (q1 - q2 * cosine, (q2 * sine)), ((gdouble)sample_count));
}
static gint16
gint sample_count;
gint new_thread_wave_magnitude;
gint new_thread_count;
- double complex thread_wave_component;
- double complex throughput_wave_component;
- double complex ratio;
+ double_complex thread_wave_component;
+ double_complex throughput_wave_component;
+ double_complex ratio;
g_assert (threadpool);
g_assert (adjustment_interval);
hc->total_samples ++;
/* Set up defaults for our metrics. */
- thread_wave_component = 0;
- throughput_wave_component = 0;
+ thread_wave_component = mono_double_complex_make(0, 0);
+ throughput_wave_component = mono_double_complex_make(0, 0);
throughput_error_estimate = 0;
- ratio = 0;
+ ratio = mono_double_complex_make(0, 0);
confidence = 0;
transition = TRANSITION_WARMUP;
/* Get the the three different frequency components of the throughput (scaled by average
* throughput). Our "error" estimate (the amount of noise that might be present in the
* frequency band we're really interested in) is the average of the adjacent bands. */
- throughput_wave_component = hill_climbing_get_wave_component (hc->samples, sample_count, hc->wave_period) / average_throughput;
- throughput_error_estimate = cabs (hill_climbing_get_wave_component (hc->samples, sample_count, adjacent_period_1) / average_throughput);
+ throughput_wave_component = mono_double_complex_scalar_div (hill_climbing_get_wave_component (hc->samples, sample_count, hc->wave_period), average_throughput);
+ throughput_error_estimate = cabs (mono_double_complex_scalar_div (hill_climbing_get_wave_component (hc->samples, sample_count, adjacent_period_1), average_throughput));
if (adjacent_period_2 <= sample_count) {
- throughput_error_estimate = MAX (throughput_error_estimate, cabs (hill_climbing_get_wave_component (
- hc->samples, sample_count, adjacent_period_2) / average_throughput));
+ throughput_error_estimate = MAX (throughput_error_estimate, cabs (mono_double_complex_scalar_div (hill_climbing_get_wave_component (
+ hc->samples, sample_count, adjacent_period_2), average_throughput)));
}
/* Do the same for the thread counts, so we have something to compare to. We don't
* measure thread count noise, because there is none; these are exact measurements. */
- thread_wave_component = hill_climbing_get_wave_component (hc->thread_counts, sample_count, hc->wave_period) / average_thread_count;
+ thread_wave_component = mono_double_complex_scalar_div (hill_climbing_get_wave_component (hc->thread_counts, sample_count, hc->wave_period), average_thread_count);
/* Update our moving average of the throughput noise. We'll use this
* later as feedback to determine the new size of the thread wave. */
if (cabs (thread_wave_component) > 0) {
/* Adjust the throughput wave so it's centered around the target wave,
* and then calculate the adjusted throughput/thread ratio. */
- ratio = (throughput_wave_component - (hc->target_throughput_ratio * thread_wave_component)) / thread_wave_component;
+ ratio = mono_double_complex_div (mono_double_complex_sub (throughput_wave_component, mono_double_complex_scalar_mul(thread_wave_component, hc->target_throughput_ratio)), thread_wave_component);
transition = TRANSITION_CLIMBING_MOVE;
} else {
- ratio = 0;
+ ratio = mono_double_complex_make (0, 0);
transition = TRANSITION_STABILIZING;
}
g_assert (threadpool);
if (threadpool->heuristic_last_dequeue > threadpool->heuristic_last_adjustment + threadpool->heuristic_adjustment_interval) {
- ThreadPoolCounter counter = COUNTER_READ ();
- if (counter._.active <= counter._.max_working)
+ ThreadPoolCounter counter;
+ counter.as_gint64 = COUNTER_READ();
+ if (counter._.working <= counter._.max_working)
return TRUE;
}
ThreadPoolCounter counter;
gint16 new_thread_count;
- counter = COUNTER_READ ();
+ counter.as_gint64 = COUNTER_READ ();
new_thread_count = hill_climbing_update (counter._.max_working, sample_duration, completions, &threadpool->heuristic_adjustment_interval);
COUNTER_ATOMIC (counter, { counter._.max_working = new_thread_count; });
void
mono_threadpool_ms_cleanup (void)
{
-#ifndef DISABLE_SOCKETS
- mono_threadpool_ms_io_cleanup ();
-#endif
- ensure_cleanedup ();
+ #ifndef DISABLE_SOCKETS
+ mono_threadpool_ms_io_cleanup ();
+ #endif
+ mono_lazy_cleanup (&status, cleanup);
}
MonoAsyncResult *
async_call_klass = mono_class_from_name (mono_defaults.corlib, "System", "MonoAsyncCall");
g_assert (async_call_klass);
- ensure_initialized (NULL);
+ mono_lazy_initialize (&status, initialize);
message = mono_method_call_message_new (method, params, mono_get_delegate_invoke (method->klass), (params != NULL) ? (&async_callback) : NULL, (params != NULL) ? (&state) : NULL);
g_assert (domain);
g_assert (timeout >= -1);
+ g_assert (mono_domain_is_unloading (domain));
+
if (timeout != -1)
start = mono_msec_ticks ();
return FALSE;
}
#endif
+
/*
* There might be some threads out that could be about to execute stuff from the given domain.
* We avoid that by setting up a semaphore to be pulsed by the thread that reaches zero.
void
mono_threadpool_ms_suspend (void)
{
- threadpool->suspended = TRUE;
+ if (threadpool)
+ threadpool->suspended = TRUE;
}
void
mono_threadpool_ms_resume (void)
{
- threadpool->suspended = FALSE;
+ if (threadpool)
+ threadpool->suspended = FALSE;
}
void
-ves_icall_System_Threading_Microsoft_ThreadPool_GetAvailableThreadsNative (gint *worker_threads, gint *completion_port_threads)
+ves_icall_System_Threading_ThreadPool_GetAvailableThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads)
{
if (!worker_threads || !completion_port_threads)
return;
- ensure_initialized (NULL);
+ mono_lazy_initialize (&status, initialize);
*worker_threads = threadpool->limit_worker_max;
*completion_port_threads = threadpool->limit_io_max;
}
void
-ves_icall_System_Threading_Microsoft_ThreadPool_GetMinThreadsNative (gint *worker_threads, gint *completion_port_threads)
+ves_icall_System_Threading_ThreadPool_GetMinThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads)
{
if (!worker_threads || !completion_port_threads)
return;
- ensure_initialized (NULL);
+ mono_lazy_initialize (&status, initialize);
*worker_threads = threadpool->limit_worker_min;
*completion_port_threads = threadpool->limit_io_min;
}
void
-ves_icall_System_Threading_Microsoft_ThreadPool_GetMaxThreadsNative (gint *worker_threads, gint *completion_port_threads)
+ves_icall_System_Threading_ThreadPool_GetMaxThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads)
{
if (!worker_threads || !completion_port_threads)
return;
- ensure_initialized (NULL);
+ mono_lazy_initialize (&status, initialize);
*worker_threads = threadpool->limit_worker_max;
*completion_port_threads = threadpool->limit_io_max;
}
MonoBoolean
-ves_icall_System_Threading_Microsoft_ThreadPool_SetMinThreadsNative (gint worker_threads, gint completion_port_threads)
+ves_icall_System_Threading_ThreadPool_SetMinThreadsNative (gint32 worker_threads, gint32 completion_port_threads)
{
- ensure_initialized (NULL);
+ mono_lazy_initialize (&status, initialize);
if (worker_threads <= 0 || worker_threads > threadpool->limit_worker_max)
return FALSE;
}
MonoBoolean
-ves_icall_System_Threading_Microsoft_ThreadPool_SetMaxThreadsNative (gint worker_threads, gint completion_port_threads)
+ves_icall_System_Threading_ThreadPool_SetMaxThreadsNative (gint32 worker_threads, gint32 completion_port_threads)
{
gint cpu_count = mono_cpu_count ();
- ensure_initialized (NULL);
+ mono_lazy_initialize (&status, initialize);
if (worker_threads < threadpool->limit_worker_min || worker_threads < cpu_count)
return FALSE;
}
void
-ves_icall_System_Threading_Microsoft_ThreadPool_InitializeVMTp (MonoBoolean *enable_worker_tracking)
+ves_icall_System_Threading_ThreadPool_InitializeVMTp (MonoBoolean *enable_worker_tracking)
{
- ensure_initialized (enable_worker_tracking);
+ if (enable_worker_tracking) {
+ // TODO implement some kind of switch to have the possibily to use it
+ *enable_worker_tracking = FALSE;
+ }
+
+ mono_lazy_initialize (&status, initialize);
}
MonoBoolean
-ves_icall_System_Threading_Microsoft_ThreadPool_NotifyWorkItemComplete (void)
+ves_icall_System_Threading_ThreadPool_NotifyWorkItemComplete (void)
{
ThreadPoolCounter counter;
if (heuristic_should_adjust ())
heuristic_adjust ();
- counter = COUNTER_READ ();
- return counter._.active <= counter._.max_working;
+ counter.as_gint64 = COUNTER_READ ();
+ return counter._.working <= counter._.max_working;
}
void
-ves_icall_System_Threading_Microsoft_ThreadPool_NotifyWorkItemProgressNative (void)
+ves_icall_System_Threading_ThreadPool_NotifyWorkItemProgressNative (void)
{
heuristic_notify_work_completed ();
}
void
-ves_icall_System_Threading_Microsoft_ThreadPool_ReportThreadStatus (MonoBoolean is_working)
+ves_icall_System_Threading_ThreadPool_ReportThreadStatus (MonoBoolean is_working)
{
// TODO
mono_raise_exception (mono_get_exception_not_implemented (NULL));
}
MonoBoolean
-ves_icall_System_Threading_Microsoft_ThreadPool_RequestWorkerThread (void)
+ves_icall_System_Threading_ThreadPool_RequestWorkerThread (void)
{
return worker_request (mono_domain_get ());
}
MonoBoolean G_GNUC_UNUSED
-ves_icall_System_Threading_Microsoft_ThreadPool_PostQueuedCompletionStatus (MonoNativeOverlapped *native_overlapped)
+ves_icall_System_Threading_ThreadPool_PostQueuedCompletionStatus (MonoNativeOverlapped *native_overlapped)
{
/* This copy the behavior of the current Mono implementation */
mono_raise_exception (mono_get_exception_not_implemented (NULL));
}
MonoBoolean G_GNUC_UNUSED
-ves_icall_System_Threading_Microsoft_ThreadPool_BindIOCompletionCallbackNative (gpointer file_handle)
+ves_icall_System_Threading_ThreadPool_BindIOCompletionCallbackNative (gpointer file_handle)
{
/* This copy the behavior of the current Mono implementation */
return TRUE;
}
MonoBoolean G_GNUC_UNUSED
-ves_icall_System_Threading_Microsoft_ThreadPool_IsThreadPoolHosted (void)
+ves_icall_System_Threading_ThreadPool_IsThreadPoolHosted (void)
{
return FALSE;
}