#include <mono/metadata/class-internals.h>
#include <mono/metadata/exception.h>
-#include <mono/metadata/gc-internal.h>
+#include <mono/metadata/gc-internals.h>
#include <mono/metadata/object.h>
#include <mono/metadata/object-internals.h>
#include <mono/metadata/threadpool-ms.h>
#include <mono/utils/mono-complex.h>
#include <mono/utils/mono-lazy-init.h>
#include <mono/utils/mono-logger.h>
-#include <mono/utils/mono-logger-internal.h>
+#include <mono/utils/mono-logger-internals.h>
#include <mono/utils/mono-proclib.h>
#include <mono/utils/mono-threads.h>
#include <mono/utils/mono-time.h>
#define CPU_USAGE_LOW 80
#define CPU_USAGE_HIGH 95
-#define MONITOR_INTERVAL 100 // ms
+#define MONITOR_INTERVAL 500 // ms
#define MONITOR_MINIMAL_LIFETIME 60 * 1000 // ms
+#define WORKER_CREATION_MAX_PER_SEC 10
+
/* The exponent to apply to the gain. 1.0 means to use linear gain,
* higher values will enhance large moves and damp small ones.
* default: 2.0 */
} ThreadPoolDomain;
typedef MonoInternalThread ThreadPoolWorkingThread;
-typedef mono_cond_t ThreadPoolParkedThread;
typedef struct {
gint32 wave_period;
ThreadPoolCounter counters;
GPtrArray *domains; // ThreadPoolDomain* []
- mono_mutex_t domains_lock;
+ MonoCoopMutex domains_lock;
GPtrArray *working_threads; // ThreadPoolWorkingThread* []
- GPtrArray *parked_threads; // ThreadPoolParkedThread* []
- mono_mutex_t active_threads_lock; /* protect access to working_threads and parked_threads */
+ gint32 parked_threads_count;
+ MonoCoopCond parked_threads_cond;
+ MonoCoopMutex active_threads_lock; /* protect access to working_threads and parked_threads */
+
+ guint32 worker_creation_current_second;
+ guint32 worker_creation_current_count;
+ MonoCoopMutex worker_creation_lock;
gint32 heuristic_completions;
guint32 heuristic_sample_start;
guint32 heuristic_last_adjustment; // ms
guint32 heuristic_adjustment_interval; // ms
ThreadPoolHillClimbing heuristic_hill_climbing;
- mono_mutex_t heuristic_lock;
+ MonoCoopMutex heuristic_lock;
gint32 limit_worker_min;
gint32 limit_worker_max;
static guint32
rand_next (gpointer *handle, guint32 min, guint32 max)
{
+ MonoError error;
guint32 val;
- if (!mono_rand_try_get_uint32 (handle, &val, min, max)) {
- // FIXME handle error
- g_assert_not_reached ();
- }
+ mono_rand_try_get_uint32 (handle, &val, min, max, &error);
+ // FIXME handle error
+ mono_error_assert_ok (&error);
return val;
}
g_assert (threadpool);
threadpool->domains = g_ptr_array_new ();
- mono_mutex_init_recursive (&threadpool->domains_lock);
+ mono_coop_mutex_init (&threadpool->domains_lock);
- threadpool->parked_threads = g_ptr_array_new ();
+ threadpool->parked_threads_count = 0;
+ mono_coop_cond_init (&threadpool->parked_threads_cond);
threadpool->working_threads = g_ptr_array_new ();
- mono_mutex_init (&threadpool->active_threads_lock);
+ mono_coop_mutex_init (&threadpool->active_threads_lock);
+
+ threadpool->worker_creation_current_second = -1;
+ mono_coop_mutex_init (&threadpool->worker_creation_lock);
threadpool->heuristic_adjustment_interval = 10;
- mono_mutex_init (&threadpool->heuristic_lock);
+ mono_coop_mutex_init (&threadpool->heuristic_lock);
mono_rand_open ();
threads_count = mono_cpu_count () * threads_per_cpu;
threadpool->limit_worker_min = threadpool->limit_io_min = threads_count;
+
+#if defined (PLATFORM_ANDROID) || defined (HOST_IOS)
+ threadpool->limit_worker_max = threadpool->limit_io_max = CLAMP (threads_count * 100, MIN (threads_count, 200), MAX (threads_count, 200));
+#else
threadpool->limit_worker_max = threadpool->limit_io_max = threads_count * 100;
+#endif
threadpool->counters._.max_working = threadpool->limit_worker_min;
threadpool->suspended = FALSE;
}
-static void worker_unpark (ThreadPoolParkedThread *thread);
static void worker_kill (ThreadPoolWorkingThread *thread);
static void
g_assert (mono_runtime_is_shutting_down ());
while (monitor_status != MONITOR_STATUS_NOT_RUNNING)
- g_usleep (1000);
+ mono_thread_info_sleep (1, NULL);
- mono_mutex_lock (&threadpool->active_threads_lock);
+ mono_coop_mutex_lock (&threadpool->active_threads_lock);
/* stop all threadpool->working_threads */
for (i = 0; i < threadpool->working_threads->len; ++i)
worker_kill ((ThreadPoolWorkingThread*) g_ptr_array_index (threadpool->working_threads, i));
/* unpark all threadpool->parked_threads */
- for (i = 0; i < threadpool->parked_threads->len; ++i)
- worker_unpark ((ThreadPoolParkedThread*) g_ptr_array_index (threadpool->parked_threads, i));
+ mono_coop_cond_broadcast (&threadpool->parked_threads_cond);
- mono_mutex_unlock (&threadpool->active_threads_lock);
+ mono_coop_mutex_unlock (&threadpool->active_threads_lock);
}
-void
-mono_threadpool_ms_enqueue_work_item (MonoDomain *domain, MonoObject *work_item)
+gboolean
+mono_threadpool_ms_enqueue_work_item (MonoDomain *domain, MonoObject *work_item, MonoError *error)
{
static MonoClass *threadpool_class = NULL;
static MonoMethod *unsafe_queue_custom_work_item_method = NULL;
MonoBoolean f;
gpointer args [2];
+ mono_error_init (error);
g_assert (work_item);
if (!threadpool_class)
- threadpool_class = mono_class_from_name (mono_defaults.corlib, "System.Threading", "ThreadPool");
- g_assert (threadpool_class);
+ threadpool_class = mono_class_load_from_name (mono_defaults.corlib, "System.Threading", "ThreadPool");
if (!unsafe_queue_custom_work_item_method)
unsafe_queue_custom_work_item_method = mono_class_get_method_from_name (threadpool_class, "UnsafeQueueCustomWorkItem", 2);
current_domain = mono_domain_get ();
if (current_domain == domain) {
- mono_runtime_invoke (unsafe_queue_custom_work_item_method, NULL, args, NULL);
+ mono_runtime_invoke_checked (unsafe_queue_custom_work_item_method, NULL, args, error);
+ return_val_if_nok (error, FALSE);
} else {
mono_thread_push_appdomain_ref (domain);
if (mono_domain_set (domain, FALSE)) {
- mono_runtime_invoke (unsafe_queue_custom_work_item_method, NULL, args, NULL);
+ mono_runtime_invoke_checked (unsafe_queue_custom_work_item_method, NULL, args, error);
+ if (!is_ok (error)) {
+ mono_thread_pop_appdomain_ref ();
+ return FALSE;
+ }
mono_domain_set (current_domain, TRUE);
}
mono_thread_pop_appdomain_ref ();
}
+ return TRUE;
}
+/* LOCKING: threadpool->domains_lock must be held */
static void
domain_add (ThreadPoolDomain *tpdomain)
{
g_assert (tpdomain);
- mono_mutex_lock (&threadpool->domains_lock);
len = threadpool->domains->len;
for (i = 0; i < len; ++i) {
if (g_ptr_array_index (threadpool->domains, i) == tpdomain)
break;
}
+
if (i == len)
g_ptr_array_add (threadpool->domains, tpdomain);
- mono_mutex_unlock (&threadpool->domains_lock);
}
+/* LOCKING: threadpool->domains_lock must be held */
static gboolean
domain_remove (ThreadPoolDomain *tpdomain)
{
- gboolean res;
-
g_assert (tpdomain);
-
- mono_mutex_lock (&threadpool->domains_lock);
- res = g_ptr_array_remove (threadpool->domains, tpdomain);
- mono_mutex_unlock (&threadpool->domains_lock);
-
- return res;
+ return g_ptr_array_remove (threadpool->domains, tpdomain);
}
+/* LOCKING: threadpool->domains_lock must be held */
static ThreadPoolDomain *
domain_get (MonoDomain *domain, gboolean create)
{
g_assert (domain);
- mono_mutex_lock (&threadpool->domains_lock);
for (i = 0; i < threadpool->domains->len; ++i) {
- ThreadPoolDomain *tmp = g_ptr_array_index (threadpool->domains, i);
- if (tmp->domain == domain) {
- tpdomain = tmp;
- break;
- }
+ tpdomain = (ThreadPoolDomain *)g_ptr_array_index (threadpool->domains, i);
+ if (tpdomain->domain == domain)
+ return tpdomain;
}
- if (!tpdomain && create) {
+
+ if (create) {
tpdomain = g_new0 (ThreadPoolDomain, 1);
tpdomain->domain = domain;
domain_add (tpdomain);
}
- mono_mutex_unlock (&threadpool->domains_lock);
+
return tpdomain;
}
g_free (tpdomain);
}
+/* LOCKING: threadpool->domains_lock must be held */
static gboolean
domain_any_has_request (void)
{
- gboolean res = FALSE;
guint i;
- mono_mutex_lock (&threadpool->domains_lock);
for (i = 0; i < threadpool->domains->len; ++i) {
- ThreadPoolDomain *tmp = g_ptr_array_index (threadpool->domains, i);
- if (tmp->outstanding_request > 0) {
- res = TRUE;
- break;
- }
+ ThreadPoolDomain *tmp = (ThreadPoolDomain *)g_ptr_array_index (threadpool->domains, i);
+ if (tmp->outstanding_request > 0)
+ return TRUE;
}
- mono_mutex_unlock (&threadpool->domains_lock);
- return res;
+
+ return FALSE;
}
+/* LOCKING: threadpool->domains_lock must be held */
static ThreadPoolDomain *
domain_get_next (ThreadPoolDomain *current)
{
ThreadPoolDomain *tpdomain = NULL;
guint len;
- mono_mutex_lock (&threadpool->domains_lock);
len = threadpool->domains->len;
if (len > 0) {
guint i, current_idx = -1;
g_assert (current_idx >= 0);
}
for (i = current_idx + 1; i < len + current_idx + 1; ++i) {
- ThreadPoolDomain *tmp = g_ptr_array_index (threadpool->domains, i % len);
+ ThreadPoolDomain *tmp = (ThreadPoolDomain *)g_ptr_array_index (threadpool->domains, i % len);
if (tmp->outstanding_request > 0) {
tpdomain = tmp;
break;
}
}
}
- mono_mutex_unlock (&threadpool->domains_lock);
+
return tpdomain;
}
static void
-worker_park (void)
+worker_wait_interrupt (gpointer data)
{
- mono_cond_t cond;
- MonoInternalThread *thread = mono_thread_internal_current ();
+ mono_coop_mutex_lock (&threadpool->active_threads_lock);
+ mono_coop_cond_signal (&threadpool->parked_threads_cond);
+ mono_coop_mutex_unlock (&threadpool->active_threads_lock);
+}
- mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] current worker parking", GetCurrentThreadId ());
+/* return TRUE if timeout, FALSE otherwise (worker unpark or interrupt) */
+static gboolean
+worker_park (void)
+{
+ gboolean timeout = FALSE;
- mono_cond_init (&cond, NULL);
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] current worker parking", mono_native_thread_id_get ());
mono_gc_set_skip_thread (TRUE);
- MONO_PREPARE_BLOCKING;
-
- mono_mutex_lock (&threadpool->active_threads_lock);
+ mono_coop_mutex_lock (&threadpool->active_threads_lock);
if (!mono_runtime_is_shutting_down ()) {
- g_ptr_array_add (threadpool->parked_threads, &cond);
- g_ptr_array_remove_fast (threadpool->working_threads, thread);
+ static gpointer rand_handle = NULL;
+ MonoInternalThread *thread_internal;
+ gboolean interrupted = FALSE;
- mono_cond_wait (&cond, &threadpool->active_threads_lock);
+ if (!rand_handle)
+ rand_handle = rand_create ();
+ g_assert (rand_handle);
- g_ptr_array_add (threadpool->working_threads, thread);
- g_ptr_array_remove (threadpool->parked_threads, &cond);
- }
+ thread_internal = mono_thread_internal_current ();
+ g_assert (thread_internal);
+
+ threadpool->parked_threads_count += 1;
+ g_ptr_array_remove_fast (threadpool->working_threads, thread_internal);
- mono_mutex_unlock (&threadpool->active_threads_lock);
+ mono_thread_info_install_interrupt (worker_wait_interrupt, NULL, &interrupted);
+ if (interrupted)
+ goto done;
- MONO_FINISH_BLOCKING;
+ if (mono_coop_cond_timedwait (&threadpool->parked_threads_cond, &threadpool->active_threads_lock, rand_next ((void **)rand_handle, 5 * 1000, 60 * 1000)) != 0)
+ timeout = TRUE;
+
+ mono_thread_info_uninstall_interrupt (&interrupted);
+
+done:
+ g_ptr_array_add (threadpool->working_threads, thread_internal);
+ threadpool->parked_threads_count -= 1;
+ }
+
+ mono_coop_mutex_unlock (&threadpool->active_threads_lock);
mono_gc_set_skip_thread (FALSE);
- mono_cond_destroy (&cond);
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] current worker unparking, timeout? %s", mono_native_thread_id_get (), timeout ? "yes" : "no");
- mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] current worker unparking", GetCurrentThreadId ());
+ return timeout;
}
static gboolean
worker_try_unpark (void)
{
gboolean res = FALSE;
- guint len;
- mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try unpark worker", GetCurrentThreadId ());
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try unpark worker", mono_native_thread_id_get ());
- mono_mutex_lock (&threadpool->active_threads_lock);
- len = threadpool->parked_threads->len;
- if (len > 0) {
- mono_cond_t *cond = (mono_cond_t*) g_ptr_array_index (threadpool->parked_threads, len - 1);
- mono_cond_signal (cond);
+ mono_coop_mutex_lock (&threadpool->active_threads_lock);
+ if (threadpool->parked_threads_count > 0) {
+ mono_coop_cond_signal (&threadpool->parked_threads_cond);
res = TRUE;
}
- mono_mutex_unlock (&threadpool->active_threads_lock);
+ mono_coop_mutex_unlock (&threadpool->active_threads_lock);
- mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try unpark worker, success? %s", GetCurrentThreadId (), res ? "yes" : "no");
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try unpark worker, success? %s", mono_native_thread_id_get (), res ? "yes" : "no");
return res;
}
-static void
-worker_unpark (ThreadPoolParkedThread *thread)
-{
- mono_cond_signal ((mono_cond_t*) thread);
-}
-
static void
worker_kill (ThreadPoolWorkingThread *thread)
{
static void
worker_thread (gpointer data)
{
+ MonoError error;
MonoInternalThread *thread;
ThreadPoolDomain *tpdomain, *previous_tpdomain;
ThreadPoolCounter counter;
gboolean retire = FALSE;
- mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] worker starting", GetCurrentThreadId ());
+ mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] worker starting", mono_native_thread_id_get ());
g_assert (threadpool);
mono_thread_set_name_internal (thread, mono_string_new (mono_domain_get (), "Threadpool worker"), FALSE);
- mono_mutex_lock (&threadpool->active_threads_lock);
+ mono_coop_mutex_lock (&threadpool->active_threads_lock);
g_ptr_array_add (threadpool->working_threads, thread);
- mono_mutex_unlock (&threadpool->active_threads_lock);
+ mono_coop_mutex_unlock (&threadpool->active_threads_lock);
previous_tpdomain = NULL;
- mono_mutex_lock (&threadpool->domains_lock);
+ mono_coop_mutex_lock (&threadpool->domains_lock);
while (!mono_runtime_is_shutting_down ()) {
tpdomain = NULL;
if ((thread->state & (ThreadState_StopRequested | ThreadState_SuspendRequested)) != 0) {
- mono_mutex_unlock (&threadpool->domains_lock);
+ mono_coop_mutex_unlock (&threadpool->domains_lock);
mono_thread_interruption_checkpoint ();
- mono_mutex_lock (&threadpool->domains_lock);
+ mono_coop_mutex_lock (&threadpool->domains_lock);
}
if (retire || !(tpdomain = domain_get_next (previous_tpdomain))) {
+ gboolean timeout;
+
COUNTER_ATOMIC (counter, {
counter._.working --;
counter._.parked ++;
});
- mono_mutex_unlock (&threadpool->domains_lock);
- worker_park ();
- mono_mutex_lock (&threadpool->domains_lock);
+ mono_coop_mutex_unlock (&threadpool->domains_lock);
+ timeout = worker_park ();
+ mono_coop_mutex_lock (&threadpool->domains_lock);
COUNTER_ATOMIC (counter, {
counter._.working ++;
counter._.parked --;
});
+ if (timeout)
+ break;
+
if (retire)
retire = FALSE;
g_assert (tpdomain->outstanding_request >= 0);
mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker running in domain %p",
- GetCurrentThreadId (), tpdomain->domain, tpdomain->outstanding_request);
+ mono_native_thread_id_get (), tpdomain->domain, tpdomain->outstanding_request);
g_assert (tpdomain->domain);
g_assert (tpdomain->domain->threadpool_jobs >= 0);
tpdomain->domain->threadpool_jobs ++;
- mono_mutex_unlock (&threadpool->domains_lock);
+ mono_coop_mutex_unlock (&threadpool->domains_lock);
mono_thread_push_appdomain_ref (tpdomain->domain);
if (mono_domain_set (tpdomain->domain, FALSE)) {
- MonoObject *exc = NULL;
- MonoObject *res = mono_runtime_invoke (mono_defaults.threadpool_perform_wait_callback_method, NULL, NULL, &exc);
- if (exc)
+ MonoObject *exc = NULL, *res;
+
+ res = mono_runtime_try_invoke (mono_defaults.threadpool_perform_wait_callback_method, NULL, NULL, &exc, &error);
+ if (exc || !mono_error_ok(&error)) {
+ if (exc == NULL)
+ exc = (MonoObject *) mono_error_convert_to_exception (&error);
+ else
+ mono_error_cleanup (&error);
mono_thread_internal_unhandled_exception (exc);
- else if (res && *(MonoBoolean*) mono_object_unbox (res) == FALSE)
+ } else if (res && *(MonoBoolean*) mono_object_unbox (res) == FALSE)
retire = TRUE;
- mono_thread_clr_state (thread , ~ThreadState_Background);
+ mono_thread_clr_state (thread, (MonoThreadState)~ThreadState_Background);
if (!mono_thread_test_state (thread , ThreadState_Background))
ves_icall_System_Threading_Thread_SetState (thread, ThreadState_Background);
}
mono_thread_pop_appdomain_ref ();
- mono_mutex_lock (&threadpool->domains_lock);
+ mono_coop_mutex_lock (&threadpool->domains_lock);
tpdomain->domain->threadpool_jobs --;
g_assert (tpdomain->domain->threadpool_jobs >= 0);
previous_tpdomain = tpdomain;
}
- mono_mutex_unlock (&threadpool->domains_lock);
+ mono_coop_mutex_unlock (&threadpool->domains_lock);
- mono_mutex_lock (&threadpool->active_threads_lock);
+ mono_coop_mutex_lock (&threadpool->active_threads_lock);
g_ptr_array_remove_fast (threadpool->working_threads, thread);
- mono_mutex_unlock (&threadpool->active_threads_lock);
+ mono_coop_mutex_unlock (&threadpool->active_threads_lock);
COUNTER_ATOMIC (counter, {
counter._.working--;
counter._.active --;
});
- mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] worker finishing", GetCurrentThreadId ());
+ mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] worker finishing", mono_native_thread_id_get ());
}
static gboolean
{
ThreadPoolCounter counter;
MonoInternalThread *thread;
+ gint32 now;
+
+ mono_coop_mutex_lock (&threadpool->worker_creation_lock);
+
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker", mono_native_thread_id_get ());
- mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker", GetCurrentThreadId ());
+ if ((now = mono_100ns_ticks () / 10 / 1000 / 1000) == 0) {
+ g_warning ("failed to get 100ns ticks");
+ } else {
+ if (threadpool->worker_creation_current_second != now) {
+ threadpool->worker_creation_current_second = now;
+ threadpool->worker_creation_current_count = 0;
+ } else {
+ g_assert (threadpool->worker_creation_current_count <= WORKER_CREATION_MAX_PER_SEC);
+ if (threadpool->worker_creation_current_count == WORKER_CREATION_MAX_PER_SEC) {
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, failed: maximum number of worker created per second reached, current count = %d",
+ mono_native_thread_id_get (), threadpool->worker_creation_current_count);
+ mono_coop_mutex_unlock (&threadpool->worker_creation_lock);
+ return FALSE;
+ }
+ }
+ }
COUNTER_ATOMIC (counter, {
- if (counter._.working >= counter._.max_working)
+ if (counter._.working >= counter._.max_working) {
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, failed: maximum number of working threads reached",
+ mono_native_thread_id_get ());
+ mono_coop_mutex_unlock (&threadpool->worker_creation_lock);
return FALSE;
+ }
counter._.working ++;
counter._.active ++;
});
if ((thread = mono_thread_create_internal (mono_get_root_domain (), worker_thread, NULL, TRUE, 0)) != NULL) {
- mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, created %p",
- GetCurrentThreadId (), thread->tid);
+ threadpool->worker_creation_current_count += 1;
+
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, created %p, now = %d count = %d", mono_native_thread_id_get (), thread->tid, now, threadpool->worker_creation_current_count);
+ mono_coop_mutex_unlock (&threadpool->worker_creation_lock);
return TRUE;
}
- mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, failed", GetCurrentThreadId ());
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, failed: could not create thread", mono_native_thread_id_get ());
COUNTER_ATOMIC (counter, {
counter._.working --;
counter._.active --;
});
+ mono_coop_mutex_unlock (&threadpool->worker_creation_lock);
return FALSE;
}
if (mono_runtime_is_shutting_down ())
return FALSE;
- mono_mutex_lock (&threadpool->domains_lock);
+ mono_coop_mutex_lock (&threadpool->domains_lock);
/* synchronize check with worker_thread */
if (mono_domain_is_unloading (domain)) {
- mono_mutex_unlock (&threadpool->domains_lock);
+ mono_coop_mutex_unlock (&threadpool->domains_lock);
return FALSE;
}
tpdomain->outstanding_request ++;
mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, domain = %p, outstanding_request = %d",
- GetCurrentThreadId (), tpdomain->domain, tpdomain->outstanding_request);
+ mono_native_thread_id_get (), tpdomain->domain, tpdomain->outstanding_request);
- mono_mutex_unlock (&threadpool->domains_lock);
+ mono_coop_mutex_unlock (&threadpool->domains_lock);
if (threadpool->suspended)
return FALSE;
monitor_ensure_running ();
if (worker_try_unpark ()) {
- mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, unparked", GetCurrentThreadId ());
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, unparked", mono_native_thread_id_get ());
return TRUE;
}
if (worker_try_create ()) {
- mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, created", GetCurrentThreadId ());
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, created", mono_native_thread_id_get ());
return TRUE;
}
- mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, failed", GetCurrentThreadId ());
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, failed", mono_native_thread_id_get ());
return FALSE;
}
if (mono_runtime_is_shutting_down ()) {
should_keep_running = FALSE;
} else {
+ mono_coop_mutex_lock (&threadpool->domains_lock);
if (!domain_any_has_request ())
should_keep_running = FALSE;
+ mono_coop_mutex_unlock (&threadpool->domains_lock);
if (!should_keep_running) {
if (last_should_keep_running == -1 || mono_100ns_ticks () - last_should_keep_running < MONITOR_MINIMAL_LIFETIME * 1000 * 10) {
mono_cpu_usage (threadpool->cpu_usage_state);
- mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, started", GetCurrentThreadId ());
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, started", mono_native_thread_id_get ());
do {
- MonoInternalThread *thread;
- gboolean all_waitsleepjoin = TRUE;
+ ThreadPoolCounter counter;
+ gboolean limit_worker_max_reached;
gint32 interval_left = MONITOR_INTERVAL;
gint32 awake = 0; /* number of spurious awakes we tolerate before doing a round of rebalancing */
do {
guint32 ts;
+ gboolean alerted = FALSE;
if (mono_runtime_is_shutting_down ())
break;
ts = mono_msec_ticks ();
- if (SleepEx (interval_left, TRUE) == 0)
+ if (mono_thread_info_sleep (interval_left, &alerted) == 0)
break;
interval_left -= mono_msec_ticks () - ts;
if (threadpool->suspended)
continue;
- if (mono_runtime_is_shutting_down () || !domain_any_has_request ())
+ if (mono_runtime_is_shutting_down ())
continue;
- mono_mutex_lock (&threadpool->active_threads_lock);
- for (i = 0; i < threadpool->working_threads->len; ++i) {
- thread = g_ptr_array_index (threadpool->working_threads, i);
- if ((thread->state & ThreadState_WaitSleepJoin) == 0) {
- all_waitsleepjoin = FALSE;
+ mono_coop_mutex_lock (&threadpool->domains_lock);
+ if (!domain_any_has_request ()) {
+ mono_coop_mutex_unlock (&threadpool->domains_lock);
+ continue;
+ }
+ mono_coop_mutex_unlock (&threadpool->domains_lock);
+
+ threadpool->cpu_usage = mono_cpu_usage (threadpool->cpu_usage_state);
+
+ if (!monitor_sufficient_delay_since_last_dequeue ())
+ continue;
+
+ limit_worker_max_reached = FALSE;
+
+ COUNTER_ATOMIC (counter, {
+ if (counter._.max_working >= threadpool->limit_worker_max) {
+ limit_worker_max_reached = TRUE;
break;
}
- }
- mono_mutex_unlock (&threadpool->active_threads_lock);
+ counter._.max_working ++;
+ });
- if (all_waitsleepjoin) {
- ThreadPoolCounter counter;
- COUNTER_ATOMIC (counter, { counter._.max_working ++; });
- hill_climbing_force_change (counter._.max_working, TRANSITION_STARVATION);
- }
+ if (limit_worker_max_reached)
+ continue;
- threadpool->cpu_usage = mono_cpu_usage (threadpool->cpu_usage_state);
+ hill_climbing_force_change (counter._.max_working, TRANSITION_STARVATION);
- if (monitor_sufficient_delay_since_last_dequeue ()) {
- for (i = 0; i < 5; ++i) {
- if (mono_runtime_is_shutting_down ())
- break;
+ for (i = 0; i < 5; ++i) {
+ if (mono_runtime_is_shutting_down ())
+ break;
- if (worker_try_unpark ()) {
- mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, unparked", GetCurrentThreadId ());
- break;
- }
+ if (worker_try_unpark ()) {
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, unparked", mono_native_thread_id_get ());
+ break;
+ }
- if (worker_try_create ()) {
- mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, created", GetCurrentThreadId ());
- break;
- }
+ if (worker_try_create ()) {
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, created", mono_native_thread_id_get ());
+ break;
}
}
} while (monitor_should_keep_running ());
- mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, finished", GetCurrentThreadId ());
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, finished", mono_native_thread_id_get ());
}
static void
hc = &threadpool->heuristic_hill_climbing;
- mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] hill climbing, change max number of threads %d", GetCurrentThreadId (), new_thread_count);
+ mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] hill climbing, change max number of threads %d", mono_native_thread_id_get (), new_thread_count);
hc->last_thread_count = new_thread_count;
hc->current_sample_interval = rand_next (&hc->random_interval_generator, hc->sample_interval_low, hc->sample_interval_high);
{
g_assert (threadpool);
- if (mono_mutex_trylock (&threadpool->heuristic_lock) == 0) {
+ if (mono_coop_mutex_trylock (&threadpool->heuristic_lock) == 0) {
gint32 completions = InterlockedExchange (&threadpool->heuristic_completions, 0);
guint32 sample_end = mono_msec_ticks ();
guint32 sample_duration = sample_end - threadpool->heuristic_sample_start;
threadpool->heuristic_last_adjustment = mono_msec_ticks ();
}
- mono_mutex_unlock (&threadpool->heuristic_lock);
+ mono_coop_mutex_unlock (&threadpool->heuristic_lock);
}
}
}
MonoAsyncResult *
-mono_threadpool_ms_begin_invoke (MonoDomain *domain, MonoObject *target, MonoMethod *method, gpointer *params)
+mono_threadpool_ms_begin_invoke (MonoDomain *domain, MonoObject *target, MonoMethod *method, gpointer *params, MonoError *error)
{
static MonoClass *async_call_klass = NULL;
MonoMethodMessage *message;
MonoObject *state = NULL;
if (!async_call_klass)
- async_call_klass = mono_class_from_name (mono_defaults.corlib, "System", "MonoAsyncCall");
- g_assert (async_call_klass);
+ async_call_klass = mono_class_load_from_name (mono_defaults.corlib, "System", "MonoAsyncCall");
mono_lazy_initialize (&status, initialize);
+ mono_error_init (error);
+
message = mono_method_call_message_new (method, params, mono_get_delegate_invoke (method->klass), (params != NULL) ? (&async_callback) : NULL, (params != NULL) ? (&state) : NULL);
- async_call = (MonoAsyncCall*) mono_object_new (domain, async_call_klass);
+ async_call = (MonoAsyncCall*) mono_object_new_checked (domain, async_call_klass, error);
+ return_val_if_nok (error, NULL);
+
MONO_OBJECT_SETREF (async_call, msg, message);
MONO_OBJECT_SETREF (async_call, state, state);
async_result = mono_async_result_new (domain, NULL, async_call->state, NULL, (MonoObject*) async_call);
MONO_OBJECT_SETREF (async_result, async_delegate, target);
-#ifndef DISABLE_SOCKETS
- if (mono_threadpool_ms_is_io (target, state))
- return mono_threadpool_ms_io_add (async_result, (MonoSocketAsyncResult*) state);
-#endif
-
- mono_threadpool_ms_enqueue_work_item (domain, (MonoObject*) async_result);
+ mono_threadpool_ms_enqueue_work_item (domain, (MonoObject*) async_result, error);
+ return_val_if_nok (error, NULL);
return async_result;
}
void
ves_icall_System_Threading_ThreadPool_GetAvailableThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads)
{
+ ThreadPoolCounter counter;
+
if (!worker_threads || !completion_port_threads)
return;
mono_lazy_initialize (&status, initialize);
- *worker_threads = threadpool->limit_worker_max;
+ counter.as_gint64 = COUNTER_READ ();
+
+ *worker_threads = MAX (0, threadpool->limit_worker_max - counter._.active);
*completion_port_threads = threadpool->limit_io_max;
}
if (completion_port_threads <= 0 || completion_port_threads > threadpool->limit_io_max)
return FALSE;
- threadpool->limit_worker_max = worker_threads;
- threadpool->limit_io_max = completion_port_threads;
+ threadpool->limit_worker_min = worker_threads;
+ threadpool->limit_io_min = completion_port_threads;
return TRUE;
}
ves_icall_System_Threading_ThreadPool_ReportThreadStatus (MonoBoolean is_working)
{
// TODO
- mono_raise_exception (mono_get_exception_not_implemented (NULL));
+ MonoError error;
+ mono_error_set_not_implemented (&error, "");
+ mono_error_set_pending_exception (&error);
}
MonoBoolean
ves_icall_System_Threading_ThreadPool_PostQueuedCompletionStatus (MonoNativeOverlapped *native_overlapped)
{
/* This copy the behavior of the current Mono implementation */
- mono_raise_exception (mono_get_exception_not_implemented (NULL));
+ MonoError error;
+ mono_error_set_not_implemented (&error, "");
+ mono_error_set_pending_exception (&error);
return FALSE;
}