From ad4eb74c6f30c210a4a67ef512a9b2b1d6ba9cca Mon Sep 17 00:00:00 2001 From: Ludovic Henry Date: Wed, 8 Feb 2017 11:35:28 -0500 Subject: [PATCH] [threadpool] Make ThreadPoolWorker staticaly allocated --- mono/metadata/threadpool-worker-default.c | 496 +++++++++++----------- mono/metadata/threadpool-worker.h | 20 +- mono/metadata/threadpool.c | 29 +- 3 files changed, 262 insertions(+), 283 deletions(-) diff --git a/mono/metadata/threadpool-worker-default.c b/mono/metadata/threadpool-worker-default.c index 6e3a4b52130..a0cdd9a2170 100644 --- a/mono/metadata/threadpool-worker-default.c +++ b/mono/metadata/threadpool-worker-default.c @@ -25,7 +25,6 @@ #include #include #include -#include #include #include #include @@ -127,7 +126,7 @@ typedef union { typedef MonoInternalThread ThreadPoolWorkerThread; -struct MonoThreadPoolWorker { +typedef struct { MonoRefCount ref; ThreadPoolWorkerCounter counters; @@ -165,7 +164,7 @@ struct MonoThreadPoolWorker { gboolean suspended; gint32 monitor_status; -}; +} ThreadPoolWorker; enum { MONITOR_STATUS_REQUESTED, @@ -173,6 +172,8 @@ enum { MONITOR_STATUS_NOT_RUNNING, }; +static ThreadPoolWorker worker; + #define COUNTER_CHECK(counter) \ do { \ g_assert (counter._.max_working > 0); \ @@ -180,23 +181,22 @@ enum { g_assert (counter._.working >= 0); \ } while (0) -#define COUNTER_ATOMIC(worker,var,block) \ +#define COUNTER_ATOMIC(var,block) \ do { \ ThreadPoolWorkerCounter __old; \ do { \ - g_assert (worker); \ - __old = COUNTER_READ (worker); \ + __old = COUNTER_READ (); \ (var) = __old; \ { block; } \ COUNTER_CHECK (var); \ - } while (InterlockedCompareExchange64 (&worker->counters.as_gint64, (var).as_gint64, __old.as_gint64) != __old.as_gint64); \ + } while (InterlockedCompareExchange64 (&worker.counters.as_gint64, (var).as_gint64, __old.as_gint64) != __old.as_gint64); \ } while (0) static inline ThreadPoolWorkerCounter -COUNTER_READ (MonoThreadPoolWorker *worker) +COUNTER_READ (void) { ThreadPoolWorkerCounter counter; - counter.as_gint64 = InterlockedRead64 (&worker->counters.as_gint64); + counter.as_gint64 = InterlockedRead64 (&worker.counters.as_gint64); return counter; } @@ -221,49 +221,48 @@ rand_next (gpointer *handle, guint32 min, guint32 max) static void destroy (gpointer data) { - MonoThreadPoolWorker *worker; +#if 0 + mono_coop_mutex_destroy (&worker.threads_lock); + mono_coop_cond_destroy (&worker.parked_threads_cond); + + mono_coop_mutex_destroy (&worker.work_items_lock); - worker = (MonoThreadPoolWorker*) data; - g_assert (worker); + mono_coop_mutex_destroy (&worker.worker_creation_lock); - // FIXME destroy everything + mono_coop_mutex_destroy (&worker.heuristic_lock); - g_free (worker); + g_free (worker.cpu_usage_state); +#endif } void -mono_threadpool_worker_init (MonoThreadPoolWorker **worker) +mono_threadpool_worker_init (void) { - MonoThreadPoolWorker *wk; ThreadPoolHillClimbing *hc; const char *threads_per_cpu_env; gint threads_per_cpu; gint threads_count; - g_assert (worker); - - wk = *worker = g_new0 (MonoThreadPoolWorker, 1); - - mono_refcount_init (wk, destroy); + mono_refcount_init (&worker, destroy); - wk->threads = g_ptr_array_new (); - mono_coop_mutex_init (&wk->threads_lock); - wk->parked_threads_count = 0; - mono_coop_cond_init (&wk->parked_threads_cond); - mono_coop_cond_init (&wk->threads_exit_cond); + worker.threads = g_ptr_array_new (); + mono_coop_mutex_init (&worker.threads_lock); + worker.parked_threads_count = 0; + mono_coop_cond_init (&worker.parked_threads_cond); + mono_coop_cond_init (&worker.threads_exit_cond); - /* wk->work_items_size is inited to 0 */ - mono_coop_mutex_init (&wk->work_items_lock); + /* worker.work_items_size is inited to 0 */ + mono_coop_mutex_init (&worker.work_items_lock); - wk->worker_creation_current_second = -1; - mono_coop_mutex_init (&wk->worker_creation_lock); + worker.worker_creation_current_second = -1; + mono_coop_mutex_init (&worker.worker_creation_lock); - wk->heuristic_adjustment_interval = 10; - mono_coop_mutex_init (&wk->heuristic_lock); + worker.heuristic_adjustment_interval = 10; + mono_coop_mutex_init (&worker.heuristic_lock); mono_rand_open (); - hc = &wk->heuristic_hill_climbing; + hc = &worker.heuristic_hill_climbing; hc->wave_period = HILL_CLIMBING_WAVE_PERIOD; hc->max_thread_wave_magnitude = HILL_CLIMBING_MAX_WAVE_MAGNITUDE; @@ -297,25 +296,25 @@ mono_threadpool_worker_init (MonoThreadPoolWorker **worker) threads_count = mono_cpu_count () * threads_per_cpu; - wk->limit_worker_min = threads_count; + worker.limit_worker_min = threads_count; #if defined (PLATFORM_ANDROID) || defined (HOST_IOS) - wk->limit_worker_max = CLAMP (threads_count * 100, MIN (threads_count, 200), MAX (threads_count, 200)); + worker.limit_worker_max = CLAMP (threads_count * 100, MIN (threads_count, 200), MAX (threads_count, 200)); #else - wk->limit_worker_max = threads_count * 100; + worker.limit_worker_max = threads_count * 100; #endif - wk->counters._.max_working = wk->limit_worker_min; + worker.counters._.max_working = worker.limit_worker_min; - wk->cpu_usage_state = g_new0 (MonoCpuUsageState, 1); + worker.cpu_usage_state = g_new0 (MonoCpuUsageState, 1); - wk->suspended = FALSE; + worker.suspended = FALSE; - wk->monitor_status = MONITOR_STATUS_NOT_RUNNING; + worker.monitor_status = MONITOR_STATUS_NOT_RUNNING; } void -mono_threadpool_worker_cleanup (MonoThreadPoolWorker *worker) +mono_threadpool_worker_cleanup (void) { MonoInternalThread *current; @@ -325,154 +324,147 @@ mono_threadpool_worker_cleanup (MonoThreadPoolWorker *worker) current = mono_thread_internal_current (); - while (worker->monitor_status != MONITOR_STATUS_NOT_RUNNING) + while (worker.monitor_status != MONITOR_STATUS_NOT_RUNNING) mono_thread_info_sleep (1, NULL); - mono_coop_mutex_lock (&worker->threads_lock); + mono_coop_mutex_lock (&worker.threads_lock); - /* unpark all worker->parked_threads */ - mono_coop_cond_broadcast (&worker->parked_threads_cond); + /* unpark all worker.parked_threads */ + mono_coop_cond_broadcast (&worker.parked_threads_cond); #if 0 for (;;) { ThreadPoolWorkerCounter counter; - counter = COUNTER_READ (worker); + counter = COUNTER_READ (); if (counter._.starting + counter._.working + counter._.parked == 0) break; if (counter._.starting + counter._.working + counter._.parked == 1) { - if (worker->threads->len == 1 && g_ptr_array_index (worker->threads, 0) == current) { + if (worker.threads->len == 1 && g_ptr_array_index (worker.threads, 0) == current) { /* We are waiting on ourselves */ break; } } - mono_coop_cond_wait (&worker->threads_exit_cond, &worker->threads_lock); + mono_coop_cond_wait (&worker.threads_exit_cond, &worker.threads_lock); } #endif - mono_coop_mutex_unlock (&worker->threads_lock); + mono_coop_mutex_unlock (&worker.threads_lock); - mono_refcount_dec (worker); + mono_refcount_dec (&worker); } static void -work_item_lock (MonoThreadPoolWorker *worker) +work_item_lock (void) { - mono_coop_mutex_lock (&worker->work_items_lock); + mono_coop_mutex_lock (&worker.work_items_lock); } static void -work_item_unlock (MonoThreadPoolWorker *worker) +work_item_unlock (void) { - mono_coop_mutex_unlock (&worker->work_items_lock); + mono_coop_mutex_unlock (&worker.work_items_lock); } static void -work_item_push (MonoThreadPoolWorker *worker, MonoThreadPoolWorkerCallback callback, gpointer data) +work_item_push (MonoThreadPoolWorkerCallback callback, gpointer data) { ThreadPoolWorkItem work_item; - g_assert (worker); g_assert (callback); work_item.callback = callback; work_item.data = data; - work_item_lock (worker); + work_item_lock (); - g_assert (worker->work_items_count <= worker->work_items_size); + g_assert (worker.work_items_count <= worker.work_items_size); - if (G_UNLIKELY (worker->work_items_count == worker->work_items_size)) { - worker->work_items_size += 64; - worker->work_items = g_renew (ThreadPoolWorkItem, worker->work_items, worker->work_items_size); + if (G_UNLIKELY (worker.work_items_count == worker.work_items_size)) { + worker.work_items_size += 64; + worker.work_items = g_renew (ThreadPoolWorkItem, worker.work_items, worker.work_items_size); } - g_assert (worker->work_items); + g_assert (worker.work_items); - worker->work_items [worker->work_items_count ++] = work_item; + worker.work_items [worker.work_items_count ++] = work_item; - // printf ("[push] worker->work_items = %p, worker->work_items_count = %d, worker->work_items_size = %d\n", - // worker->work_items, worker->work_items_count, worker->work_items_size); + // printf ("[push] worker.work_items = %p, worker.work_items_count = %d, worker.work_items_size = %d\n", + // worker.work_items, worker.work_items_count, worker.work_items_size); - work_item_unlock (worker); + work_item_unlock (); } static gboolean -work_item_try_pop (MonoThreadPoolWorker *worker, ThreadPoolWorkItem *work_item) +work_item_try_pop (ThreadPoolWorkItem *work_item) { - g_assert (worker); g_assert (work_item); - work_item_lock (worker); + work_item_lock (); - // printf ("[pop] worker->work_items = %p, worker->work_items_count = %d, worker->work_items_size = %d\n", - // worker->work_items, worker->work_items_count, worker->work_items_size); + // printf ("[pop] worker.work_items = %p, worker.work_items_count = %d, worker.work_items_size = %d\n", + // worker.work_items, worker.work_items_count, worker.work_items_size); - if (worker->work_items_count == 0) { - work_item_unlock (worker); + if (worker.work_items_count == 0) { + work_item_unlock (); return FALSE; } - *work_item = worker->work_items [-- worker->work_items_count]; + *work_item = worker.work_items [-- worker.work_items_count]; - if (G_UNLIKELY (worker->work_items_count >= 64 * 3 && worker->work_items_count < worker->work_items_size / 2)) { - worker->work_items_size -= 64; - worker->work_items = g_renew (ThreadPoolWorkItem, worker->work_items, worker->work_items_size); + if (G_UNLIKELY (worker.work_items_count >= 64 * 3 && worker.work_items_count < worker.work_items_size / 2)) { + worker.work_items_size -= 64; + worker.work_items = g_renew (ThreadPoolWorkItem, worker.work_items, worker.work_items_size); } - work_item_unlock (worker); + work_item_unlock (); return TRUE; } static gint32 -work_item_count (MonoThreadPoolWorker *worker) +work_item_count (void) { gint32 count; - work_item_lock (worker); - count = worker->work_items_count; - work_item_unlock (worker); + work_item_lock (); + count = worker.work_items_count; + work_item_unlock (); return count; } -static void worker_request (MonoThreadPoolWorker *worker); +static void worker_request (void); void -mono_threadpool_worker_enqueue (MonoThreadPoolWorker *worker, MonoThreadPoolWorkerCallback callback, gpointer data) +mono_threadpool_worker_enqueue (MonoThreadPoolWorkerCallback callback, gpointer data) { - work_item_push (worker, callback, data); + work_item_push (callback, data); - worker_request (worker); + worker_request (); } static void -worker_wait_interrupt (gpointer data) +worker_wait_interrupt (gpointer unused) { - MonoThreadPoolWorker *worker; - - worker = (MonoThreadPoolWorker*) data; - g_assert (worker); - - mono_coop_mutex_lock (&worker->threads_lock); - mono_coop_cond_signal (&worker->parked_threads_cond); - mono_coop_mutex_unlock (&worker->threads_lock); + mono_coop_mutex_lock (&worker.threads_lock); + mono_coop_cond_signal (&worker.parked_threads_cond); + mono_coop_mutex_unlock (&worker.threads_lock); - mono_refcount_dec (worker); + mono_refcount_dec (&worker); } /* return TRUE if timeout, FALSE otherwise (worker unpark or interrupt) */ static gboolean -worker_park (MonoThreadPoolWorker *worker) +worker_park (void) { gboolean timeout = FALSE; mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker parking", mono_native_thread_id_get ()); - mono_coop_mutex_lock (&worker->threads_lock); + mono_coop_mutex_lock (&worker.threads_lock); if (!mono_runtime_is_shutting_down ()) { static gpointer rand_handle = NULL; @@ -487,36 +479,37 @@ worker_park (MonoThreadPoolWorker *worker) thread = mono_thread_internal_current (); g_assert (thread); - COUNTER_ATOMIC (worker, counter, { + COUNTER_ATOMIC (counter, { counter._.working --; counter._.parked ++; }); - worker->parked_threads_count += 1; + worker.parked_threads_count += 1; - mono_thread_info_install_interrupt (worker_wait_interrupt, mono_refcount_inc (worker), &interrupted); + mono_refcount_inc (&worker); + mono_thread_info_install_interrupt (worker_wait_interrupt, NULL, &interrupted); if (interrupted) { - mono_refcount_dec (worker); + mono_refcount_dec (&worker); goto done; } - if (mono_coop_cond_timedwait (&worker->parked_threads_cond, &worker->threads_lock, rand_next (&rand_handle, 5 * 1000, 60 * 1000)) != 0) + if (mono_coop_cond_timedwait (&worker.parked_threads_cond, &worker.threads_lock, rand_next (&rand_handle, 5 * 1000, 60 * 1000)) != 0) timeout = TRUE; mono_thread_info_uninstall_interrupt (&interrupted); if (!interrupted) - mono_refcount_dec (worker); + mono_refcount_dec (&worker); done: - worker->parked_threads_count -= 1; + worker.parked_threads_count -= 1; - COUNTER_ATOMIC (worker, counter, { + COUNTER_ATOMIC (counter, { counter._.working ++; counter._.parked --; }); } - mono_coop_mutex_unlock (&worker->threads_lock); + mono_coop_mutex_unlock (&worker.threads_lock); mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker unparking, timeout? %s", mono_native_thread_id_get (), timeout ? "yes" : "no"); @@ -524,18 +517,18 @@ done: } static gboolean -worker_try_unpark (MonoThreadPoolWorker *worker) +worker_try_unpark (void) { gboolean res = FALSE; mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try unpark worker", mono_native_thread_id_get ()); - mono_coop_mutex_lock (&worker->threads_lock); - if (worker->parked_threads_count > 0) { - mono_coop_cond_signal (&worker->parked_threads_cond); + mono_coop_mutex_lock (&worker.threads_lock); + if (worker.parked_threads_count > 0) { + mono_coop_cond_signal (&worker.parked_threads_cond); res = TRUE; } - mono_coop_mutex_unlock (&worker->threads_lock); + mono_coop_mutex_unlock (&worker.threads_lock); mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try unpark worker, success? %s", mono_native_thread_id_get (), res ? "yes" : "no"); @@ -543,18 +536,14 @@ worker_try_unpark (MonoThreadPoolWorker *worker) } static gsize WINAPI -worker_thread (gpointer data) +worker_thread (gpointer unused) { - MonoThreadPoolWorker *worker; MonoInternalThread *thread; ThreadPoolWorkerCounter counter; mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] worker starting", mono_native_thread_id_get ()); - worker = (MonoThreadPoolWorker*) data; - g_assert (worker); - - COUNTER_ATOMIC (worker, counter, { + COUNTER_ATOMIC (counter, { counter._.starting --; counter._.working ++; }); @@ -562,9 +551,9 @@ worker_thread (gpointer data) thread = mono_thread_internal_current (); g_assert (thread); - mono_coop_mutex_lock (&worker->threads_lock); - g_ptr_array_add (worker->threads, thread); - mono_coop_mutex_unlock (&worker->threads_lock); + mono_coop_mutex_lock (&worker.threads_lock); + g_ptr_array_add (worker.threads, thread); + mono_coop_mutex_unlock (&worker.threads_lock); while (!mono_runtime_is_shutting_down ()) { ThreadPoolWorkItem work_item; @@ -572,10 +561,10 @@ worker_thread (gpointer data) if (mono_thread_interruption_checkpoint ()) continue; - if (!work_item_try_pop (worker, &work_item)) { + if (!work_item_try_pop (&work_item)) { gboolean timeout; - timeout = worker_park (worker); + timeout = worker_park (); if (timeout) break; @@ -588,27 +577,27 @@ worker_thread (gpointer data) work_item.callback (work_item.data); } - mono_coop_mutex_lock (&worker->threads_lock); + mono_coop_mutex_lock (&worker.threads_lock); - COUNTER_ATOMIC (worker, counter, { + COUNTER_ATOMIC (counter, { counter._.working --; }); - g_ptr_array_remove (worker->threads, thread); + g_ptr_array_remove (worker.threads, thread); - mono_coop_cond_signal (&worker->threads_exit_cond); + mono_coop_cond_signal (&worker.threads_exit_cond); - mono_coop_mutex_unlock (&worker->threads_lock); + mono_coop_mutex_unlock (&worker.threads_lock); mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] worker finishing", mono_native_thread_id_get ()); - mono_refcount_dec (worker); + mono_refcount_dec (&worker); return 0; } static gboolean -worker_try_create (MonoThreadPoolWorker *worker) +worker_try_create (void) { MonoError error; MonoInternalThread *thread; @@ -619,7 +608,7 @@ worker_try_create (MonoThreadPoolWorker *worker) if (mono_runtime_is_shutting_down ()) return FALSE; - mono_coop_mutex_lock (&worker->worker_creation_lock); + mono_coop_mutex_lock (&worker.worker_creation_lock); mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker", mono_native_thread_id_get ()); @@ -628,73 +617,72 @@ worker_try_create (MonoThreadPoolWorker *worker) g_warning ("failed to get 100ns ticks"); } else { now = current_ticks / (10 * 1000 * 1000); - if (worker->worker_creation_current_second != now) { - worker->worker_creation_current_second = now; - worker->worker_creation_current_count = 0; + if (worker.worker_creation_current_second != now) { + worker.worker_creation_current_second = now; + worker.worker_creation_current_count = 0; } else { - g_assert (worker->worker_creation_current_count <= WORKER_CREATION_MAX_PER_SEC); - if (worker->worker_creation_current_count == WORKER_CREATION_MAX_PER_SEC) { + g_assert (worker.worker_creation_current_count <= WORKER_CREATION_MAX_PER_SEC); + if (worker.worker_creation_current_count == WORKER_CREATION_MAX_PER_SEC) { mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, failed: maximum number of worker created per second reached, current count = %d", - mono_native_thread_id_get (), worker->worker_creation_current_count); - mono_coop_mutex_unlock (&worker->worker_creation_lock); + mono_native_thread_id_get (), worker.worker_creation_current_count); + mono_coop_mutex_unlock (&worker.worker_creation_lock); return FALSE; } } } - COUNTER_ATOMIC (worker, counter, { + COUNTER_ATOMIC (counter, { if (counter._.working >= counter._.max_working) { mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, failed: maximum number of working threads reached", mono_native_thread_id_get ()); - mono_coop_mutex_unlock (&worker->worker_creation_lock); + mono_coop_mutex_unlock (&worker.worker_creation_lock); return FALSE; } counter._.starting ++; }); - thread = mono_thread_create_internal (mono_get_root_domain (), worker_thread, mono_refcount_inc (worker), TRUE, 0, &error); + mono_refcount_inc (&worker); + thread = mono_thread_create_internal (mono_get_root_domain (), worker_thread, NULL, TRUE, 0, &error); if (!thread) { mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, failed: could not create thread due to %s", mono_native_thread_id_get (), mono_error_get_message (&error)); mono_error_cleanup (&error); - COUNTER_ATOMIC (worker, counter, { + COUNTER_ATOMIC (counter, { counter._.starting --; }); - mono_coop_mutex_unlock (&worker->worker_creation_lock); + mono_coop_mutex_unlock (&worker.worker_creation_lock); - mono_refcount_dec (worker); + mono_refcount_dec (&worker); return FALSE; } - worker->worker_creation_current_count += 1; + worker.worker_creation_current_count += 1; mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, created %p, now = %d count = %d", - mono_native_thread_id_get (), (gpointer) thread->tid, now, worker->worker_creation_current_count); + mono_native_thread_id_get (), (gpointer) thread->tid, now, worker.worker_creation_current_count); - mono_coop_mutex_unlock (&worker->worker_creation_lock); + mono_coop_mutex_unlock (&worker.worker_creation_lock); return TRUE; } -static void monitor_ensure_running (MonoThreadPoolWorker *worker); +static void monitor_ensure_running (void); static void -worker_request (MonoThreadPoolWorker *worker) +worker_request (void) { - g_assert (worker); - - if (worker->suspended) + if (worker.suspended) return; - monitor_ensure_running (worker); + monitor_ensure_running (); - if (worker_try_unpark (worker)) { + if (worker_try_unpark ()) { mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, unparked", mono_native_thread_id_get ()); return; } - if (worker_try_create (worker)) { + if (worker_try_create ()) { mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, created", mono_native_thread_id_get ()); return; } @@ -703,19 +691,19 @@ worker_request (MonoThreadPoolWorker *worker) } static gboolean -monitor_should_keep_running (MonoThreadPoolWorker *worker) +monitor_should_keep_running (void) { static gint64 last_should_keep_running = -1; - g_assert (worker->monitor_status == MONITOR_STATUS_WAITING_FOR_REQUEST || worker->monitor_status == MONITOR_STATUS_REQUESTED); + g_assert (worker.monitor_status == MONITOR_STATUS_WAITING_FOR_REQUEST || worker.monitor_status == MONITOR_STATUS_REQUESTED); - if (InterlockedExchange (&worker->monitor_status, MONITOR_STATUS_WAITING_FOR_REQUEST) == MONITOR_STATUS_WAITING_FOR_REQUEST) { + if (InterlockedExchange (&worker.monitor_status, MONITOR_STATUS_WAITING_FOR_REQUEST) == MONITOR_STATUS_WAITING_FOR_REQUEST) { gboolean should_keep_running = TRUE, force_should_keep_running = FALSE; if (mono_runtime_is_shutting_down ()) { should_keep_running = FALSE; } else { - if (work_item_count (worker) == 0) + if (work_item_count () == 0) should_keep_running = FALSE; if (!should_keep_running) { @@ -730,50 +718,44 @@ monitor_should_keep_running (MonoThreadPoolWorker *worker) last_should_keep_running = mono_100ns_ticks (); } else { last_should_keep_running = -1; - if (InterlockedCompareExchange (&worker->monitor_status, MONITOR_STATUS_NOT_RUNNING, MONITOR_STATUS_WAITING_FOR_REQUEST) == MONITOR_STATUS_WAITING_FOR_REQUEST) + if (InterlockedCompareExchange (&worker.monitor_status, MONITOR_STATUS_NOT_RUNNING, MONITOR_STATUS_WAITING_FOR_REQUEST) == MONITOR_STATUS_WAITING_FOR_REQUEST) return FALSE; } } - g_assert (worker->monitor_status == MONITOR_STATUS_WAITING_FOR_REQUEST || worker->monitor_status == MONITOR_STATUS_REQUESTED); + g_assert (worker.monitor_status == MONITOR_STATUS_WAITING_FOR_REQUEST || worker.monitor_status == MONITOR_STATUS_REQUESTED); return TRUE; } static gboolean -monitor_sufficient_delay_since_last_dequeue (MonoThreadPoolWorker *worker) +monitor_sufficient_delay_since_last_dequeue (void) { gint64 threshold; - g_assert (worker); - - if (worker->cpu_usage < CPU_USAGE_LOW) { + if (worker.cpu_usage < CPU_USAGE_LOW) { threshold = MONITOR_INTERVAL; } else { ThreadPoolWorkerCounter counter; - counter = COUNTER_READ (worker); + counter = COUNTER_READ (); threshold = counter._.max_working * MONITOR_INTERVAL * 2; } - return mono_msec_ticks () >= worker->heuristic_last_dequeue + threshold; + return mono_msec_ticks () >= worker.heuristic_last_dequeue + threshold; } -static void hill_climbing_force_change (MonoThreadPoolWorker *worker, gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition); +static void hill_climbing_force_change (gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition); static gsize WINAPI -monitor_thread (gpointer data) +monitor_thread (gpointer unused) { - MonoThreadPoolWorker *worker; MonoInternalThread *internal; guint i; - worker = (MonoThreadPoolWorker*) data; - g_assert (worker); - internal = mono_thread_internal_current (); g_assert (internal); - mono_cpu_usage (worker->cpu_usage_state); + mono_cpu_usage (worker.cpu_usage_state); // printf ("monitor_thread: start\n"); @@ -785,9 +767,9 @@ monitor_thread (gpointer data) gint32 interval_left = MONITOR_INTERVAL; gint32 awake = 0; /* number of spurious awakes we tolerate before doing a round of rebalancing */ - g_assert (worker->monitor_status != MONITOR_STATUS_NOT_RUNNING); + g_assert (worker.monitor_status != MONITOR_STATUS_NOT_RUNNING); - // counter = COUNTER_READ (worker); + // counter = COUNTER_READ (); // printf ("monitor_thread: starting = %d working = %d parked = %d max_working = %d\n", // counter._.starting, counter._.working, counter._.parked, counter._.max_working); @@ -810,21 +792,21 @@ monitor_thread (gpointer data) if (mono_runtime_is_shutting_down ()) continue; - if (worker->suspended) + if (worker.suspended) continue; - if (work_item_count (worker) == 0) + if (work_item_count () == 0) continue; - worker->cpu_usage = mono_cpu_usage (worker->cpu_usage_state); + worker.cpu_usage = mono_cpu_usage (worker.cpu_usage_state); - if (!monitor_sufficient_delay_since_last_dequeue (worker)) + if (!monitor_sufficient_delay_since_last_dequeue ()) continue; limit_worker_max_reached = FALSE; - COUNTER_ATOMIC (worker, counter, { - if (counter._.max_working >= worker->limit_worker_max) { + COUNTER_ATOMIC (counter, { + if (counter._.max_working >= worker.limit_worker_max) { limit_worker_max_reached = TRUE; break; } @@ -834,23 +816,23 @@ monitor_thread (gpointer data) if (limit_worker_max_reached) continue; - hill_climbing_force_change (worker, counter._.max_working, TRANSITION_STARVATION); + hill_climbing_force_change (counter._.max_working, TRANSITION_STARVATION); for (i = 0; i < 5; ++i) { if (mono_runtime_is_shutting_down ()) break; - if (worker_try_unpark (worker)) { + if (worker_try_unpark ()) { mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, unparked", mono_native_thread_id_get ()); break; } - if (worker_try_create (worker)) { + if (worker_try_create ()) { mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, created", mono_native_thread_id_get ()); break; } } - } while (monitor_should_keep_running (worker)); + } while (monitor_should_keep_running ()); // printf ("monitor_thread: stop\n"); @@ -860,27 +842,27 @@ monitor_thread (gpointer data) } static void -monitor_ensure_running (MonoThreadPoolWorker *worker) +monitor_ensure_running (void) { MonoError error; for (;;) { - switch (worker->monitor_status) { + switch (worker.monitor_status) { case MONITOR_STATUS_REQUESTED: // printf ("monitor_thread: requested\n"); return; case MONITOR_STATUS_WAITING_FOR_REQUEST: // printf ("monitor_thread: waiting for request\n"); - InterlockedCompareExchange (&worker->monitor_status, MONITOR_STATUS_REQUESTED, MONITOR_STATUS_WAITING_FOR_REQUEST); + InterlockedCompareExchange (&worker.monitor_status, MONITOR_STATUS_REQUESTED, MONITOR_STATUS_WAITING_FOR_REQUEST); break; case MONITOR_STATUS_NOT_RUNNING: // printf ("monitor_thread: not running\n"); if (mono_runtime_is_shutting_down ()) return; - if (InterlockedCompareExchange (&worker->monitor_status, MONITOR_STATUS_REQUESTED, MONITOR_STATUS_NOT_RUNNING) == MONITOR_STATUS_NOT_RUNNING) { + if (InterlockedCompareExchange (&worker.monitor_status, MONITOR_STATUS_REQUESTED, MONITOR_STATUS_NOT_RUNNING) == MONITOR_STATUS_NOT_RUNNING) { // printf ("monitor_thread: creating\n"); - if (!mono_thread_create_internal (mono_get_root_domain (), monitor_thread, worker, TRUE, SMALL_STACK, &error)) { + if (!mono_thread_create_internal (mono_get_root_domain (), monitor_thread, NULL, TRUE, SMALL_STACK, &error)) { // printf ("monitor_thread: creating failed\n"); - worker->monitor_status = MONITOR_STATUS_NOT_RUNNING; + worker.monitor_status = MONITOR_STATUS_NOT_RUNNING; mono_error_cleanup (&error); } return; @@ -892,13 +874,11 @@ monitor_ensure_running (MonoThreadPoolWorker *worker) } static void -hill_climbing_change_thread_count (MonoThreadPoolWorker *worker, gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition) +hill_climbing_change_thread_count (gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition) { ThreadPoolHillClimbing *hc; - g_assert (worker); - - hc = &worker->heuristic_hill_climbing; + hc = &worker.heuristic_hill_climbing; mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] hill climbing, change max number of threads %d", mono_native_thread_id_get (), new_thread_count); @@ -909,32 +889,29 @@ hill_climbing_change_thread_count (MonoThreadPoolWorker *worker, gint16 new_thre } static void -hill_climbing_force_change (MonoThreadPoolWorker *worker, gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition) +hill_climbing_force_change (gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition) { ThreadPoolHillClimbing *hc; - g_assert (worker); - - hc = &worker->heuristic_hill_climbing; + hc = &worker.heuristic_hill_climbing; if (new_thread_count != hc->last_thread_count) { hc->current_control_setting += new_thread_count - hc->last_thread_count; - hill_climbing_change_thread_count (worker, new_thread_count, transition); + hill_climbing_change_thread_count (new_thread_count, transition); } } static double_complex -hill_climbing_get_wave_component (MonoThreadPoolWorker *worker, gdouble *samples, guint sample_count, gdouble period) +hill_climbing_get_wave_component (gdouble *samples, guint sample_count, gdouble period) { ThreadPoolHillClimbing *hc; gdouble w, cosine, sine, coeff, q0, q1, q2; guint i; - g_assert (worker); g_assert (sample_count >= period); g_assert (period >= 2); - hc = &worker->heuristic_hill_climbing; + hc = &worker.heuristic_hill_climbing; w = 2.0 * M_PI / period; cosine = cos (w); @@ -952,7 +929,7 @@ hill_climbing_get_wave_component (MonoThreadPoolWorker *worker, gdouble *samples } static gint16 -hill_climbing_update (MonoThreadPoolWorker *worker, gint16 current_thread_count, guint32 sample_duration, gint32 completions, gint64 *adjustment_interval) +hill_climbing_update (gint16 current_thread_count, guint32 sample_duration, gint32 completions, gint64 *adjustment_interval) { ThreadPoolHillClimbing *hc; ThreadPoolHeuristicStateTransition transition; @@ -969,14 +946,13 @@ hill_climbing_update (MonoThreadPoolWorker *worker, gint16 current_thread_count, double_complex throughput_wave_component; double_complex ratio; - g_assert (worker); g_assert (adjustment_interval); - hc = &worker->heuristic_hill_climbing; + hc = &worker.heuristic_hill_climbing; /* If someone changed the thread count without telling us, update our records accordingly. */ if (current_thread_count != hc->last_thread_count) - hill_climbing_force_change (worker, current_thread_count, TRANSITION_INITIALIZING); + hill_climbing_force_change (current_thread_count, TRANSITION_INITIALIZING); /* Update the cumulative stats for this thread count */ hc->elapsed_since_last_change += sample_duration; @@ -1007,7 +983,7 @@ hill_climbing_update (MonoThreadPoolWorker *worker, gint16 current_thread_count, * range we're targeting, which will not be filtered by the frequency-domain translation. */ if (hc->total_samples > 0 && ((current_thread_count - 1.0) / completions) >= hc->max_sample_error) { /* Not accurate enough yet. Let's accumulate the data so - * far, and tell the MonoThreadPoolWorker to collect a little more. */ + * far, and tell the ThreadPoolWorker to collect a little more. */ hc->accumulated_sample_duration = sample_duration; hc->accumulated_completion_count = completions; *adjustment_interval = 10; @@ -1068,17 +1044,17 @@ hill_climbing_update (MonoThreadPoolWorker *worker, gint16 current_thread_count, /* Get the the three different frequency components of the throughput (scaled by average * throughput). Our "error" estimate (the amount of noise that might be present in the * frequency band we're really interested in) is the average of the adjacent bands. */ - throughput_wave_component = mono_double_complex_scalar_div (hill_climbing_get_wave_component (worker, hc->samples, sample_count, hc->wave_period), average_throughput); - throughput_error_estimate = cabs (mono_double_complex_scalar_div (hill_climbing_get_wave_component (worker, hc->samples, sample_count, adjacent_period_1), average_throughput)); + throughput_wave_component = mono_double_complex_scalar_div (hill_climbing_get_wave_component (hc->samples, sample_count, hc->wave_period), average_throughput); + throughput_error_estimate = cabs (mono_double_complex_scalar_div (hill_climbing_get_wave_component (hc->samples, sample_count, adjacent_period_1), average_throughput)); if (adjacent_period_2 <= sample_count) { throughput_error_estimate = MAX (throughput_error_estimate, cabs (mono_double_complex_scalar_div (hill_climbing_get_wave_component ( - worker, hc->samples, sample_count, adjacent_period_2), average_throughput))); + hc->samples, sample_count, adjacent_period_2), average_throughput))); } /* Do the same for the thread counts, so we have something to compare to. We don't * measure thread count noise, because there is none; these are exact measurements. */ - thread_wave_component = mono_double_complex_scalar_div (hill_climbing_get_wave_component (worker, hc->thread_counts, sample_count, hc->wave_period), average_thread_count); + thread_wave_component = mono_double_complex_scalar_div (hill_climbing_get_wave_component (hc->thread_counts, sample_count, hc->wave_period), average_thread_count); /* Update our moving average of the throughput noise. We'll use this * later as feedback to determine the new size of the thread wave. */ @@ -1129,7 +1105,7 @@ hill_climbing_update (MonoThreadPoolWorker *worker, gint16 current_thread_count, move = MIN (move, hc->max_change_per_sample); /* If the result was positive, and CPU is > 95%, refuse the move. */ - if (move > 0.0 && worker->cpu_usage > CPU_USAGE_HIGH) + if (move > 0.0 && worker.cpu_usage > CPU_USAGE_HIGH) move = 0.0; /* Apply the move to our control setting. */ @@ -1141,19 +1117,19 @@ hill_climbing_update (MonoThreadPoolWorker *worker, gint16 current_thread_count, * hc->target_signal_to_noise_ratio * hc->thread_magnitude_multiplier * 2.0)); new_thread_wave_magnitude = CLAMP (new_thread_wave_magnitude, 1, hc->max_thread_wave_magnitude); - /* Make sure our control setting is within the MonoThreadPoolWorker's limits. */ - hc->current_control_setting = CLAMP (hc->current_control_setting, worker->limit_worker_min, worker->limit_worker_max - new_thread_wave_magnitude); + /* Make sure our control setting is within the ThreadPoolWorker's limits. */ + hc->current_control_setting = CLAMP (hc->current_control_setting, worker.limit_worker_min, worker.limit_worker_max - new_thread_wave_magnitude); /* Calculate the new thread count (control setting + square wave). */ new_thread_count = (gint)(hc->current_control_setting + new_thread_wave_magnitude * ((hc->total_samples / (hc->wave_period / 2)) % 2)); - /* Make sure the new thread count doesn't exceed the MonoThreadPoolWorker's limits. */ - new_thread_count = CLAMP (new_thread_count, worker->limit_worker_min, worker->limit_worker_max); + /* Make sure the new thread count doesn't exceed the ThreadPoolWorker's limits. */ + new_thread_count = CLAMP (new_thread_count, worker.limit_worker_min, worker.limit_worker_max); if (new_thread_count != current_thread_count) - hill_climbing_change_thread_count (worker, new_thread_count, transition); + hill_climbing_change_thread_count (new_thread_count, transition); - if (creal (ratio) < 0.0 && new_thread_count == worker->limit_worker_min) + if (creal (ratio) < 0.0 && new_thread_count == worker.limit_worker_min) *adjustment_interval = (gint)(0.5 + hc->current_sample_interval * (10.0 * MAX (-1.0 * creal (ratio), 1.0))); else *adjustment_interval = hc->current_sample_interval; @@ -1162,11 +1138,11 @@ hill_climbing_update (MonoThreadPoolWorker *worker, gint16 current_thread_count, } static gboolean -heuristic_should_adjust (MonoThreadPoolWorker *worker) +heuristic_should_adjust (void) { - if (worker->heuristic_last_dequeue > worker->heuristic_last_adjustment + worker->heuristic_adjustment_interval) { + if (worker.heuristic_last_dequeue > worker.heuristic_last_adjustment + worker.heuristic_adjustment_interval) { ThreadPoolWorkerCounter counter; - counter = COUNTER_READ (worker); + counter = COUNTER_READ (); if (counter._.working <= counter._.max_working) return TRUE; } @@ -1175,96 +1151,98 @@ heuristic_should_adjust (MonoThreadPoolWorker *worker) } static void -heuristic_adjust (MonoThreadPoolWorker *worker) +heuristic_adjust (void) { - if (mono_coop_mutex_trylock (&worker->heuristic_lock) == 0) { - gint32 completions = InterlockedExchange (&worker->heuristic_completions, 0); + if (mono_coop_mutex_trylock (&worker.heuristic_lock) == 0) { + gint32 completions = InterlockedExchange (&worker.heuristic_completions, 0); gint64 sample_end = mono_msec_ticks (); - gint64 sample_duration = sample_end - worker->heuristic_sample_start; + gint64 sample_duration = sample_end - worker.heuristic_sample_start; - if (sample_duration >= worker->heuristic_adjustment_interval / 2) { + if (sample_duration >= worker.heuristic_adjustment_interval / 2) { ThreadPoolWorkerCounter counter; gint16 new_thread_count; - counter = COUNTER_READ (worker); - new_thread_count = hill_climbing_update (worker, counter._.max_working, sample_duration, completions, &worker->heuristic_adjustment_interval); + counter = COUNTER_READ (); + new_thread_count = hill_climbing_update (counter._.max_working, sample_duration, completions, &worker.heuristic_adjustment_interval); - COUNTER_ATOMIC (worker, counter, { + COUNTER_ATOMIC (counter, { counter._.max_working = new_thread_count; }); if (new_thread_count > counter._.max_working) - worker_request (worker); + worker_request (); - worker->heuristic_sample_start = sample_end; - worker->heuristic_last_adjustment = mono_msec_ticks (); + worker.heuristic_sample_start = sample_end; + worker.heuristic_last_adjustment = mono_msec_ticks (); } - mono_coop_mutex_unlock (&worker->heuristic_lock); + mono_coop_mutex_unlock (&worker.heuristic_lock); } } static void -heuristic_notify_work_completed (MonoThreadPoolWorker *worker) +heuristic_notify_work_completed (void) { - g_assert (worker); - - InterlockedIncrement (&worker->heuristic_completions); - worker->heuristic_last_dequeue = mono_msec_ticks (); + InterlockedIncrement (&worker.heuristic_completions); + worker.heuristic_last_dequeue = mono_msec_ticks (); - if (heuristic_should_adjust (worker)) - heuristic_adjust (worker); + if (heuristic_should_adjust ()) + heuristic_adjust (); } gboolean -mono_threadpool_worker_notify_completed (MonoThreadPoolWorker *worker) +mono_threadpool_worker_notify_completed (void) { ThreadPoolWorkerCounter counter; - heuristic_notify_work_completed (worker); + heuristic_notify_work_completed (); - counter = COUNTER_READ (worker); + counter = COUNTER_READ (); return counter._.working <= counter._.max_working; } gint32 -mono_threadpool_worker_get_min (MonoThreadPoolWorker *worker) +mono_threadpool_worker_get_min (void) { - return worker->limit_worker_min; + return worker.limit_worker_min; } gboolean -mono_threadpool_worker_set_min (MonoThreadPoolWorker *worker, gint32 value) +mono_threadpool_worker_set_min (gint32 value) { - if (value <= 0 || value > worker->limit_worker_max) + if (value <= 0 || value > worker.limit_worker_max) return FALSE; - worker->limit_worker_min = value; + worker.limit_worker_min = value; return TRUE; } gint32 -mono_threadpool_worker_get_max (MonoThreadPoolWorker *worker) +mono_threadpool_worker_get_max (void) { - return worker->limit_worker_max; + return worker.limit_worker_max; } gboolean -mono_threadpool_worker_set_max (MonoThreadPoolWorker *worker, gint32 value) +mono_threadpool_worker_set_max (gint32 value) { - gint32 cpu_count = mono_cpu_count (); + gint32 cpu_count; + + cpu_count = mono_cpu_count (); + if (value < worker.limit_worker_min || value < cpu_count) + return FALSE; - if (value < worker->limit_worker_min || value < cpu_count) + if (value < worker.limit_worker_min || value < cpu_count) return FALSE; - worker->limit_worker_max = value; + worker.limit_worker_max = value; return TRUE; } void -mono_threadpool_worker_set_suspended (MonoThreadPoolWorker *worker, gboolean suspended) +mono_threadpool_worker_set_suspended (gboolean suspended) { - worker->suspended = suspended; + worker.suspended = suspended; if (!suspended) - worker_request (worker); + worker_request (); } diff --git a/mono/metadata/threadpool-worker.h b/mono/metadata/threadpool-worker.h index b63df60c915..00cf11d64d0 100644 --- a/mono/metadata/threadpool-worker.h +++ b/mono/metadata/threadpool-worker.h @@ -2,33 +2,33 @@ #ifndef _MONO_METADATA_THREADPOOL_WORKER_H #define _MONO_METADATA_THREADPOOL_WORKER_H -typedef struct MonoThreadPoolWorker MonoThreadPoolWorker; +#include typedef void (*MonoThreadPoolWorkerCallback)(gpointer); void -mono_threadpool_worker_init (MonoThreadPoolWorker **worker); +mono_threadpool_worker_init (void); void -mono_threadpool_worker_cleanup (MonoThreadPoolWorker *worker); +mono_threadpool_worker_cleanup (void); void -mono_threadpool_worker_enqueue (MonoThreadPoolWorker *worker, MonoThreadPoolWorkerCallback callback, gpointer data); +mono_threadpool_worker_enqueue (MonoThreadPoolWorkerCallback callback, gpointer data); gboolean -mono_threadpool_worker_notify_completed (MonoThreadPoolWorker *worker); +mono_threadpool_worker_notify_completed (void); gint32 -mono_threadpool_worker_get_min (MonoThreadPoolWorker *worker); +mono_threadpool_worker_get_min (void); gboolean -mono_threadpool_worker_set_min (MonoThreadPoolWorker *worker, gint32 value); +mono_threadpool_worker_set_min (gint32 value); gint32 -mono_threadpool_worker_get_max (MonoThreadPoolWorker *worker); +mono_threadpool_worker_get_max (void); gboolean -mono_threadpool_worker_set_max (MonoThreadPoolWorker *worker, gint32 value); +mono_threadpool_worker_set_max (gint32 value); void -mono_threadpool_worker_set_suspended (MonoThreadPoolWorker *worker, gboolean suspended); +mono_threadpool_worker_set_suspended (gboolean suspended); #endif /* _MONO_METADATA_THREADPOOL_WORKER_H */ diff --git a/mono/metadata/threadpool.c b/mono/metadata/threadpool.c index f13755608a1..455d616a767 100644 --- a/mono/metadata/threadpool.c +++ b/mono/metadata/threadpool.c @@ -79,8 +79,6 @@ typedef struct { gint32 limit_io_min; gint32 limit_io_max; - - MonoThreadPoolWorker *worker; } ThreadPool; static mono_lazy_init_t status = MONO_LAZY_INIT_STATUS_NOT_INITIALIZED; @@ -150,7 +148,7 @@ initialize (void) threadpool.limit_io_min = mono_cpu_count (); threadpool.limit_io_max = CLAMP (threadpool.limit_io_min * 100, MIN (threadpool.limit_io_min, 200), MAX (threadpool.limit_io_min, 200)); - mono_threadpool_worker_init (&threadpool.worker); + mono_threadpool_worker_init (); } static void @@ -197,7 +195,7 @@ cleanup (void) mono_coop_mutex_unlock (&threadpool.threads_lock); #endif - mono_threadpool_worker_cleanup (threadpool.worker); + mono_threadpool_worker_cleanup (); mono_refcount_dec (&threadpool); } @@ -666,14 +664,14 @@ void mono_threadpool_suspend (void) { if (mono_lazy_is_initialized (&status)) - mono_threadpool_worker_set_suspended (threadpool.worker, TRUE); + mono_threadpool_worker_set_suspended (TRUE); } void mono_threadpool_resume (void) { if (mono_lazy_is_initialized (&status)) - mono_threadpool_worker_set_suspended (threadpool.worker, FALSE); + mono_threadpool_worker_set_suspended (FALSE); } void @@ -688,7 +686,7 @@ ves_icall_System_Threading_ThreadPool_GetAvailableThreadsNative (gint32 *worker_ counter = COUNTER_READ (); - *worker_threads = MAX (0, mono_threadpool_worker_get_max (threadpool.worker) - counter._.working); + *worker_threads = MAX (0, mono_threadpool_worker_get_max () - counter._.working); *completion_port_threads = threadpool.limit_io_max; } @@ -700,7 +698,7 @@ ves_icall_System_Threading_ThreadPool_GetMinThreadsNative (gint32 *worker_thread mono_lazy_initialize (&status, initialize); - *worker_threads = mono_threadpool_worker_get_min (threadpool.worker); + *worker_threads = mono_threadpool_worker_get_min (); *completion_port_threads = threadpool.limit_io_min; } @@ -712,7 +710,7 @@ ves_icall_System_Threading_ThreadPool_GetMaxThreadsNative (gint32 *worker_thread mono_lazy_initialize (&status, initialize); - *worker_threads = mono_threadpool_worker_get_max (threadpool.worker); + *worker_threads = mono_threadpool_worker_get_max (); *completion_port_threads = threadpool.limit_io_max; } @@ -724,7 +722,7 @@ ves_icall_System_Threading_ThreadPool_SetMinThreadsNative (gint32 worker_threads if (completion_port_threads <= 0 || completion_port_threads > threadpool.limit_io_max) return FALSE; - if (!mono_threadpool_worker_set_min (threadpool.worker, worker_threads)) + if (!mono_threadpool_worker_set_min (worker_threads)) return FALSE; threadpool.limit_io_min = completion_port_threads; @@ -742,7 +740,7 @@ ves_icall_System_Threading_ThreadPool_SetMaxThreadsNative (gint32 worker_threads if (completion_port_threads < threadpool.limit_io_min || completion_port_threads < cpu_count) return FALSE; - if (!mono_threadpool_worker_set_max (threadpool.worker, worker_threads)) + if (!mono_threadpool_worker_set_max (worker_threads)) return FALSE; threadpool.limit_io_max = completion_port_threads; @@ -767,13 +765,13 @@ ves_icall_System_Threading_ThreadPool_NotifyWorkItemComplete (void) if (mono_domain_is_unloading (mono_domain_get ()) || mono_runtime_is_shutting_down ()) return FALSE; - return mono_threadpool_worker_notify_completed (threadpool.worker); + return mono_threadpool_worker_notify_completed (); } void ves_icall_System_Threading_ThreadPool_NotifyWorkItemProgressNative (void) { - mono_threadpool_worker_notify_completed (threadpool.worker); + mono_threadpool_worker_notify_completed (); } void @@ -827,7 +825,10 @@ ves_icall_System_Threading_ThreadPool_RequestWorkerThread (void) counter._.starting ++; }); - mono_threadpool_worker_enqueue (threadpool.worker, worker_callback, NULL); + mono_threadpool_worker_enqueue (worker_callback, NULL); + + /* we do not decrement the threadpool refcount, + * as it's going to be done in the worker_callback */ return TRUE; } -- 2.25.1