From 5e845a3bc96ed7dd9058c7d47ad1dee28320bfc3 Mon Sep 17 00:00:00 2001 From: Ludovic Henry Date: Fri, 9 Dec 2016 15:40:28 -0500 Subject: [PATCH] [threadpool] Split domain and worker management (#4117) * [threadpool] Remove `-ms` suffix * [threadpool] Split domain and worker management This will allow us to use native threadpool more easily: we will simply have to implement a threadpool worker with the specific API. On windows, we will use the Win32 Threadpool, and on OSX we will explore using GCD (Grand Central Dispatch). --- mono/metadata/Makefile.am | 12 +- mono/metadata/appdomain.c | 4 +- mono/metadata/console-unix.c | 4 +- mono/metadata/gc.c | 4 +- mono/metadata/icall.c | 4 +- mono/metadata/marshal.c | 6 +- mono/metadata/runtime.c | 4 +- mono/metadata/socket-io.c | 4 +- ...ol-ms-io-epoll.c => threadpool-io-epoll.c} | 0 ...-ms-io-kqueue.c => threadpool-io-kqueue.c} | 0 ...pool-ms-io-poll.c => threadpool-io-poll.c} | 0 .../{threadpool-ms-io.c => threadpool-io.c} | 32 +- .../{threadpool-ms-io.h => threadpool-io.h} | 13 +- mono/metadata/threadpool-ms.c | 1695 ----------------- mono/metadata/threadpool-worker-default.c | 1267 ++++++++++++ mono/metadata/threadpool-worker.h | 34 + mono/metadata/threadpool.c | 835 ++++++++ .../{threadpool-ms.h => threadpool.h} | 20 +- mono/metadata/w32process-win32.c | 2 +- mono/mini/debugger-agent.c | 6 +- mono/utils/mono-lazy-init.h | 2 +- msvc/libmonoruntime.vcxproj | 10 +- msvc/libmonoruntime.vcxproj.filters | 14 +- 23 files changed, 2212 insertions(+), 1760 deletions(-) rename mono/metadata/{threadpool-ms-io-epoll.c => threadpool-io-epoll.c} (100%) rename mono/metadata/{threadpool-ms-io-kqueue.c => threadpool-io-kqueue.c} (100%) rename mono/metadata/{threadpool-ms-io-poll.c => threadpool-io-poll.c} (100%) rename mono/metadata/{threadpool-ms-io.c => threadpool-io.c} (94%) rename mono/metadata/{threadpool-ms-io.h => threadpool-io.h} (55%) delete mode 100644 mono/metadata/threadpool-ms.c create mode 100644 mono/metadata/threadpool-worker-default.c create mode 100644 mono/metadata/threadpool-worker.h create mode 100644 mono/metadata/threadpool.c rename mono/metadata/{threadpool-ms.h => threadpool.h} (72%) diff --git a/mono/metadata/Makefile.am b/mono/metadata/Makefile.am index db8663c6e59..92b99cd28bc 100644 --- a/mono/metadata/Makefile.am +++ b/mono/metadata/Makefile.am @@ -230,10 +230,12 @@ common_sources = \ tabledefs.h \ threads.c \ threads-types.h \ - threadpool-ms.c \ - threadpool-ms.h \ - threadpool-ms-io.c \ - threadpool-ms-io.h \ + threadpool.c \ + threadpool.h \ + threadpool-worker-default.c \ + threadpool-worker.h \ + threadpool-io.c \ + threadpool-io.h \ verify.c \ verify-internals.h \ wrapper-types.h \ @@ -342,4 +344,4 @@ libmonoruntimeinclude_HEADERS = \ verify.h EXTRA_DIST = $(win32_sources) $(unix_sources) $(null_sources) runtime.h \ - threadpool-ms-io-poll.c threadpool-ms-io-epoll.c threadpool-ms-io-kqueue.c sgen-dynarray.h + threadpool-io-poll.c threadpool-io-epoll.c threadpool-io-kqueue.c sgen-dynarray.h diff --git a/mono/metadata/appdomain.c b/mono/metadata/appdomain.c index a571f676b45..9e23de19060 100644 --- a/mono/metadata/appdomain.c +++ b/mono/metadata/appdomain.c @@ -42,7 +42,7 @@ #include #include #include -#include +#include #include #include #include @@ -2418,7 +2418,7 @@ unload_thread_main (void *arg) goto failure; } - if (!mono_threadpool_ms_remove_domain_jobs (domain, -1)) { + if (!mono_threadpool_remove_domain_jobs (domain, -1)) { data->failure_reason = g_strdup_printf ("Cleanup of threadpool jobs of domain %s timed out.", domain->friendly_name); goto failure; } diff --git a/mono/metadata/console-unix.c b/mono/metadata/console-unix.c index 10272322966..7d1e9a1ec00 100644 --- a/mono/metadata/console-unix.c +++ b/mono/metadata/console-unix.c @@ -34,7 +34,7 @@ #include #include #include -#include +#include #include #include #include @@ -258,7 +258,7 @@ do_console_cancel_event (void) method = mono_class_get_method_from_name (klass, "BeginInvoke", -1); g_assert (method != NULL); - mono_threadpool_ms_begin_invoke (domain, (MonoObject*) load_value, method, NULL, &error); + mono_threadpool_begin_invoke (domain, (MonoObject*) load_value, method, NULL, &error); if (!is_ok (&error)) { g_warning ("Couldn't invoke System.Console cancel handler due to %s", mono_error_get_message (&error)); mono_error_cleanup (&error); diff --git a/mono/metadata/gc.c b/mono/metadata/gc.c index 53cce47edad..ad9db931df4 100644 --- a/mono/metadata/gc.c +++ b/mono/metadata/gc.c @@ -24,7 +24,7 @@ #include #include #include -#include +#include #include #include #include @@ -556,7 +556,7 @@ mono_domain_finalize (MonoDomain *domain, guint32 timeout) } if (domain == mono_get_root_domain ()) { - mono_threadpool_ms_cleanup (); + mono_threadpool_cleanup (); mono_gc_finalize_threadpool_threads (); } diff --git a/mono/metadata/icall.c b/mono/metadata/icall.c index 946803d692d..c11cbad5228 100644 --- a/mono/metadata/icall.c +++ b/mono/metadata/icall.c @@ -36,8 +36,8 @@ #include #include #include -#include -#include +#include +#include #include #include #include diff --git a/mono/metadata/marshal.c b/mono/metadata/marshal.c index 5eb7a8a9902..e376d860f85 100644 --- a/mono/metadata/marshal.c +++ b/mono/metadata/marshal.c @@ -40,7 +40,7 @@ #include "mono/metadata/cominterop.h" #include "mono/metadata/remoting.h" #include "mono/metadata/reflection-internals.h" -#include "mono/metadata/threadpool-ms.h" +#include "mono/metadata/threadpool.h" #include "mono/metadata/handle.h" #include "mono/utils/mono-counters.h" #include "mono/utils/mono-tls.h" @@ -2451,7 +2451,7 @@ mono_delegate_begin_invoke (MonoDelegate *delegate, gpointer *params) method = mono_get_delegate_invoke (klass); g_assert (method); - MonoAsyncResult *result = mono_threadpool_ms_begin_invoke (mono_domain_get (), (MonoObject*) delegate, method, params, &error); + MonoAsyncResult *result = mono_threadpool_begin_invoke (mono_domain_get (), (MonoObject*) delegate, method, params, &error); mono_error_set_pending_exception (&error); return result; } @@ -3210,7 +3210,7 @@ mono_delegate_end_invoke (MonoDelegate *delegate, gpointer *params) } else #endif { - res = mono_threadpool_ms_end_invoke (ares, &out_args, &exc, &error); + res = mono_threadpool_end_invoke (ares, &out_args, &exc, &error); if (mono_error_set_pending_exception (&error)) return NULL; } diff --git a/mono/metadata/runtime.c b/mono/metadata/runtime.c index 078b4cef997..99621d5cca0 100644 --- a/mono/metadata/runtime.c +++ b/mono/metadata/runtime.c @@ -18,7 +18,7 @@ #include #include #include -#include +#include #include #include @@ -110,7 +110,7 @@ mono_runtime_try_shutdown (void) mono_runtime_set_shutting_down (); /* This will kill the tp threads which cannot be suspended */ - mono_threadpool_ms_cleanup (); + mono_threadpool_cleanup (); /*TODO move the follow to here: mono_thread_suspend_all_other_threads (); OR mono_thread_wait_all_other_threads diff --git a/mono/metadata/socket-io.c b/mono/metadata/socket-io.c index 142c3c53ff2..606eff3692d 100644 --- a/mono/metadata/socket-io.c +++ b/mono/metadata/socket-io.c @@ -54,7 +54,7 @@ #include #include #include -#include +#include #include /* FIXME change this code to not mess so much with the internals */ #include @@ -646,7 +646,7 @@ ves_icall_System_Net_Sockets_Socket_Close_internal (SOCKET sock, gint32 *werror) /* Clear any pending work item from this socket if the underlying * polling system does not notify when the socket is closed */ - mono_threadpool_ms_io_remove_socket (GPOINTER_TO_INT (sock)); + mono_threadpool_io_remove_socket (GPOINTER_TO_INT (sock)); MONO_ENTER_GC_SAFE; closesocket (sock); diff --git a/mono/metadata/threadpool-ms-io-epoll.c b/mono/metadata/threadpool-io-epoll.c similarity index 100% rename from mono/metadata/threadpool-ms-io-epoll.c rename to mono/metadata/threadpool-io-epoll.c diff --git a/mono/metadata/threadpool-ms-io-kqueue.c b/mono/metadata/threadpool-io-kqueue.c similarity index 100% rename from mono/metadata/threadpool-ms-io-kqueue.c rename to mono/metadata/threadpool-io-kqueue.c diff --git a/mono/metadata/threadpool-ms-io-poll.c b/mono/metadata/threadpool-io-poll.c similarity index 100% rename from mono/metadata/threadpool-ms-io-poll.c rename to mono/metadata/threadpool-io-poll.c diff --git a/mono/metadata/threadpool-ms-io.c b/mono/metadata/threadpool-io.c similarity index 94% rename from mono/metadata/threadpool-ms-io.c rename to mono/metadata/threadpool-io.c index 7cdaf5be684..c7986ab0d60 100644 --- a/mono/metadata/threadpool-ms-io.c +++ b/mono/metadata/threadpool-io.c @@ -1,5 +1,5 @@ /* - * threadpool-ms-io.c: Microsoft IO threadpool runtime support + * threadpool-io.c: Microsoft IO threadpool runtime support * * Author: * Ludovic Henry (ludovic.henry@xamarin.com) @@ -23,8 +23,8 @@ #include #include -#include -#include +#include +#include #include #include #include @@ -44,9 +44,9 @@ enum MonoIOOperation { EVENT_ERR = 1 << 2, /* not in managed */ }; -#include "threadpool-ms-io-epoll.c" -#include "threadpool-ms-io-kqueue.c" -#include "threadpool-ms-io-poll.c" +#include "threadpool-io-epoll.c" +#include "threadpool-io-kqueue.c" +#include "threadpool-io-poll.c" #define UPDATES_CAPACITY 128 @@ -272,7 +272,7 @@ wait_callback (gint fd, gint events, gpointer user_data) if (list && (events & EVENT_IN) != 0) { MonoIOSelectorJob *job = get_job_for_event (&list, EVENT_IN); if (job) { - mono_threadpool_ms_enqueue_work_item (((MonoObject*) job)->vtable->domain, (MonoObject*) job, &error); + mono_threadpool_enqueue_work_item (((MonoObject*) job)->vtable->domain, (MonoObject*) job, &error); mono_error_assert_ok (&error); } @@ -280,7 +280,7 @@ wait_callback (gint fd, gint events, gpointer user_data) if (list && (events & EVENT_OUT) != 0) { MonoIOSelectorJob *job = get_job_for_event (&list, EVENT_OUT); if (job) { - mono_threadpool_ms_enqueue_work_item (((MonoObject*) job)->vtable->domain, (MonoObject*) job, &error); + mono_threadpool_enqueue_work_item (((MonoObject*) job)->vtable->domain, (MonoObject*) job, &error); mono_error_assert_ok (&error); } } @@ -378,7 +378,7 @@ selector_thread (gpointer data) } for (; list; list = mono_mlist_remove_item (list, list)) { - mono_threadpool_ms_enqueue_work_item (mono_object_domain (mono_mlist_get_data (list)), mono_mlist_get_data (list), &error); + mono_threadpool_enqueue_work_item (mono_object_domain (mono_mlist_get_data (list)), mono_mlist_get_data (list), &error); mono_error_assert_ok (&error); } @@ -555,7 +555,7 @@ cleanup (void) } void -mono_threadpool_ms_io_cleanup (void) +mono_threadpool_io_cleanup (void) { mono_lazy_cleanup (&io_status, cleanup); } @@ -593,11 +593,11 @@ ves_icall_System_IOSelector_Add (gpointer handle, MonoIOSelectorJob *job) void ves_icall_System_IOSelector_Remove (gpointer handle) { - mono_threadpool_ms_io_remove_socket (GPOINTER_TO_INT (handle)); + mono_threadpool_io_remove_socket (GPOINTER_TO_INT (handle)); } void -mono_threadpool_ms_io_remove_socket (int fd) +mono_threadpool_io_remove_socket (int fd) { ThreadPoolIOUpdate *update; @@ -619,7 +619,7 @@ mono_threadpool_ms_io_remove_socket (int fd) } void -mono_threadpool_ms_io_remove_domain_jobs (MonoDomain *domain) +mono_threadpool_io_remove_domain_jobs (MonoDomain *domain) { ThreadPoolIOUpdate *update; @@ -655,19 +655,19 @@ ves_icall_System_IOSelector_Remove (gpointer handle) } void -mono_threadpool_ms_io_cleanup (void) +mono_threadpool_io_cleanup (void) { g_assert_not_reached (); } void -mono_threadpool_ms_io_remove_socket (int fd) +mono_threadpool_io_remove_socket (int fd) { g_assert_not_reached (); } void -mono_threadpool_ms_io_remove_domain_jobs (MonoDomain *domain) +mono_threadpool_io_remove_domain_jobs (MonoDomain *domain) { g_assert_not_reached (); } diff --git a/mono/metadata/threadpool-ms-io.h b/mono/metadata/threadpool-io.h similarity index 55% rename from mono/metadata/threadpool-ms-io.h rename to mono/metadata/threadpool-io.h index 106be80a1da..0936ee016cd 100644 --- a/mono/metadata/threadpool-ms-io.h +++ b/mono/metadata/threadpool-io.h @@ -1,5 +1,6 @@ -#ifndef _MONO_THREADPOOL_MS_IO_H_ -#define _MONO_THREADPOOL_MS_IO_H_ + +#ifndef _MONO_METADATA_THREADPOOL_IO_H_ +#define _MONO_METADATA_THREADPOOL_IO_H_ #include #include @@ -16,10 +17,10 @@ void ves_icall_System_IOSelector_Remove (gpointer handle); void -mono_threadpool_ms_io_remove_socket (int fd); +mono_threadpool_io_remove_socket (int fd); void -mono_threadpool_ms_io_remove_domain_jobs (MonoDomain *domain); +mono_threadpool_io_remove_domain_jobs (MonoDomain *domain); void -mono_threadpool_ms_io_cleanup (void); +mono_threadpool_io_cleanup (void); -#endif /* _MONO_THREADPOOL_MS_IO_H_ */ +#endif /* _MONO_METADATA_THREADPOOL_IO_H_ */ diff --git a/mono/metadata/threadpool-ms.c b/mono/metadata/threadpool-ms.c deleted file mode 100644 index aa8289cd51d..00000000000 --- a/mono/metadata/threadpool-ms.c +++ /dev/null @@ -1,1695 +0,0 @@ -/* - * threadpool-ms.c: Microsoft threadpool runtime support - * - * Author: - * Ludovic Henry (ludovic.henry@xamarin.com) - * - * Copyright 2015 Xamarin, Inc (http://www.xamarin.com) - * Licensed under the MIT license. See LICENSE file in the project root for full license information. - */ - -// -// Copyright (c) Microsoft. All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for full license information. -// -// Files: -// - src/vm/comthreadpool.cpp -// - src/vm/win32threadpoolcpp -// - src/vm/threadpoolrequest.cpp -// - src/vm/hillclimbing.cpp -// -// Ported from C++ to C and adjusted to Mono runtime - -#include -#define _USE_MATH_DEFINES // needed by MSVC to define math constants -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#define CPU_USAGE_LOW 80 -#define CPU_USAGE_HIGH 95 - -#define MONITOR_INTERVAL 500 // ms -#define MONITOR_MINIMAL_LIFETIME 60 * 1000 // ms - -#define WORKER_CREATION_MAX_PER_SEC 10 - -/* The exponent to apply to the gain. 1.0 means to use linear gain, - * higher values will enhance large moves and damp small ones. - * default: 2.0 */ -#define HILL_CLIMBING_GAIN_EXPONENT 2.0 - -/* The 'cost' of a thread. 0 means drive for increased throughput regardless - * of thread count, higher values bias more against higher thread counts. - * default: 0.15 */ -#define HILL_CLIMBING_BIAS 0.15 - -#define HILL_CLIMBING_WAVE_PERIOD 4 -#define HILL_CLIMBING_MAX_WAVE_MAGNITUDE 20 -#define HILL_CLIMBING_WAVE_MAGNITUDE_MULTIPLIER 1.0 -#define HILL_CLIMBING_WAVE_HISTORY_SIZE 8 -#define HILL_CLIMBING_TARGET_SIGNAL_TO_NOISE_RATIO 3.0 -#define HILL_CLIMBING_MAX_CHANGE_PER_SECOND 4 -#define HILL_CLIMBING_MAX_CHANGE_PER_SAMPLE 20 -#define HILL_CLIMBING_SAMPLE_INTERVAL_LOW 10 -#define HILL_CLIMBING_SAMPLE_INTERVAL_HIGH 200 -#define HILL_CLIMBING_ERROR_SMOOTHING_FACTOR 0.01 -#define HILL_CLIMBING_MAX_SAMPLE_ERROR_PERCENT 0.15 - -typedef union { - struct { - gint16 max_working; /* determined by heuristic */ - gint16 active; /* executing worker_thread */ - gint16 working; /* actively executing worker_thread, not parked */ - gint16 parked; /* parked */ - } _; - gint64 as_gint64; -} ThreadPoolCounter; - -typedef struct { - MonoDomain *domain; - /* Number of outstanding jobs */ - gint32 outstanding_request; - /* Number of currently executing jobs */ - int threadpool_jobs; - /* Signalled when threadpool_jobs + outstanding_request is 0 */ - /* Protected by threadpool->domains_lock */ - MonoCoopCond cleanup_cond; -} ThreadPoolDomain; - -typedef MonoInternalThread ThreadPoolWorkingThread; - -typedef struct { - gint32 wave_period; - gint32 samples_to_measure; - gdouble target_throughput_ratio; - gdouble target_signal_to_noise_ratio; - gdouble max_change_per_second; - gdouble max_change_per_sample; - gint32 max_thread_wave_magnitude; - gint32 sample_interval_low; - gdouble thread_magnitude_multiplier; - gint32 sample_interval_high; - gdouble throughput_error_smoothing_factor; - gdouble gain_exponent; - gdouble max_sample_error; - - gdouble current_control_setting; - gint64 total_samples; - gint16 last_thread_count; - gdouble elapsed_since_last_change; - gdouble completions_since_last_change; - - gdouble average_throughput_noise; - - gdouble *samples; - gdouble *thread_counts; - - guint32 current_sample_interval; - gpointer random_interval_generator; - - gint32 accumulated_completion_count; - gdouble accumulated_sample_duration; -} ThreadPoolHillClimbing; - -typedef struct { - ThreadPoolCounter counters; - - GPtrArray *domains; // ThreadPoolDomain* [] - MonoCoopMutex domains_lock; - - GPtrArray *working_threads; // ThreadPoolWorkingThread* [] - gint32 parked_threads_count; - MonoCoopCond parked_threads_cond; - MonoCoopMutex active_threads_lock; /* protect access to working_threads and parked_threads */ - - guint32 worker_creation_current_second; - guint32 worker_creation_current_count; - MonoCoopMutex worker_creation_lock; - - gint32 heuristic_completions; - gint64 heuristic_sample_start; - gint64 heuristic_last_dequeue; // ms - gint64 heuristic_last_adjustment; // ms - gint64 heuristic_adjustment_interval; // ms - ThreadPoolHillClimbing heuristic_hill_climbing; - MonoCoopMutex heuristic_lock; - - gint32 limit_worker_min; - gint32 limit_worker_max; - gint32 limit_io_min; - gint32 limit_io_max; - - MonoCpuUsageState *cpu_usage_state; - gint32 cpu_usage; - - /* suspended by the debugger */ - gboolean suspended; -} ThreadPool; - -typedef enum { - TRANSITION_WARMUP, - TRANSITION_INITIALIZING, - TRANSITION_RANDOM_MOVE, - TRANSITION_CLIMBING_MOVE, - TRANSITION_CHANGE_POINT, - TRANSITION_STABILIZING, - TRANSITION_STARVATION, - TRANSITION_THREAD_TIMED_OUT, - TRANSITION_UNDEFINED, -} ThreadPoolHeuristicStateTransition; - -static mono_lazy_init_t status = MONO_LAZY_INIT_STATUS_NOT_INITIALIZED; - -enum { - MONITOR_STATUS_REQUESTED, - MONITOR_STATUS_WAITING_FOR_REQUEST, - MONITOR_STATUS_NOT_RUNNING, -}; - -static gint32 monitor_status = MONITOR_STATUS_NOT_RUNNING; - -static ThreadPool* threadpool; - -#define COUNTER_CHECK(counter) \ - do { \ - g_assert (counter._.max_working > 0); \ - g_assert (counter._.working >= 0); \ - g_assert (counter._.active >= 0); \ - } while (0) - -#define COUNTER_READ() (InterlockedRead64 (&threadpool->counters.as_gint64)) - -#define COUNTER_ATOMIC(var,block) \ - do { \ - ThreadPoolCounter __old; \ - do { \ - g_assert (threadpool); \ - __old.as_gint64 = COUNTER_READ (); \ - (var) = __old; \ - { block; } \ - COUNTER_CHECK (var); \ - } while (InterlockedCompareExchange64 (&threadpool->counters.as_gint64, (var).as_gint64, __old.as_gint64) != __old.as_gint64); \ - } while (0) - -#define COUNTER_TRY_ATOMIC(res,var,block) \ - do { \ - ThreadPoolCounter __old; \ - do { \ - g_assert (threadpool); \ - __old.as_gint64 = COUNTER_READ (); \ - (var) = __old; \ - (res) = FALSE; \ - { block; } \ - COUNTER_CHECK (var); \ - (res) = InterlockedCompareExchange64 (&threadpool->counters.as_gint64, (var).as_gint64, __old.as_gint64) == __old.as_gint64; \ - } while (0); \ - } while (0) - -static inline void -domains_lock (void) -{ - mono_coop_mutex_lock (&threadpool->domains_lock); -} - -static inline void -domains_unlock (void) -{ - mono_coop_mutex_unlock (&threadpool->domains_lock); -} - -static gpointer -rand_create (void) -{ - mono_rand_open (); - return mono_rand_init (NULL, 0); -} - -static guint32 -rand_next (gpointer *handle, guint32 min, guint32 max) -{ - MonoError error; - guint32 val; - mono_rand_try_get_uint32 (handle, &val, min, max, &error); - // FIXME handle error - mono_error_assert_ok (&error); - return val; -} - -static void -rand_free (gpointer handle) -{ - mono_rand_close (handle); -} - -static void -initialize (void) -{ - ThreadPoolHillClimbing *hc; - const char *threads_per_cpu_env; - gint threads_per_cpu; - gint threads_count; - - g_assert (!threadpool); - threadpool = g_new0 (ThreadPool, 1); - g_assert (threadpool); - - threadpool->domains = g_ptr_array_new (); - mono_coop_mutex_init (&threadpool->domains_lock); - - threadpool->parked_threads_count = 0; - mono_coop_cond_init (&threadpool->parked_threads_cond); - threadpool->working_threads = g_ptr_array_new (); - mono_coop_mutex_init (&threadpool->active_threads_lock); - - threadpool->worker_creation_current_second = -1; - mono_coop_mutex_init (&threadpool->worker_creation_lock); - - threadpool->heuristic_adjustment_interval = 10; - mono_coop_mutex_init (&threadpool->heuristic_lock); - - mono_rand_open (); - - hc = &threadpool->heuristic_hill_climbing; - - hc->wave_period = HILL_CLIMBING_WAVE_PERIOD; - hc->max_thread_wave_magnitude = HILL_CLIMBING_MAX_WAVE_MAGNITUDE; - hc->thread_magnitude_multiplier = (gdouble) HILL_CLIMBING_WAVE_MAGNITUDE_MULTIPLIER; - hc->samples_to_measure = hc->wave_period * HILL_CLIMBING_WAVE_HISTORY_SIZE; - hc->target_throughput_ratio = (gdouble) HILL_CLIMBING_BIAS; - hc->target_signal_to_noise_ratio = (gdouble) HILL_CLIMBING_TARGET_SIGNAL_TO_NOISE_RATIO; - hc->max_change_per_second = (gdouble) HILL_CLIMBING_MAX_CHANGE_PER_SECOND; - hc->max_change_per_sample = (gdouble) HILL_CLIMBING_MAX_CHANGE_PER_SAMPLE; - hc->sample_interval_low = HILL_CLIMBING_SAMPLE_INTERVAL_LOW; - hc->sample_interval_high = HILL_CLIMBING_SAMPLE_INTERVAL_HIGH; - hc->throughput_error_smoothing_factor = (gdouble) HILL_CLIMBING_ERROR_SMOOTHING_FACTOR; - hc->gain_exponent = (gdouble) HILL_CLIMBING_GAIN_EXPONENT; - hc->max_sample_error = (gdouble) HILL_CLIMBING_MAX_SAMPLE_ERROR_PERCENT; - hc->current_control_setting = 0; - hc->total_samples = 0; - hc->last_thread_count = 0; - hc->average_throughput_noise = 0; - hc->elapsed_since_last_change = 0; - hc->accumulated_completion_count = 0; - hc->accumulated_sample_duration = 0; - hc->samples = g_new0 (gdouble, hc->samples_to_measure); - hc->thread_counts = g_new0 (gdouble, hc->samples_to_measure); - hc->random_interval_generator = rand_create (); - hc->current_sample_interval = rand_next (&hc->random_interval_generator, hc->sample_interval_low, hc->sample_interval_high); - - if (!(threads_per_cpu_env = g_getenv ("MONO_THREADS_PER_CPU"))) - threads_per_cpu = 1; - else - threads_per_cpu = CLAMP (atoi (threads_per_cpu_env), 1, 50); - - threads_count = mono_cpu_count () * threads_per_cpu; - - threadpool->limit_worker_min = threadpool->limit_io_min = threads_count; - -#if defined (PLATFORM_ANDROID) || defined (HOST_IOS) - threadpool->limit_worker_max = threadpool->limit_io_max = CLAMP (threads_count * 100, MIN (threads_count, 200), MAX (threads_count, 200)); -#else - threadpool->limit_worker_max = threadpool->limit_io_max = threads_count * 100; -#endif - - threadpool->counters._.max_working = threadpool->limit_worker_min; - - threadpool->cpu_usage_state = g_new0 (MonoCpuUsageState, 1); - - threadpool->suspended = FALSE; -} - -static void worker_kill (ThreadPoolWorkingThread *thread); - -static void -cleanup (void) -{ - guint i; - - /* we make the assumption along the code that we are - * cleaning up only if the runtime is shutting down */ - g_assert (mono_runtime_is_shutting_down ()); - - while (monitor_status != MONITOR_STATUS_NOT_RUNNING) - mono_thread_info_sleep (1, NULL); - - mono_coop_mutex_lock (&threadpool->active_threads_lock); - - /* stop all threadpool->working_threads */ - for (i = 0; i < threadpool->working_threads->len; ++i) - worker_kill ((ThreadPoolWorkingThread*) g_ptr_array_index (threadpool->working_threads, i)); - - /* unpark all threadpool->parked_threads */ - mono_coop_cond_broadcast (&threadpool->parked_threads_cond); - - mono_coop_mutex_unlock (&threadpool->active_threads_lock); -} - -gboolean -mono_threadpool_ms_enqueue_work_item (MonoDomain *domain, MonoObject *work_item, MonoError *error) -{ - static MonoClass *threadpool_class = NULL; - static MonoMethod *unsafe_queue_custom_work_item_method = NULL; - MonoDomain *current_domain; - MonoBoolean f; - gpointer args [2]; - - mono_error_init (error); - g_assert (work_item); - - if (!threadpool_class) - threadpool_class = mono_class_load_from_name (mono_defaults.corlib, "System.Threading", "ThreadPool"); - - if (!unsafe_queue_custom_work_item_method) - unsafe_queue_custom_work_item_method = mono_class_get_method_from_name (threadpool_class, "UnsafeQueueCustomWorkItem", 2); - g_assert (unsafe_queue_custom_work_item_method); - - f = FALSE; - - args [0] = (gpointer) work_item; - args [1] = (gpointer) &f; - - current_domain = mono_domain_get (); - if (current_domain == domain) { - mono_runtime_invoke_checked (unsafe_queue_custom_work_item_method, NULL, args, error); - return_val_if_nok (error, FALSE); - } else { - mono_thread_push_appdomain_ref (domain); - if (mono_domain_set (domain, FALSE)) { - mono_runtime_invoke_checked (unsafe_queue_custom_work_item_method, NULL, args, error); - if (!is_ok (error)) { - mono_thread_pop_appdomain_ref (); - return FALSE; - } - mono_domain_set (current_domain, TRUE); - } - mono_thread_pop_appdomain_ref (); - } - return TRUE; -} - -/* LOCKING: domains_lock must be held */ -static void -tpdomain_add (ThreadPoolDomain *tpdomain) -{ - guint i, len; - - g_assert (tpdomain); - - len = threadpool->domains->len; - for (i = 0; i < len; ++i) { - if (g_ptr_array_index (threadpool->domains, i) == tpdomain) - break; - } - - if (i == len) - g_ptr_array_add (threadpool->domains, tpdomain); -} - -/* LOCKING: domains_lock must be held. */ -static gboolean -tpdomain_remove (ThreadPoolDomain *tpdomain) -{ - g_assert (tpdomain); - return g_ptr_array_remove (threadpool->domains, tpdomain); -} - -/* LOCKING: domains_lock must be held */ -static ThreadPoolDomain * -tpdomain_get (MonoDomain *domain, gboolean create) -{ - guint i; - ThreadPoolDomain *tpdomain; - - g_assert (domain); - - for (i = 0; i < threadpool->domains->len; ++i) { - ThreadPoolDomain *tpdomain; - - tpdomain = (ThreadPoolDomain *)g_ptr_array_index (threadpool->domains, i); - if (tpdomain->domain == domain) - return tpdomain; - } - - if (!create) - return NULL; - - tpdomain = g_new0 (ThreadPoolDomain, 1); - tpdomain->domain = domain; - mono_coop_cond_init (&tpdomain->cleanup_cond); - - tpdomain_add (tpdomain); - - return tpdomain; -} - -static void -tpdomain_free (ThreadPoolDomain *tpdomain) -{ - g_free (tpdomain); -} - -/* LOCKING: domains_lock must be held */ -static gboolean -domain_any_has_request (void) -{ - guint i; - - for (i = 0; i < threadpool->domains->len; ++i) { - ThreadPoolDomain *tmp = (ThreadPoolDomain *)g_ptr_array_index (threadpool->domains, i); - if (tmp->outstanding_request > 0) - return TRUE; - } - - return FALSE; -} - -/* LOCKING: domains_lock must be held */ -static ThreadPoolDomain * -tpdomain_get_next (ThreadPoolDomain *current) -{ - ThreadPoolDomain *tpdomain = NULL; - guint len; - - len = threadpool->domains->len; - if (len > 0) { - guint i, current_idx = -1; - if (current) { - for (i = 0; i < len; ++i) { - if (current == g_ptr_array_index (threadpool->domains, i)) { - current_idx = i; - break; - } - } - g_assert (current_idx != (guint)-1); - } - for (i = current_idx + 1; i < len + current_idx + 1; ++i) { - ThreadPoolDomain *tmp = (ThreadPoolDomain *)g_ptr_array_index (threadpool->domains, i % len); - if (tmp->outstanding_request > 0) { - tpdomain = tmp; - break; - } - } - } - - return tpdomain; -} - -static void -worker_wait_interrupt (gpointer data) -{ - mono_coop_mutex_lock (&threadpool->active_threads_lock); - mono_coop_cond_signal (&threadpool->parked_threads_cond); - mono_coop_mutex_unlock (&threadpool->active_threads_lock); -} - -/* return TRUE if timeout, FALSE otherwise (worker unpark or interrupt) */ -static gboolean -worker_park (void) -{ - gboolean timeout = FALSE; - - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] current worker parking", mono_native_thread_id_get ()); - - mono_gc_set_skip_thread (TRUE); - - mono_coop_mutex_lock (&threadpool->active_threads_lock); - - if (!mono_runtime_is_shutting_down ()) { - static gpointer rand_handle = NULL; - MonoInternalThread *thread_internal; - gboolean interrupted = FALSE; - - if (!rand_handle) - rand_handle = rand_create (); - g_assert (rand_handle); - - thread_internal = mono_thread_internal_current (); - g_assert (thread_internal); - - threadpool->parked_threads_count += 1; - g_ptr_array_remove_fast (threadpool->working_threads, thread_internal); - - mono_thread_info_install_interrupt (worker_wait_interrupt, NULL, &interrupted); - if (interrupted) - goto done; - - if (mono_coop_cond_timedwait (&threadpool->parked_threads_cond, &threadpool->active_threads_lock, rand_next (&rand_handle, 5 * 1000, 60 * 1000)) != 0) - timeout = TRUE; - - mono_thread_info_uninstall_interrupt (&interrupted); - -done: - g_ptr_array_add (threadpool->working_threads, thread_internal); - threadpool->parked_threads_count -= 1; - } - - mono_coop_mutex_unlock (&threadpool->active_threads_lock); - - mono_gc_set_skip_thread (FALSE); - - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] current worker unparking, timeout? %s", mono_native_thread_id_get (), timeout ? "yes" : "no"); - - return timeout; -} - -static gboolean -worker_try_unpark (void) -{ - gboolean res = FALSE; - - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try unpark worker", mono_native_thread_id_get ()); - - mono_coop_mutex_lock (&threadpool->active_threads_lock); - if (threadpool->parked_threads_count > 0) { - mono_coop_cond_signal (&threadpool->parked_threads_cond); - res = TRUE; - } - mono_coop_mutex_unlock (&threadpool->active_threads_lock); - - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try unpark worker, success? %s", mono_native_thread_id_get (), res ? "yes" : "no"); - - return res; -} - -static void -worker_kill (ThreadPoolWorkingThread *thread) -{ - if (thread == mono_thread_internal_current ()) - return; - - mono_thread_internal_abort ((MonoInternalThread*) thread); -} - -static void -worker_thread (gpointer data) -{ - MonoError error; - MonoInternalThread *thread; - ThreadPoolDomain *tpdomain, *previous_tpdomain; - ThreadPoolCounter counter; - gboolean retire = FALSE; - - mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] worker starting", mono_native_thread_id_get ()); - - g_assert (threadpool); - - thread = mono_thread_internal_current (); - g_assert (thread); - - mono_thread_set_name_internal (thread, mono_string_new (mono_get_root_domain (), "Threadpool worker"), FALSE, &error); - mono_error_assert_ok (&error); - - mono_coop_mutex_lock (&threadpool->active_threads_lock); - g_ptr_array_add (threadpool->working_threads, thread); - mono_coop_mutex_unlock (&threadpool->active_threads_lock); - - previous_tpdomain = NULL; - - domains_lock (); - - while (!mono_runtime_is_shutting_down ()) { - tpdomain = NULL; - - if ((thread->state & (ThreadState_AbortRequested | ThreadState_SuspendRequested)) != 0) { - domains_unlock (); - mono_thread_interruption_checkpoint (); - domains_lock (); - } - - if (retire || !(tpdomain = tpdomain_get_next (previous_tpdomain))) { - gboolean timeout; - - COUNTER_ATOMIC (counter, { - counter._.working --; - counter._.parked ++; - }); - - domains_unlock (); - timeout = worker_park (); - domains_lock (); - - COUNTER_ATOMIC (counter, { - counter._.working ++; - counter._.parked --; - }); - - if (timeout) - break; - - if (retire) - retire = FALSE; - - /* The tpdomain->domain might have unloaded, while this thread was parked */ - previous_tpdomain = NULL; - - continue; - } - - tpdomain->outstanding_request --; - g_assert (tpdomain->outstanding_request >= 0); - - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker running in domain %p (outstanding requests %d) ", - mono_native_thread_id_get (), tpdomain->domain, tpdomain->outstanding_request); - - g_assert (tpdomain->domain); - g_assert (tpdomain->threadpool_jobs >= 0); - tpdomain->threadpool_jobs ++; - - /* - * This is needed so there is always an lmf frame in the runtime invoke call below, - * so ThreadAbortExceptions are caught even if the thread is in native code. - */ - mono_defaults.threadpool_perform_wait_callback_method->save_lmf = TRUE; - - domains_unlock (); - - mono_thread_push_appdomain_ref (tpdomain->domain); - if (mono_domain_set (tpdomain->domain, FALSE)) { - MonoObject *exc = NULL, *res; - - res = mono_runtime_try_invoke (mono_defaults.threadpool_perform_wait_callback_method, NULL, NULL, &exc, &error); - if (exc || !mono_error_ok(&error)) { - if (exc == NULL) - exc = (MonoObject *) mono_error_convert_to_exception (&error); - else - mono_error_cleanup (&error); - mono_thread_internal_unhandled_exception (exc); - } else if (res && *(MonoBoolean*) mono_object_unbox (res) == FALSE) - retire = TRUE; - - mono_thread_clr_state (thread, (MonoThreadState)~ThreadState_Background); - if (!mono_thread_test_state (thread , ThreadState_Background)) - ves_icall_System_Threading_Thread_SetState (thread, ThreadState_Background); - - mono_domain_set (mono_get_root_domain (), TRUE); - } - mono_thread_pop_appdomain_ref (); - - domains_lock (); - - tpdomain->threadpool_jobs --; - g_assert (tpdomain->threadpool_jobs >= 0); - - if (tpdomain->outstanding_request + tpdomain->threadpool_jobs == 0 && mono_domain_is_unloading (tpdomain->domain)) { - gboolean removed; - - removed = tpdomain_remove (tpdomain); - g_assert (removed); - - mono_coop_cond_signal (&tpdomain->cleanup_cond); - tpdomain = NULL; - } - - previous_tpdomain = tpdomain; - } - - domains_unlock (); - - mono_coop_mutex_lock (&threadpool->active_threads_lock); - g_ptr_array_remove_fast (threadpool->working_threads, thread); - mono_coop_mutex_unlock (&threadpool->active_threads_lock); - - COUNTER_ATOMIC (counter, { - counter._.working--; - counter._.active --; - }); - - mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] worker finishing", mono_native_thread_id_get ()); -} - -static gboolean -worker_try_create (void) -{ - ThreadPoolCounter counter; - MonoInternalThread *thread; - gint64 current_ticks; - gint32 now; - - mono_coop_mutex_lock (&threadpool->worker_creation_lock); - - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker", mono_native_thread_id_get ()); - current_ticks = mono_100ns_ticks (); - now = current_ticks / (10 * 1000 * 1000); - if (0 == current_ticks) { - g_warning ("failed to get 100ns ticks"); - } else { - if (threadpool->worker_creation_current_second != now) { - threadpool->worker_creation_current_second = now; - threadpool->worker_creation_current_count = 0; - } else { - g_assert (threadpool->worker_creation_current_count <= WORKER_CREATION_MAX_PER_SEC); - if (threadpool->worker_creation_current_count == WORKER_CREATION_MAX_PER_SEC) { - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, failed: maximum number of worker created per second reached, current count = %d", - mono_native_thread_id_get (), threadpool->worker_creation_current_count); - mono_coop_mutex_unlock (&threadpool->worker_creation_lock); - return FALSE; - } - } - } - - COUNTER_ATOMIC (counter, { - if (counter._.working >= counter._.max_working) { - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, failed: maximum number of working threads reached", - mono_native_thread_id_get ()); - mono_coop_mutex_unlock (&threadpool->worker_creation_lock); - return FALSE; - } - counter._.working ++; - counter._.active ++; - }); - - MonoError error; - if ((thread = mono_thread_create_internal (mono_get_root_domain (), worker_thread, NULL, TRUE, 0, &error)) != NULL) { - threadpool->worker_creation_current_count += 1; - - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, created %p, now = %d count = %d", mono_native_thread_id_get (), GUINT_TO_POINTER(thread->tid), now, threadpool->worker_creation_current_count); - mono_coop_mutex_unlock (&threadpool->worker_creation_lock); - return TRUE; - } - - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, failed: could not create thread due to %s", mono_native_thread_id_get (), mono_error_get_message (&error)); - mono_error_cleanup (&error); - - COUNTER_ATOMIC (counter, { - counter._.working --; - counter._.active --; - }); - - mono_coop_mutex_unlock (&threadpool->worker_creation_lock); - return FALSE; -} - -static void monitor_ensure_running (void); - -static gboolean -worker_request (MonoDomain *domain) -{ - ThreadPoolDomain *tpdomain; - - g_assert (domain); - g_assert (threadpool); - - if (mono_runtime_is_shutting_down ()) - return FALSE; - - domains_lock (); - - /* synchronize check with worker_thread */ - if (mono_domain_is_unloading (domain)) { - domains_unlock (); - return FALSE; - } - - tpdomain = tpdomain_get (domain, TRUE); - g_assert (tpdomain); - tpdomain->outstanding_request ++; - - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, domain = %p, outstanding_request = %d", - mono_native_thread_id_get (), tpdomain->domain, tpdomain->outstanding_request); - - domains_unlock (); - - if (threadpool->suspended) - return FALSE; - - monitor_ensure_running (); - - if (worker_try_unpark ()) { - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, unparked", mono_native_thread_id_get ()); - return TRUE; - } - - if (worker_try_create ()) { - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, created", mono_native_thread_id_get ()); - return TRUE; - } - - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, failed", mono_native_thread_id_get ()); - return FALSE; -} - -static gboolean -monitor_should_keep_running (void) -{ - static gint64 last_should_keep_running = -1; - - g_assert (monitor_status == MONITOR_STATUS_WAITING_FOR_REQUEST || monitor_status == MONITOR_STATUS_REQUESTED); - - if (InterlockedExchange (&monitor_status, MONITOR_STATUS_WAITING_FOR_REQUEST) == MONITOR_STATUS_WAITING_FOR_REQUEST) { - gboolean should_keep_running = TRUE, force_should_keep_running = FALSE; - - if (mono_runtime_is_shutting_down ()) { - should_keep_running = FALSE; - } else { - domains_lock (); - if (!domain_any_has_request ()) - should_keep_running = FALSE; - domains_unlock (); - - if (!should_keep_running) { - if (last_should_keep_running == -1 || mono_100ns_ticks () - last_should_keep_running < MONITOR_MINIMAL_LIFETIME * 1000 * 10) { - should_keep_running = force_should_keep_running = TRUE; - } - } - } - - if (should_keep_running) { - if (last_should_keep_running == -1 || !force_should_keep_running) - last_should_keep_running = mono_100ns_ticks (); - } else { - last_should_keep_running = -1; - if (InterlockedCompareExchange (&monitor_status, MONITOR_STATUS_NOT_RUNNING, MONITOR_STATUS_WAITING_FOR_REQUEST) == MONITOR_STATUS_WAITING_FOR_REQUEST) - return FALSE; - } - } - - g_assert (monitor_status == MONITOR_STATUS_WAITING_FOR_REQUEST || monitor_status == MONITOR_STATUS_REQUESTED); - - return TRUE; -} - -static gboolean -monitor_sufficient_delay_since_last_dequeue (void) -{ - gint64 threshold; - - g_assert (threadpool); - - if (threadpool->cpu_usage < CPU_USAGE_LOW) { - threshold = MONITOR_INTERVAL; - } else { - ThreadPoolCounter counter; - counter.as_gint64 = COUNTER_READ(); - threshold = counter._.max_working * MONITOR_INTERVAL * 2; - } - - return mono_msec_ticks () >= threadpool->heuristic_last_dequeue + threshold; -} - -static void hill_climbing_force_change (gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition); - -static void -monitor_thread (void) -{ - MonoInternalThread *current_thread = mono_thread_internal_current (); - guint i; - - mono_cpu_usage (threadpool->cpu_usage_state); - - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, started", mono_native_thread_id_get ()); - - do { - ThreadPoolCounter counter; - gboolean limit_worker_max_reached; - gint32 interval_left = MONITOR_INTERVAL; - gint32 awake = 0; /* number of spurious awakes we tolerate before doing a round of rebalancing */ - - g_assert (monitor_status != MONITOR_STATUS_NOT_RUNNING); - - mono_gc_set_skip_thread (TRUE); - - do { - gint64 ts; - gboolean alerted = FALSE; - - if (mono_runtime_is_shutting_down ()) - break; - - ts = mono_msec_ticks (); - if (mono_thread_info_sleep (interval_left, &alerted) == 0) - break; - interval_left -= mono_msec_ticks () - ts; - - mono_gc_set_skip_thread (FALSE); - if ((current_thread->state & (ThreadState_StopRequested | ThreadState_SuspendRequested)) != 0) - mono_thread_interruption_checkpoint (); - mono_gc_set_skip_thread (TRUE); - } while (interval_left > 0 && ++awake < 10); - - mono_gc_set_skip_thread (FALSE); - - if (threadpool->suspended) - continue; - - if (mono_runtime_is_shutting_down ()) - continue; - - domains_lock (); - if (!domain_any_has_request ()) { - domains_unlock (); - continue; - } - domains_unlock (); - - threadpool->cpu_usage = mono_cpu_usage (threadpool->cpu_usage_state); - - if (!monitor_sufficient_delay_since_last_dequeue ()) - continue; - - limit_worker_max_reached = FALSE; - - COUNTER_ATOMIC (counter, { - if (counter._.max_working >= threadpool->limit_worker_max) { - limit_worker_max_reached = TRUE; - break; - } - counter._.max_working ++; - }); - - if (limit_worker_max_reached) - continue; - - hill_climbing_force_change (counter._.max_working, TRANSITION_STARVATION); - - for (i = 0; i < 5; ++i) { - if (mono_runtime_is_shutting_down ()) - break; - - if (worker_try_unpark ()) { - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, unparked", mono_native_thread_id_get ()); - break; - } - - if (worker_try_create ()) { - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, created", mono_native_thread_id_get ()); - break; - } - } - } while (monitor_should_keep_running ()); - - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, finished", mono_native_thread_id_get ()); -} - -static void -monitor_ensure_running (void) -{ - MonoError error; - for (;;) { - switch (monitor_status) { - case MONITOR_STATUS_REQUESTED: - return; - case MONITOR_STATUS_WAITING_FOR_REQUEST: - InterlockedCompareExchange (&monitor_status, MONITOR_STATUS_REQUESTED, MONITOR_STATUS_WAITING_FOR_REQUEST); - break; - case MONITOR_STATUS_NOT_RUNNING: - if (mono_runtime_is_shutting_down ()) - return; - if (InterlockedCompareExchange (&monitor_status, MONITOR_STATUS_REQUESTED, MONITOR_STATUS_NOT_RUNNING) == MONITOR_STATUS_NOT_RUNNING) { - if (!mono_thread_create_internal (mono_get_root_domain (), monitor_thread, NULL, TRUE, SMALL_STACK, &error)) { - monitor_status = MONITOR_STATUS_NOT_RUNNING; - mono_error_cleanup (&error); - } - return; - } - break; - default: g_assert_not_reached (); - } - } -} - -static void -hill_climbing_change_thread_count (gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition) -{ - ThreadPoolHillClimbing *hc; - - g_assert (threadpool); - - hc = &threadpool->heuristic_hill_climbing; - - mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] hill climbing, change max number of threads %d", mono_native_thread_id_get (), new_thread_count); - - hc->last_thread_count = new_thread_count; - hc->current_sample_interval = rand_next (&hc->random_interval_generator, hc->sample_interval_low, hc->sample_interval_high); - hc->elapsed_since_last_change = 0; - hc->completions_since_last_change = 0; -} - -static void -hill_climbing_force_change (gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition) -{ - ThreadPoolHillClimbing *hc; - - g_assert (threadpool); - - hc = &threadpool->heuristic_hill_climbing; - - if (new_thread_count != hc->last_thread_count) { - hc->current_control_setting += new_thread_count - hc->last_thread_count; - hill_climbing_change_thread_count (new_thread_count, transition); - } -} - -static double_complex -hill_climbing_get_wave_component (gdouble *samples, guint sample_count, gdouble period) -{ - ThreadPoolHillClimbing *hc; - gdouble w, cosine, sine, coeff, q0, q1, q2; - guint i; - - g_assert (threadpool); - g_assert (sample_count >= period); - g_assert (period >= 2); - - hc = &threadpool->heuristic_hill_climbing; - - w = 2.0 * M_PI / period; - cosine = cos (w); - sine = sin (w); - coeff = 2.0 * cosine; - q0 = q1 = q2 = 0; - - for (i = 0; i < sample_count; ++i) { - q0 = coeff * q1 - q2 + samples [(hc->total_samples - sample_count + i) % hc->samples_to_measure]; - q2 = q1; - q1 = q0; - } - - return mono_double_complex_scalar_div (mono_double_complex_make (q1 - q2 * cosine, (q2 * sine)), ((gdouble)sample_count)); -} - -static gint16 -hill_climbing_update (gint16 current_thread_count, guint32 sample_duration, gint32 completions, gint64 *adjustment_interval) -{ - ThreadPoolHillClimbing *hc; - ThreadPoolHeuristicStateTransition transition; - gdouble throughput; - gdouble throughput_error_estimate; - gdouble confidence; - gdouble move; - gdouble gain; - gint sample_index; - gint sample_count; - gint new_thread_wave_magnitude; - gint new_thread_count; - double_complex thread_wave_component; - double_complex throughput_wave_component; - double_complex ratio; - - g_assert (threadpool); - g_assert (adjustment_interval); - - hc = &threadpool->heuristic_hill_climbing; - - /* If someone changed the thread count without telling us, update our records accordingly. */ - if (current_thread_count != hc->last_thread_count) - hill_climbing_force_change (current_thread_count, TRANSITION_INITIALIZING); - - /* Update the cumulative stats for this thread count */ - hc->elapsed_since_last_change += sample_duration; - hc->completions_since_last_change += completions; - - /* Add in any data we've already collected about this sample */ - sample_duration += hc->accumulated_sample_duration; - completions += hc->accumulated_completion_count; - - /* We need to make sure we're collecting reasonably accurate data. Since we're just counting the end - * of each work item, we are goinng to be missing some data about what really happened during the - * sample interval. The count produced by each thread includes an initial work item that may have - * started well before the start of the interval, and each thread may have been running some new - * work item for some time before the end of the interval, which did not yet get counted. So - * our count is going to be off by +/- threadCount workitems. - * - * The exception is that the thread that reported to us last time definitely wasn't running any work - * at that time, and the thread that's reporting now definitely isn't running a work item now. So - * we really only need to consider threadCount-1 threads. - * - * Thus the percent error in our count is +/- (threadCount-1)/numCompletions. - * - * We cannot rely on the frequency-domain analysis we'll be doing later to filter out this error, because - * of the way it accumulates over time. If this sample is off by, say, 33% in the negative direction, - * then the next one likely will be too. The one after that will include the sum of the completions - * we missed in the previous samples, and so will be 33% positive. So every three samples we'll have - * two "low" samples and one "high" sample. This will appear as periodic variation right in the frequency - * range we're targeting, which will not be filtered by the frequency-domain translation. */ - if (hc->total_samples > 0 && ((current_thread_count - 1.0) / completions) >= hc->max_sample_error) { - /* Not accurate enough yet. Let's accumulate the data so - * far, and tell the ThreadPool to collect a little more. */ - hc->accumulated_sample_duration = sample_duration; - hc->accumulated_completion_count = completions; - *adjustment_interval = 10; - return current_thread_count; - } - - /* We've got enouugh data for our sample; reset our accumulators for next time. */ - hc->accumulated_sample_duration = 0; - hc->accumulated_completion_count = 0; - - /* Add the current thread count and throughput sample to our history. */ - throughput = ((gdouble) completions) / sample_duration; - - sample_index = hc->total_samples % hc->samples_to_measure; - hc->samples [sample_index] = throughput; - hc->thread_counts [sample_index] = current_thread_count; - hc->total_samples ++; - - /* Set up defaults for our metrics. */ - thread_wave_component = mono_double_complex_make(0, 0); - throughput_wave_component = mono_double_complex_make(0, 0); - throughput_error_estimate = 0; - ratio = mono_double_complex_make(0, 0); - confidence = 0; - - transition = TRANSITION_WARMUP; - - /* How many samples will we use? It must be at least the three wave periods we're looking for, and it must also - * be a whole multiple of the primary wave's period; otherwise the frequency we're looking for will fall between - * two frequency bands in the Fourier analysis, and we won't be able to measure it accurately. */ - sample_count = ((gint) MIN (hc->total_samples - 1, hc->samples_to_measure) / hc->wave_period) * hc->wave_period; - - if (sample_count > hc->wave_period) { - guint i; - gdouble average_throughput; - gdouble average_thread_count; - gdouble sample_sum = 0; - gdouble thread_sum = 0; - - /* Average the throughput and thread count samples, so we can scale the wave magnitudes later. */ - for (i = 0; i < sample_count; ++i) { - guint j = (hc->total_samples - sample_count + i) % hc->samples_to_measure; - sample_sum += hc->samples [j]; - thread_sum += hc->thread_counts [j]; - } - - average_throughput = sample_sum / sample_count; - average_thread_count = thread_sum / sample_count; - - if (average_throughput > 0 && average_thread_count > 0) { - gdouble noise_for_confidence, adjacent_period_1, adjacent_period_2; - - /* Calculate the periods of the adjacent frequency bands we'll be using to - * measure noise levels. We want the two adjacent Fourier frequency bands. */ - adjacent_period_1 = sample_count / (((gdouble) sample_count) / ((gdouble) hc->wave_period) + 1); - adjacent_period_2 = sample_count / (((gdouble) sample_count) / ((gdouble) hc->wave_period) - 1); - - /* Get the the three different frequency components of the throughput (scaled by average - * throughput). Our "error" estimate (the amount of noise that might be present in the - * frequency band we're really interested in) is the average of the adjacent bands. */ - throughput_wave_component = mono_double_complex_scalar_div (hill_climbing_get_wave_component (hc->samples, sample_count, hc->wave_period), average_throughput); - throughput_error_estimate = cabs (mono_double_complex_scalar_div (hill_climbing_get_wave_component (hc->samples, sample_count, adjacent_period_1), average_throughput)); - - if (adjacent_period_2 <= sample_count) { - throughput_error_estimate = MAX (throughput_error_estimate, cabs (mono_double_complex_scalar_div (hill_climbing_get_wave_component ( - hc->samples, sample_count, adjacent_period_2), average_throughput))); - } - - /* Do the same for the thread counts, so we have something to compare to. We don't - * measure thread count noise, because there is none; these are exact measurements. */ - thread_wave_component = mono_double_complex_scalar_div (hill_climbing_get_wave_component (hc->thread_counts, sample_count, hc->wave_period), average_thread_count); - - /* Update our moving average of the throughput noise. We'll use this - * later as feedback to determine the new size of the thread wave. */ - if (hc->average_throughput_noise == 0) { - hc->average_throughput_noise = throughput_error_estimate; - } else { - hc->average_throughput_noise = (hc->throughput_error_smoothing_factor * throughput_error_estimate) - + ((1.0 + hc->throughput_error_smoothing_factor) * hc->average_throughput_noise); - } - - if (cabs (thread_wave_component) > 0) { - /* Adjust the throughput wave so it's centered around the target wave, - * and then calculate the adjusted throughput/thread ratio. */ - ratio = mono_double_complex_div (mono_double_complex_sub (throughput_wave_component, mono_double_complex_scalar_mul(thread_wave_component, hc->target_throughput_ratio)), thread_wave_component); - transition = TRANSITION_CLIMBING_MOVE; - } else { - ratio = mono_double_complex_make (0, 0); - transition = TRANSITION_STABILIZING; - } - - noise_for_confidence = MAX (hc->average_throughput_noise, throughput_error_estimate); - if (noise_for_confidence > 0) { - confidence = cabs (thread_wave_component) / noise_for_confidence / hc->target_signal_to_noise_ratio; - } else { - /* there is no noise! */ - confidence = 1.0; - } - } - } - - /* We use just the real part of the complex ratio we just calculated. If the throughput signal - * is exactly in phase with the thread signal, this will be the same as taking the magnitude of - * the complex move and moving that far up. If they're 180 degrees out of phase, we'll move - * backward (because this indicates that our changes are having the opposite of the intended effect). - * If they're 90 degrees out of phase, we won't move at all, because we can't tell wether we're - * having a negative or positive effect on throughput. */ - move = creal (ratio); - move = CLAMP (move, -1.0, 1.0); - - /* Apply our confidence multiplier. */ - move *= CLAMP (confidence, -1.0, 1.0); - - /* Now apply non-linear gain, such that values around zero are attenuated, while higher values - * are enhanced. This allows us to move quickly if we're far away from the target, but more slowly - * if we're getting close, giving us rapid ramp-up without wild oscillations around the target. */ - gain = hc->max_change_per_second * sample_duration; - move = pow (fabs (move), hc->gain_exponent) * (move >= 0.0 ? 1 : -1) * gain; - move = MIN (move, hc->max_change_per_sample); - - /* If the result was positive, and CPU is > 95%, refuse the move. */ - if (move > 0.0 && threadpool->cpu_usage > CPU_USAGE_HIGH) - move = 0.0; - - /* Apply the move to our control setting. */ - hc->current_control_setting += move; - - /* Calculate the new thread wave magnitude, which is based on the moving average we've been keeping of the - * throughput error. This average starts at zero, so we'll start with a nice safe little wave at first. */ - new_thread_wave_magnitude = (gint)(0.5 + (hc->current_control_setting * hc->average_throughput_noise - * hc->target_signal_to_noise_ratio * hc->thread_magnitude_multiplier * 2.0)); - new_thread_wave_magnitude = CLAMP (new_thread_wave_magnitude, 1, hc->max_thread_wave_magnitude); - - /* Make sure our control setting is within the ThreadPool's limits. */ - hc->current_control_setting = CLAMP (hc->current_control_setting, threadpool->limit_worker_min, threadpool->limit_worker_max - new_thread_wave_magnitude); - - /* Calculate the new thread count (control setting + square wave). */ - new_thread_count = (gint)(hc->current_control_setting + new_thread_wave_magnitude * ((hc->total_samples / (hc->wave_period / 2)) % 2)); - - /* Make sure the new thread count doesn't exceed the ThreadPool's limits. */ - new_thread_count = CLAMP (new_thread_count, threadpool->limit_worker_min, threadpool->limit_worker_max); - - if (new_thread_count != current_thread_count) - hill_climbing_change_thread_count (new_thread_count, transition); - - if (creal (ratio) < 0.0 && new_thread_count == threadpool->limit_worker_min) - *adjustment_interval = (gint)(0.5 + hc->current_sample_interval * (10.0 * MAX (-1.0 * creal (ratio), 1.0))); - else - *adjustment_interval = hc->current_sample_interval; - - return new_thread_count; -} - -static void -heuristic_notify_work_completed (void) -{ - g_assert (threadpool); - - InterlockedIncrement (&threadpool->heuristic_completions); - threadpool->heuristic_last_dequeue = mono_msec_ticks (); -} - -static gboolean -heuristic_should_adjust (void) -{ - g_assert (threadpool); - - if (threadpool->heuristic_last_dequeue > threadpool->heuristic_last_adjustment + threadpool->heuristic_adjustment_interval) { - ThreadPoolCounter counter; - counter.as_gint64 = COUNTER_READ(); - if (counter._.working <= counter._.max_working) - return TRUE; - } - - return FALSE; -} - -static void -heuristic_adjust (void) -{ - g_assert (threadpool); - - if (mono_coop_mutex_trylock (&threadpool->heuristic_lock) == 0) { - gint32 completions = InterlockedExchange (&threadpool->heuristic_completions, 0); - gint64 sample_end = mono_msec_ticks (); - gint64 sample_duration = sample_end - threadpool->heuristic_sample_start; - - if (sample_duration >= threadpool->heuristic_adjustment_interval / 2) { - ThreadPoolCounter counter; - gint16 new_thread_count; - - counter.as_gint64 = COUNTER_READ (); - new_thread_count = hill_climbing_update (counter._.max_working, sample_duration, completions, &threadpool->heuristic_adjustment_interval); - - COUNTER_ATOMIC (counter, { counter._.max_working = new_thread_count; }); - - if (new_thread_count > counter._.max_working) - worker_request (mono_domain_get ()); - - threadpool->heuristic_sample_start = sample_end; - threadpool->heuristic_last_adjustment = mono_msec_ticks (); - } - - mono_coop_mutex_unlock (&threadpool->heuristic_lock); - } -} - -void -mono_threadpool_ms_cleanup (void) -{ -#ifndef DISABLE_SOCKETS - mono_threadpool_ms_io_cleanup (); -#endif - mono_lazy_cleanup (&status, cleanup); -} - -MonoAsyncResult * -mono_threadpool_ms_begin_invoke (MonoDomain *domain, MonoObject *target, MonoMethod *method, gpointer *params, MonoError *error) -{ - static MonoClass *async_call_klass = NULL; - MonoMethodMessage *message; - MonoAsyncResult *async_result; - MonoAsyncCall *async_call; - MonoDelegate *async_callback = NULL; - MonoObject *state = NULL; - - if (!async_call_klass) - async_call_klass = mono_class_load_from_name (mono_defaults.corlib, "System", "MonoAsyncCall"); - - mono_lazy_initialize (&status, initialize); - - mono_error_init (error); - - message = mono_method_call_message_new (method, params, mono_get_delegate_invoke (method->klass), (params != NULL) ? (&async_callback) : NULL, (params != NULL) ? (&state) : NULL, error); - return_val_if_nok (error, NULL); - - async_call = (MonoAsyncCall*) mono_object_new_checked (domain, async_call_klass, error); - return_val_if_nok (error, NULL); - - MONO_OBJECT_SETREF (async_call, msg, message); - MONO_OBJECT_SETREF (async_call, state, state); - - if (async_callback) { - MONO_OBJECT_SETREF (async_call, cb_method, mono_get_delegate_invoke (((MonoObject*) async_callback)->vtable->klass)); - MONO_OBJECT_SETREF (async_call, cb_target, async_callback); - } - - async_result = mono_async_result_new (domain, NULL, async_call->state, NULL, (MonoObject*) async_call, error); - return_val_if_nok (error, NULL); - MONO_OBJECT_SETREF (async_result, async_delegate, target); - - mono_threadpool_ms_enqueue_work_item (domain, (MonoObject*) async_result, error); - return_val_if_nok (error, NULL); - - return async_result; -} - -MonoObject * -mono_threadpool_ms_end_invoke (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc, MonoError *error) -{ - MonoAsyncCall *ac; - - mono_error_init (error); - g_assert (exc); - g_assert (out_args); - - *exc = NULL; - *out_args = NULL; - - /* check if already finished */ - mono_monitor_enter ((MonoObject*) ares); - - if (ares->endinvoke_called) { - mono_error_set_invalid_operation(error, "Delegate EndInvoke method called more than once"); - mono_monitor_exit ((MonoObject*) ares); - return NULL; - } - - ares->endinvoke_called = 1; - - /* wait until we are really finished */ - if (ares->completed) { - mono_monitor_exit ((MonoObject *) ares); - } else { - gpointer wait_event; - if (ares->handle) { - wait_event = mono_wait_handle_get_handle ((MonoWaitHandle*) ares->handle); - } else { - wait_event = mono_w32event_create (TRUE, FALSE); - g_assert(wait_event); - MonoWaitHandle *wait_handle = mono_wait_handle_new (mono_object_domain (ares), wait_event, error); - if (!is_ok (error)) { - CloseHandle (wait_event); - return NULL; - } - MONO_OBJECT_SETREF (ares, handle, (MonoObject*) wait_handle); - } - mono_monitor_exit ((MonoObject*) ares); - MONO_ENTER_GC_SAFE; -#ifdef HOST_WIN32 - WaitForSingleObjectEx (wait_event, INFINITE, TRUE); -#else - mono_w32handle_wait_one (wait_event, MONO_INFINITE_WAIT, TRUE); -#endif - MONO_EXIT_GC_SAFE; - } - - ac = (MonoAsyncCall*) ares->object_data; - g_assert (ac); - - *exc = ac->msg->exc; /* FIXME: GC add write barrier */ - *out_args = ac->out_args; - return ac->res; -} - -gboolean -mono_threadpool_ms_remove_domain_jobs (MonoDomain *domain, int timeout) -{ - gint64 end; - ThreadPoolDomain *tpdomain; - gboolean ret; - - g_assert (domain); - g_assert (timeout >= -1); - - g_assert (mono_domain_is_unloading (domain)); - - if (timeout != -1) - end = mono_msec_ticks () + timeout; - -#ifndef DISABLE_SOCKETS - mono_threadpool_ms_io_remove_domain_jobs (domain); - if (timeout != -1) { - if (mono_msec_ticks () > end) - return FALSE; - } -#endif - - /* - * Wait for all threads which execute jobs in the domain to exit. - * The is_unloading () check in worker_request () ensures that - * no new jobs are added after we enter the lock below. - */ - mono_lazy_initialize (&status, initialize); - domains_lock (); - - tpdomain = tpdomain_get (domain, FALSE); - if (!tpdomain) { - domains_unlock (); - return TRUE; - } - - ret = TRUE; - - while (tpdomain->outstanding_request + tpdomain->threadpool_jobs > 0) { - if (timeout == -1) { - mono_coop_cond_wait (&tpdomain->cleanup_cond, &threadpool->domains_lock); - } else { - gint64 now; - gint res; - - now = mono_msec_ticks(); - if (now > end) { - ret = FALSE; - break; - } - - res = mono_coop_cond_timedwait (&tpdomain->cleanup_cond, &threadpool->domains_lock, end - now); - if (res != 0) { - ret = FALSE; - break; - } - } - } - - /* Remove from the list the worker threads look at */ - tpdomain_remove (tpdomain); - - domains_unlock (); - - mono_coop_cond_destroy (&tpdomain->cleanup_cond); - tpdomain_free (tpdomain); - - return ret; -} - -void -mono_threadpool_ms_suspend (void) -{ - if (threadpool) - threadpool->suspended = TRUE; -} - -void -mono_threadpool_ms_resume (void) -{ - if (threadpool) - threadpool->suspended = FALSE; -} - -void -ves_icall_System_Threading_ThreadPool_GetAvailableThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads) -{ - ThreadPoolCounter counter; - - if (!worker_threads || !completion_port_threads) - return; - - mono_lazy_initialize (&status, initialize); - - counter.as_gint64 = COUNTER_READ (); - - *worker_threads = MAX (0, threadpool->limit_worker_max - counter._.active); - *completion_port_threads = threadpool->limit_io_max; -} - -void -ves_icall_System_Threading_ThreadPool_GetMinThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads) -{ - if (!worker_threads || !completion_port_threads) - return; - - mono_lazy_initialize (&status, initialize); - - *worker_threads = threadpool->limit_worker_min; - *completion_port_threads = threadpool->limit_io_min; -} - -void -ves_icall_System_Threading_ThreadPool_GetMaxThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads) -{ - if (!worker_threads || !completion_port_threads) - return; - - mono_lazy_initialize (&status, initialize); - - *worker_threads = threadpool->limit_worker_max; - *completion_port_threads = threadpool->limit_io_max; -} - -MonoBoolean -ves_icall_System_Threading_ThreadPool_SetMinThreadsNative (gint32 worker_threads, gint32 completion_port_threads) -{ - mono_lazy_initialize (&status, initialize); - - if (worker_threads <= 0 || worker_threads > threadpool->limit_worker_max) - return FALSE; - if (completion_port_threads <= 0 || completion_port_threads > threadpool->limit_io_max) - return FALSE; - - threadpool->limit_worker_min = worker_threads; - threadpool->limit_io_min = completion_port_threads; - - return TRUE; -} - -MonoBoolean -ves_icall_System_Threading_ThreadPool_SetMaxThreadsNative (gint32 worker_threads, gint32 completion_port_threads) -{ - gint cpu_count = mono_cpu_count (); - - mono_lazy_initialize (&status, initialize); - - if (worker_threads < threadpool->limit_worker_min || worker_threads < cpu_count) - return FALSE; - if (completion_port_threads < threadpool->limit_io_min || completion_port_threads < cpu_count) - return FALSE; - - threadpool->limit_worker_max = worker_threads; - threadpool->limit_io_max = completion_port_threads; - - return TRUE; -} - -void -ves_icall_System_Threading_ThreadPool_InitializeVMTp (MonoBoolean *enable_worker_tracking) -{ - if (enable_worker_tracking) { - // TODO implement some kind of switch to have the possibily to use it - *enable_worker_tracking = FALSE; - } - - mono_lazy_initialize (&status, initialize); -} - -MonoBoolean -ves_icall_System_Threading_ThreadPool_NotifyWorkItemComplete (void) -{ - ThreadPoolCounter counter; - - if (mono_domain_is_unloading (mono_domain_get ()) || mono_runtime_is_shutting_down ()) - return FALSE; - - heuristic_notify_work_completed (); - - if (heuristic_should_adjust ()) - heuristic_adjust (); - - counter.as_gint64 = COUNTER_READ (); - return counter._.working <= counter._.max_working; -} - -void -ves_icall_System_Threading_ThreadPool_NotifyWorkItemProgressNative (void) -{ - heuristic_notify_work_completed (); - - if (heuristic_should_adjust ()) - heuristic_adjust (); -} - -void -ves_icall_System_Threading_ThreadPool_ReportThreadStatus (MonoBoolean is_working) -{ - // TODO - MonoError error; - mono_error_set_not_implemented (&error, ""); - mono_error_set_pending_exception (&error); -} - -MonoBoolean -ves_icall_System_Threading_ThreadPool_RequestWorkerThread (void) -{ - return worker_request (mono_domain_get ()); -} - -MonoBoolean G_GNUC_UNUSED -ves_icall_System_Threading_ThreadPool_PostQueuedCompletionStatus (MonoNativeOverlapped *native_overlapped) -{ - /* This copy the behavior of the current Mono implementation */ - MonoError error; - mono_error_set_not_implemented (&error, ""); - mono_error_set_pending_exception (&error); - return FALSE; -} - -MonoBoolean G_GNUC_UNUSED -ves_icall_System_Threading_ThreadPool_BindIOCompletionCallbackNative (gpointer file_handle) -{ - /* This copy the behavior of the current Mono implementation */ - return TRUE; -} - -MonoBoolean G_GNUC_UNUSED -ves_icall_System_Threading_ThreadPool_IsThreadPoolHosted (void) -{ - return FALSE; -} diff --git a/mono/metadata/threadpool-worker-default.c b/mono/metadata/threadpool-worker-default.c new file mode 100644 index 00000000000..3e39c57c756 --- /dev/null +++ b/mono/metadata/threadpool-worker-default.c @@ -0,0 +1,1267 @@ +/* + * threadpool-worker.c: native threadpool worker + * + * Author: + * Ludovic Henry (ludovic.henry@xamarin.com) + * + * Licensed under the MIT license. See LICENSE file in the project root for full license information. + */ + +#include +#define _USE_MATH_DEFINES // needed by MSVC to define math constants +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define CPU_USAGE_LOW 80 +#define CPU_USAGE_HIGH 95 + +#define MONITOR_INTERVAL 500 // ms +#define MONITOR_MINIMAL_LIFETIME 60 * 1000 // ms + +#define WORKER_CREATION_MAX_PER_SEC 10 + +/* The exponent to apply to the gain. 1.0 means to use linear gain, + * higher values will enhance large moves and damp small ones. + * default: 2.0 */ +#define HILL_CLIMBING_GAIN_EXPONENT 2.0 + +/* The 'cost' of a thread. 0 means drive for increased throughput regardless + * of thread count, higher values bias more against higher thread counts. + * default: 0.15 */ +#define HILL_CLIMBING_BIAS 0.15 + +#define HILL_CLIMBING_WAVE_PERIOD 4 +#define HILL_CLIMBING_MAX_WAVE_MAGNITUDE 20 +#define HILL_CLIMBING_WAVE_MAGNITUDE_MULTIPLIER 1.0 +#define HILL_CLIMBING_WAVE_HISTORY_SIZE 8 +#define HILL_CLIMBING_TARGET_SIGNAL_TO_NOISE_RATIO 3.0 +#define HILL_CLIMBING_MAX_CHANGE_PER_SECOND 4 +#define HILL_CLIMBING_MAX_CHANGE_PER_SAMPLE 20 +#define HILL_CLIMBING_SAMPLE_INTERVAL_LOW 10 +#define HILL_CLIMBING_SAMPLE_INTERVAL_HIGH 200 +#define HILL_CLIMBING_ERROR_SMOOTHING_FACTOR 0.01 +#define HILL_CLIMBING_MAX_SAMPLE_ERROR_PERCENT 0.15 + +typedef enum { + TRANSITION_WARMUP, + TRANSITION_INITIALIZING, + TRANSITION_RANDOM_MOVE, + TRANSITION_CLIMBING_MOVE, + TRANSITION_CHANGE_POINT, + TRANSITION_STABILIZING, + TRANSITION_STARVATION, + TRANSITION_THREAD_TIMED_OUT, + TRANSITION_UNDEFINED, +} ThreadPoolHeuristicStateTransition; + +typedef struct { + gint32 wave_period; + gint32 samples_to_measure; + gdouble target_throughput_ratio; + gdouble target_signal_to_noise_ratio; + gdouble max_change_per_second; + gdouble max_change_per_sample; + gint32 max_thread_wave_magnitude; + gint32 sample_interval_low; + gdouble thread_magnitude_multiplier; + gint32 sample_interval_high; + gdouble throughput_error_smoothing_factor; + gdouble gain_exponent; + gdouble max_sample_error; + + gdouble current_control_setting; + gint64 total_samples; + gint16 last_thread_count; + gdouble elapsed_since_last_change; + gdouble completions_since_last_change; + + gdouble average_throughput_noise; + + gdouble *samples; + gdouble *thread_counts; + + guint32 current_sample_interval; + gpointer random_interval_generator; + + gint32 accumulated_completion_count; + gdouble accumulated_sample_duration; +} ThreadPoolHillClimbing; + +typedef struct { + MonoThreadPoolWorkerCallback callback; + gpointer data; +} ThreadPoolWorkItem; + +typedef union { + struct { + gint16 max_working; /* determined by heuristic */ + gint16 starting; /* starting, but not yet in worker_thread */ + gint16 working; /* executing worker_thread */ + gint16 parked; /* parked */ + } _; + gint64 as_gint64; +} ThreadPoolWorkerCounter; + +typedef MonoInternalThread ThreadPoolWorkerThread; + +struct MonoThreadPoolWorker { + MonoRefCount ref; + + ThreadPoolWorkerCounter counters; + + GPtrArray *threads; // ThreadPoolWorkerThread* [] + MonoCoopMutex threads_lock; /* protect access to working_threads and parked_threads */ + gint32 parked_threads_count; + MonoCoopCond parked_threads_cond; + MonoCoopCond threads_exit_cond; + + ThreadPoolWorkItem *work_items; // ThreadPoolWorkItem [] + gint32 work_items_count; + gint32 work_items_size; + MonoCoopMutex work_items_lock; + + guint32 worker_creation_current_second; + guint32 worker_creation_current_count; + MonoCoopMutex worker_creation_lock; + + gint32 heuristic_completions; + gint64 heuristic_sample_start; + gint64 heuristic_last_dequeue; // ms + gint64 heuristic_last_adjustment; // ms + gint64 heuristic_adjustment_interval; // ms + ThreadPoolHillClimbing heuristic_hill_climbing; + MonoCoopMutex heuristic_lock; + + gint32 limit_worker_min; + gint32 limit_worker_max; + + MonoCpuUsageState *cpu_usage_state; + gint32 cpu_usage; + + /* suspended by the debugger */ + gboolean suspended; + + gint32 monitor_status; +}; + +enum { + MONITOR_STATUS_REQUESTED, + MONITOR_STATUS_WAITING_FOR_REQUEST, + MONITOR_STATUS_NOT_RUNNING, +}; + +#define COUNTER_CHECK(counter) \ + do { \ + g_assert (counter._.max_working > 0); \ + g_assert (counter._.starting >= 0); \ + g_assert (counter._.working >= 0); \ + } while (0) + +#define COUNTER_ATOMIC(worker,var,block) \ + do { \ + ThreadPoolWorkerCounter __old; \ + do { \ + g_assert (worker); \ + __old = COUNTER_READ (worker); \ + (var) = __old; \ + { block; } \ + COUNTER_CHECK (var); \ + } while (InterlockedCompareExchange64 (&worker->counters.as_gint64, (var).as_gint64, __old.as_gint64) != __old.as_gint64); \ + } while (0) + +static inline ThreadPoolWorkerCounter +COUNTER_READ (MonoThreadPoolWorker *worker) +{ + ThreadPoolWorkerCounter counter; + counter.as_gint64 = InterlockedRead64 (&worker->counters.as_gint64); + return counter; +} + +static gpointer +rand_create (void) +{ + mono_rand_open (); + return mono_rand_init (NULL, 0); +} + +static guint32 +rand_next (gpointer *handle, guint32 min, guint32 max) +{ + MonoError error; + guint32 val; + mono_rand_try_get_uint32 (handle, &val, min, max, &error); + // FIXME handle error + mono_error_assert_ok (&error); + return val; +} + +static void +destroy (gpointer data) +{ + MonoThreadPoolWorker *worker; + + worker = (MonoThreadPoolWorker*) data; + g_assert (worker); + + // FIXME destroy everything + + g_free (worker); +} + +void +mono_threadpool_worker_init (MonoThreadPoolWorker **worker) +{ + MonoThreadPoolWorker *wk; + ThreadPoolHillClimbing *hc; + const char *threads_per_cpu_env; + gint threads_per_cpu; + gint threads_count; + + g_assert (worker); + + wk = *worker = g_new0 (MonoThreadPoolWorker, 1); + + mono_refcount_init (wk, destroy); + + wk->threads = g_ptr_array_new (); + mono_coop_mutex_init (&wk->threads_lock); + wk->parked_threads_count = 0; + mono_coop_cond_init (&wk->parked_threads_cond); + mono_coop_cond_init (&wk->threads_exit_cond); + + /* wk->work_items_size is inited to 0 */ + mono_coop_mutex_init (&wk->work_items_lock); + + wk->worker_creation_current_second = -1; + mono_coop_mutex_init (&wk->worker_creation_lock); + + wk->heuristic_adjustment_interval = 10; + mono_coop_mutex_init (&wk->heuristic_lock); + + mono_rand_open (); + + hc = &wk->heuristic_hill_climbing; + + hc->wave_period = HILL_CLIMBING_WAVE_PERIOD; + hc->max_thread_wave_magnitude = HILL_CLIMBING_MAX_WAVE_MAGNITUDE; + hc->thread_magnitude_multiplier = (gdouble) HILL_CLIMBING_WAVE_MAGNITUDE_MULTIPLIER; + hc->samples_to_measure = hc->wave_period * HILL_CLIMBING_WAVE_HISTORY_SIZE; + hc->target_throughput_ratio = (gdouble) HILL_CLIMBING_BIAS; + hc->target_signal_to_noise_ratio = (gdouble) HILL_CLIMBING_TARGET_SIGNAL_TO_NOISE_RATIO; + hc->max_change_per_second = (gdouble) HILL_CLIMBING_MAX_CHANGE_PER_SECOND; + hc->max_change_per_sample = (gdouble) HILL_CLIMBING_MAX_CHANGE_PER_SAMPLE; + hc->sample_interval_low = HILL_CLIMBING_SAMPLE_INTERVAL_LOW; + hc->sample_interval_high = HILL_CLIMBING_SAMPLE_INTERVAL_HIGH; + hc->throughput_error_smoothing_factor = (gdouble) HILL_CLIMBING_ERROR_SMOOTHING_FACTOR; + hc->gain_exponent = (gdouble) HILL_CLIMBING_GAIN_EXPONENT; + hc->max_sample_error = (gdouble) HILL_CLIMBING_MAX_SAMPLE_ERROR_PERCENT; + hc->current_control_setting = 0; + hc->total_samples = 0; + hc->last_thread_count = 0; + hc->average_throughput_noise = 0; + hc->elapsed_since_last_change = 0; + hc->accumulated_completion_count = 0; + hc->accumulated_sample_duration = 0; + hc->samples = g_new0 (gdouble, hc->samples_to_measure); + hc->thread_counts = g_new0 (gdouble, hc->samples_to_measure); + hc->random_interval_generator = rand_create (); + hc->current_sample_interval = rand_next (&hc->random_interval_generator, hc->sample_interval_low, hc->sample_interval_high); + + if (!(threads_per_cpu_env = g_getenv ("MONO_THREADS_PER_CPU"))) + threads_per_cpu = 1; + else + threads_per_cpu = CLAMP (atoi (threads_per_cpu_env), 1, 50); + + threads_count = mono_cpu_count () * threads_per_cpu; + + wk->limit_worker_min = threads_count; + +#if defined (PLATFORM_ANDROID) || defined (HOST_IOS) + wk->limit_worker_max = CLAMP (threads_count * 100, MIN (threads_count, 200), MAX (threads_count, 200)); +#else + wk->limit_worker_max = threads_count * 100; +#endif + + wk->counters._.max_working = wk->limit_worker_min; + + wk->cpu_usage_state = g_new0 (MonoCpuUsageState, 1); + + wk->suspended = FALSE; + + wk->monitor_status = MONITOR_STATUS_NOT_RUNNING; +} + +void +mono_threadpool_worker_cleanup (MonoThreadPoolWorker *worker) +{ + MonoInternalThread *current; + + /* we make the assumption along the code that we are + * cleaning up only if the runtime is shutting down */ + g_assert (mono_runtime_is_shutting_down ()); + + current = mono_thread_internal_current (); + + while (worker->monitor_status != MONITOR_STATUS_NOT_RUNNING) + mono_thread_info_sleep (1, NULL); + + mono_coop_mutex_lock (&worker->threads_lock); + + /* unpark all worker->parked_threads */ + mono_coop_cond_broadcast (&worker->parked_threads_cond); + + for (;;) { + ThreadPoolWorkerCounter counter; + + counter = COUNTER_READ (worker); + if (counter._.starting + counter._.working + counter._.parked == 0) + break; + + if (counter._.starting + counter._.working + counter._.parked == 1) { + if (worker->threads->len == 1 && g_ptr_array_index (worker->threads, 0) == current) { + /* We are waiting on ourselves */ + break; + } + } + + mono_coop_cond_wait (&worker->threads_exit_cond, &worker->threads_lock); + } + + mono_coop_mutex_unlock (&worker->threads_lock); + + mono_refcount_dec (worker); +} + +static void +work_item_lock (MonoThreadPoolWorker *worker) +{ + mono_coop_mutex_lock (&worker->work_items_lock); +} + +static void +work_item_unlock (MonoThreadPoolWorker *worker) +{ + mono_coop_mutex_unlock (&worker->work_items_lock); +} + +static void +work_item_push (MonoThreadPoolWorker *worker, MonoThreadPoolWorkerCallback callback, gpointer data) +{ + ThreadPoolWorkItem work_item; + + g_assert (worker); + g_assert (callback); + + work_item.callback = callback; + work_item.data = data; + + work_item_lock (worker); + + g_assert (worker->work_items_count <= worker->work_items_size); + + if (G_UNLIKELY (worker->work_items_count == worker->work_items_size)) { + worker->work_items_size += 64; + worker->work_items = g_renew (ThreadPoolWorkItem, worker->work_items, worker->work_items_size); + } + + g_assert (worker->work_items); + + worker->work_items [worker->work_items_count ++] = work_item; + + // printf ("[push] worker->work_items = %p, worker->work_items_count = %d, worker->work_items_size = %d\n", + // worker->work_items, worker->work_items_count, worker->work_items_size); + + work_item_unlock (worker); +} + +static gboolean +work_item_try_pop (MonoThreadPoolWorker *worker, ThreadPoolWorkItem *work_item) +{ + g_assert (worker); + g_assert (work_item); + + work_item_lock (worker); + + // printf ("[pop] worker->work_items = %p, worker->work_items_count = %d, worker->work_items_size = %d\n", + // worker->work_items, worker->work_items_count, worker->work_items_size); + + if (worker->work_items_count == 0) { + work_item_unlock (worker); + return FALSE; + } + + *work_item = worker->work_items [-- worker->work_items_count]; + + if (G_UNLIKELY (worker->work_items_count >= 64 * 3 && worker->work_items_count < worker->work_items_size / 2)) { + worker->work_items_size -= 64; + worker->work_items = g_renew (ThreadPoolWorkItem, worker->work_items, worker->work_items_size); + } + + work_item_unlock (worker); + + return TRUE; +} + +static gint32 +work_item_count (MonoThreadPoolWorker *worker) +{ + gint32 count; + + work_item_lock (worker); + count = worker->work_items_count; + work_item_unlock (worker); + + return count; +} + +static void worker_request (MonoThreadPoolWorker *worker); + +void +mono_threadpool_worker_enqueue (MonoThreadPoolWorker *worker, MonoThreadPoolWorkerCallback callback, gpointer data) +{ + work_item_push (worker, callback, data); + + worker_request (worker); +} + +static void +worker_wait_interrupt (gpointer data) +{ + MonoThreadPoolWorker *worker; + + worker = (MonoThreadPoolWorker*) data; + g_assert (worker); + + mono_coop_mutex_lock (&worker->threads_lock); + mono_coop_cond_signal (&worker->parked_threads_cond); + mono_coop_mutex_unlock (&worker->threads_lock); + + mono_refcount_dec (worker); +} + +/* return TRUE if timeout, FALSE otherwise (worker unpark or interrupt) */ +static gboolean +worker_park (MonoThreadPoolWorker *worker) +{ + gboolean timeout = FALSE; + + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker parking", mono_native_thread_id_get ()); + + mono_coop_mutex_lock (&worker->threads_lock); + + if (!mono_runtime_is_shutting_down ()) { + static gpointer rand_handle = NULL; + MonoInternalThread *thread; + gboolean interrupted = FALSE; + ThreadPoolWorkerCounter counter; + + if (!rand_handle) + rand_handle = rand_create (); + g_assert (rand_handle); + + thread = mono_thread_internal_current (); + g_assert (thread); + + COUNTER_ATOMIC (worker, counter, { + counter._.working --; + counter._.parked ++; + }); + + worker->parked_threads_count += 1; + + mono_thread_info_install_interrupt (worker_wait_interrupt, mono_refcount_inc (worker), &interrupted); + if (interrupted) { + mono_refcount_dec (worker); + goto done; + } + + if (mono_coop_cond_timedwait (&worker->parked_threads_cond, &worker->threads_lock, rand_next (&rand_handle, 5 * 1000, 60 * 1000)) != 0) + timeout = TRUE; + + mono_thread_info_uninstall_interrupt (&interrupted); + if (!interrupted) + mono_refcount_dec (worker); + +done: + worker->parked_threads_count -= 1; + + COUNTER_ATOMIC (worker, counter, { + counter._.working ++; + counter._.parked --; + }); + } + + mono_coop_mutex_unlock (&worker->threads_lock); + + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker unparking, timeout? %s", mono_native_thread_id_get (), timeout ? "yes" : "no"); + + return timeout; +} + +static gboolean +worker_try_unpark (MonoThreadPoolWorker *worker) +{ + gboolean res = FALSE; + + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try unpark worker", mono_native_thread_id_get ()); + + mono_coop_mutex_lock (&worker->threads_lock); + if (worker->parked_threads_count > 0) { + mono_coop_cond_signal (&worker->parked_threads_cond); + res = TRUE; + } + mono_coop_mutex_unlock (&worker->threads_lock); + + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try unpark worker, success? %s", mono_native_thread_id_get (), res ? "yes" : "no"); + + return res; +} + +static void +worker_thread (gpointer data) +{ + MonoThreadPoolWorker *worker; + MonoError error; + MonoInternalThread *thread; + ThreadPoolWorkerCounter counter; + + mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] worker starting", mono_native_thread_id_get ()); + + worker = (MonoThreadPoolWorker*) data; + g_assert (worker); + + COUNTER_ATOMIC (worker, counter, { + counter._.starting --; + counter._.working ++; + }); + + thread = mono_thread_internal_current (); + g_assert (thread); + + mono_coop_mutex_lock (&worker->threads_lock); + g_ptr_array_add (worker->threads, thread); + mono_coop_mutex_unlock (&worker->threads_lock); + + mono_thread_set_name_internal (thread, mono_string_new (mono_get_root_domain (), "Threadpool worker"), FALSE, &error); + mono_error_assert_ok (&error); + + while (!mono_runtime_is_shutting_down ()) { + ThreadPoolWorkItem work_item; + + if (mono_thread_interruption_checkpoint ()) + continue; + + if (!work_item_try_pop (worker, &work_item)) { + gboolean timeout; + + timeout = worker_park (worker); + if (timeout) + break; + + continue; + } + + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker executing %p (%p)", + mono_native_thread_id_get (), work_item.callback, work_item.data); + + work_item.callback (work_item.data); + } + + mono_coop_mutex_lock (&worker->threads_lock); + + COUNTER_ATOMIC (worker, counter, { + counter._.working --; + }); + + g_ptr_array_remove (worker->threads, thread); + + mono_coop_cond_signal (&worker->threads_exit_cond); + + mono_coop_mutex_unlock (&worker->threads_lock); + + mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] worker finishing", mono_native_thread_id_get ()); + + mono_refcount_dec (worker); +} + +static gboolean +worker_try_create (MonoThreadPoolWorker *worker) +{ + MonoError error; + MonoInternalThread *thread; + gint64 current_ticks; + gint32 now; + ThreadPoolWorkerCounter counter; + + if (mono_runtime_is_shutting_down ()) + return FALSE; + + mono_coop_mutex_lock (&worker->worker_creation_lock); + + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker", mono_native_thread_id_get ()); + + current_ticks = mono_100ns_ticks (); + if (0 == current_ticks) { + g_warning ("failed to get 100ns ticks"); + } else { + now = current_ticks / (10 * 1000 * 1000); + if (worker->worker_creation_current_second != now) { + worker->worker_creation_current_second = now; + worker->worker_creation_current_count = 0; + } else { + g_assert (worker->worker_creation_current_count <= WORKER_CREATION_MAX_PER_SEC); + if (worker->worker_creation_current_count == WORKER_CREATION_MAX_PER_SEC) { + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, failed: maximum number of worker created per second reached, current count = %d", + mono_native_thread_id_get (), worker->worker_creation_current_count); + mono_coop_mutex_unlock (&worker->worker_creation_lock); + return FALSE; + } + } + } + + COUNTER_ATOMIC (worker, counter, { + if (counter._.working >= counter._.max_working) { + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, failed: maximum number of working threads reached", + mono_native_thread_id_get ()); + mono_coop_mutex_unlock (&worker->worker_creation_lock); + return FALSE; + } + counter._.starting ++; + }); + + thread = mono_thread_create_internal (mono_get_root_domain (), worker_thread, mono_refcount_inc (worker), TRUE, 0, &error); + if (!thread) { + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, failed: could not create thread due to %s", mono_native_thread_id_get (), mono_error_get_message (&error)); + mono_error_cleanup (&error); + + COUNTER_ATOMIC (worker, counter, { + counter._.starting --; + }); + + mono_coop_mutex_unlock (&worker->worker_creation_lock); + + mono_refcount_dec (worker); + + return FALSE; + } + + worker->worker_creation_current_count += 1; + + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, created %p, now = %d count = %d", + mono_native_thread_id_get (), (gpointer) thread->tid, now, worker->worker_creation_current_count); + + mono_coop_mutex_unlock (&worker->worker_creation_lock); + return TRUE; +} + +static void monitor_ensure_running (MonoThreadPoolWorker *worker); + +static void +worker_request (MonoThreadPoolWorker *worker) +{ + g_assert (worker); + + if (worker->suspended) + return; + + monitor_ensure_running (worker); + + if (worker_try_unpark (worker)) { + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, unparked", mono_native_thread_id_get ()); + return; + } + + if (worker_try_create (worker)) { + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, created", mono_native_thread_id_get ()); + return; + } + + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, failed", mono_native_thread_id_get ()); +} + +static gboolean +monitor_should_keep_running (MonoThreadPoolWorker *worker) +{ + static gint64 last_should_keep_running = -1; + + g_assert (worker->monitor_status == MONITOR_STATUS_WAITING_FOR_REQUEST || worker->monitor_status == MONITOR_STATUS_REQUESTED); + + if (InterlockedExchange (&worker->monitor_status, MONITOR_STATUS_WAITING_FOR_REQUEST) == MONITOR_STATUS_WAITING_FOR_REQUEST) { + gboolean should_keep_running = TRUE, force_should_keep_running = FALSE; + + if (mono_runtime_is_shutting_down ()) { + should_keep_running = FALSE; + } else { + if (work_item_count (worker) == 0) + should_keep_running = FALSE; + + if (!should_keep_running) { + if (last_should_keep_running == -1 || mono_100ns_ticks () - last_should_keep_running < MONITOR_MINIMAL_LIFETIME * 1000 * 10) { + should_keep_running = force_should_keep_running = TRUE; + } + } + } + + if (should_keep_running) { + if (last_should_keep_running == -1 || !force_should_keep_running) + last_should_keep_running = mono_100ns_ticks (); + } else { + last_should_keep_running = -1; + if (InterlockedCompareExchange (&worker->monitor_status, MONITOR_STATUS_NOT_RUNNING, MONITOR_STATUS_WAITING_FOR_REQUEST) == MONITOR_STATUS_WAITING_FOR_REQUEST) + return FALSE; + } + } + + g_assert (worker->monitor_status == MONITOR_STATUS_WAITING_FOR_REQUEST || worker->monitor_status == MONITOR_STATUS_REQUESTED); + + return TRUE; +} + +static gboolean +monitor_sufficient_delay_since_last_dequeue (MonoThreadPoolWorker *worker) +{ + gint64 threshold; + + g_assert (worker); + + if (worker->cpu_usage < CPU_USAGE_LOW) { + threshold = MONITOR_INTERVAL; + } else { + ThreadPoolWorkerCounter counter; + counter = COUNTER_READ (worker); + threshold = counter._.max_working * MONITOR_INTERVAL * 2; + } + + return mono_msec_ticks () >= worker->heuristic_last_dequeue + threshold; +} + +static void hill_climbing_force_change (MonoThreadPoolWorker *worker, gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition); + +static void +monitor_thread (gpointer data) +{ + MonoThreadPoolWorker *worker; + MonoInternalThread *internal; + guint i; + + worker = (MonoThreadPoolWorker*) data; + g_assert (worker); + + internal = mono_thread_internal_current (); + g_assert (internal); + + mono_cpu_usage (worker->cpu_usage_state); + + // printf ("monitor_thread: start\n"); + + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, started", mono_native_thread_id_get ()); + + do { + ThreadPoolWorkerCounter counter; + gboolean limit_worker_max_reached; + gint32 interval_left = MONITOR_INTERVAL; + gint32 awake = 0; /* number of spurious awakes we tolerate before doing a round of rebalancing */ + + g_assert (worker->monitor_status != MONITOR_STATUS_NOT_RUNNING); + + // counter = COUNTER_READ (worker); + // printf ("monitor_thread: starting = %d working = %d parked = %d max_working = %d\n", + // counter._.starting, counter._.working, counter._.parked, counter._.max_working); + + do { + gint64 ts; + gboolean alerted = FALSE; + + if (mono_runtime_is_shutting_down ()) + break; + + ts = mono_msec_ticks (); + if (mono_thread_info_sleep (interval_left, &alerted) == 0) + break; + interval_left -= mono_msec_ticks () - ts; + + g_assert (!(internal->state & ThreadState_StopRequested)); + mono_thread_interruption_checkpoint (); + } while (interval_left > 0 && ++awake < 10); + + if (mono_runtime_is_shutting_down ()) + continue; + + if (worker->suspended) + continue; + + if (work_item_count (worker) == 0) + continue; + + worker->cpu_usage = mono_cpu_usage (worker->cpu_usage_state); + + if (!monitor_sufficient_delay_since_last_dequeue (worker)) + continue; + + limit_worker_max_reached = FALSE; + + COUNTER_ATOMIC (worker, counter, { + if (counter._.max_working >= worker->limit_worker_max) { + limit_worker_max_reached = TRUE; + break; + } + counter._.max_working ++; + }); + + if (limit_worker_max_reached) + continue; + + hill_climbing_force_change (worker, counter._.max_working, TRANSITION_STARVATION); + + for (i = 0; i < 5; ++i) { + if (mono_runtime_is_shutting_down ()) + break; + + if (worker_try_unpark (worker)) { + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, unparked", mono_native_thread_id_get ()); + break; + } + + if (worker_try_create (worker)) { + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, created", mono_native_thread_id_get ()); + break; + } + } + } while (monitor_should_keep_running (worker)); + + // printf ("monitor_thread: stop\n"); + + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, finished", mono_native_thread_id_get ()); +} + +static void +monitor_ensure_running (MonoThreadPoolWorker *worker) +{ + MonoError error; + for (;;) { + switch (worker->monitor_status) { + case MONITOR_STATUS_REQUESTED: + // printf ("monitor_thread: requested\n"); + return; + case MONITOR_STATUS_WAITING_FOR_REQUEST: + // printf ("monitor_thread: waiting for request\n"); + InterlockedCompareExchange (&worker->monitor_status, MONITOR_STATUS_REQUESTED, MONITOR_STATUS_WAITING_FOR_REQUEST); + break; + case MONITOR_STATUS_NOT_RUNNING: + // printf ("monitor_thread: not running\n"); + if (mono_runtime_is_shutting_down ()) + return; + if (InterlockedCompareExchange (&worker->monitor_status, MONITOR_STATUS_REQUESTED, MONITOR_STATUS_NOT_RUNNING) == MONITOR_STATUS_NOT_RUNNING) { + // printf ("monitor_thread: creating\n"); + if (!mono_thread_create_internal (mono_get_root_domain (), monitor_thread, worker, TRUE, SMALL_STACK, &error)) { + // printf ("monitor_thread: creating failed\n"); + worker->monitor_status = MONITOR_STATUS_NOT_RUNNING; + mono_error_cleanup (&error); + } + return; + } + break; + default: g_assert_not_reached (); + } + } +} + +static void +hill_climbing_change_thread_count (MonoThreadPoolWorker *worker, gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition) +{ + ThreadPoolHillClimbing *hc; + + g_assert (worker); + + hc = &worker->heuristic_hill_climbing; + + mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] hill climbing, change max number of threads %d", mono_native_thread_id_get (), new_thread_count); + + hc->last_thread_count = new_thread_count; + hc->current_sample_interval = rand_next (&hc->random_interval_generator, hc->sample_interval_low, hc->sample_interval_high); + hc->elapsed_since_last_change = 0; + hc->completions_since_last_change = 0; +} + +static void +hill_climbing_force_change (MonoThreadPoolWorker *worker, gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition) +{ + ThreadPoolHillClimbing *hc; + + g_assert (worker); + + hc = &worker->heuristic_hill_climbing; + + if (new_thread_count != hc->last_thread_count) { + hc->current_control_setting += new_thread_count - hc->last_thread_count; + hill_climbing_change_thread_count (worker, new_thread_count, transition); + } +} + +static double_complex +hill_climbing_get_wave_component (MonoThreadPoolWorker *worker, gdouble *samples, guint sample_count, gdouble period) +{ + ThreadPoolHillClimbing *hc; + gdouble w, cosine, sine, coeff, q0, q1, q2; + guint i; + + g_assert (worker); + g_assert (sample_count >= period); + g_assert (period >= 2); + + hc = &worker->heuristic_hill_climbing; + + w = 2.0 * M_PI / period; + cosine = cos (w); + sine = sin (w); + coeff = 2.0 * cosine; + q0 = q1 = q2 = 0; + + for (i = 0; i < sample_count; ++i) { + q0 = coeff * q1 - q2 + samples [(hc->total_samples - sample_count + i) % hc->samples_to_measure]; + q2 = q1; + q1 = q0; + } + + return mono_double_complex_scalar_div (mono_double_complex_make (q1 - q2 * cosine, (q2 * sine)), ((gdouble)sample_count)); +} + +static gint16 +hill_climbing_update (MonoThreadPoolWorker *worker, gint16 current_thread_count, guint32 sample_duration, gint32 completions, gint64 *adjustment_interval) +{ + ThreadPoolHillClimbing *hc; + ThreadPoolHeuristicStateTransition transition; + gdouble throughput; + gdouble throughput_error_estimate; + gdouble confidence; + gdouble move; + gdouble gain; + gint sample_index; + gint sample_count; + gint new_thread_wave_magnitude; + gint new_thread_count; + double_complex thread_wave_component; + double_complex throughput_wave_component; + double_complex ratio; + + g_assert (worker); + g_assert (adjustment_interval); + + hc = &worker->heuristic_hill_climbing; + + /* If someone changed the thread count without telling us, update our records accordingly. */ + if (current_thread_count != hc->last_thread_count) + hill_climbing_force_change (worker, current_thread_count, TRANSITION_INITIALIZING); + + /* Update the cumulative stats for this thread count */ + hc->elapsed_since_last_change += sample_duration; + hc->completions_since_last_change += completions; + + /* Add in any data we've already collected about this sample */ + sample_duration += hc->accumulated_sample_duration; + completions += hc->accumulated_completion_count; + + /* We need to make sure we're collecting reasonably accurate data. Since we're just counting the end + * of each work item, we are goinng to be missing some data about what really happened during the + * sample interval. The count produced by each thread includes an initial work item that may have + * started well before the start of the interval, and each thread may have been running some new + * work item for some time before the end of the interval, which did not yet get counted. So + * our count is going to be off by +/- threadCount workitems. + * + * The exception is that the thread that reported to us last time definitely wasn't running any work + * at that time, and the thread that's reporting now definitely isn't running a work item now. So + * we really only need to consider threadCount-1 threads. + * + * Thus the percent error in our count is +/- (threadCount-1)/numCompletions. + * + * We cannot rely on the frequency-domain analysis we'll be doing later to filter out this error, because + * of the way it accumulates over time. If this sample is off by, say, 33% in the negative direction, + * then the next one likely will be too. The one after that will include the sum of the completions + * we missed in the previous samples, and so will be 33% positive. So every three samples we'll have + * two "low" samples and one "high" sample. This will appear as periodic variation right in the frequency + * range we're targeting, which will not be filtered by the frequency-domain translation. */ + if (hc->total_samples > 0 && ((current_thread_count - 1.0) / completions) >= hc->max_sample_error) { + /* Not accurate enough yet. Let's accumulate the data so + * far, and tell the MonoThreadPoolWorker to collect a little more. */ + hc->accumulated_sample_duration = sample_duration; + hc->accumulated_completion_count = completions; + *adjustment_interval = 10; + return current_thread_count; + } + + /* We've got enouugh data for our sample; reset our accumulators for next time. */ + hc->accumulated_sample_duration = 0; + hc->accumulated_completion_count = 0; + + /* Add the current thread count and throughput sample to our history. */ + throughput = ((gdouble) completions) / sample_duration; + + sample_index = hc->total_samples % hc->samples_to_measure; + hc->samples [sample_index] = throughput; + hc->thread_counts [sample_index] = current_thread_count; + hc->total_samples ++; + + /* Set up defaults for our metrics. */ + thread_wave_component = mono_double_complex_make(0, 0); + throughput_wave_component = mono_double_complex_make(0, 0); + throughput_error_estimate = 0; + ratio = mono_double_complex_make(0, 0); + confidence = 0; + + transition = TRANSITION_WARMUP; + + /* How many samples will we use? It must be at least the three wave periods we're looking for, and it must also + * be a whole multiple of the primary wave's period; otherwise the frequency we're looking for will fall between + * two frequency bands in the Fourier analysis, and we won't be able to measure it accurately. */ + sample_count = ((gint) MIN (hc->total_samples - 1, hc->samples_to_measure) / hc->wave_period) * hc->wave_period; + + if (sample_count > hc->wave_period) { + guint i; + gdouble average_throughput; + gdouble average_thread_count; + gdouble sample_sum = 0; + gdouble thread_sum = 0; + + /* Average the throughput and thread count samples, so we can scale the wave magnitudes later. */ + for (i = 0; i < sample_count; ++i) { + guint j = (hc->total_samples - sample_count + i) % hc->samples_to_measure; + sample_sum += hc->samples [j]; + thread_sum += hc->thread_counts [j]; + } + + average_throughput = sample_sum / sample_count; + average_thread_count = thread_sum / sample_count; + + if (average_throughput > 0 && average_thread_count > 0) { + gdouble noise_for_confidence, adjacent_period_1, adjacent_period_2; + + /* Calculate the periods of the adjacent frequency bands we'll be using to + * measure noise levels. We want the two adjacent Fourier frequency bands. */ + adjacent_period_1 = sample_count / (((gdouble) sample_count) / ((gdouble) hc->wave_period) + 1); + adjacent_period_2 = sample_count / (((gdouble) sample_count) / ((gdouble) hc->wave_period) - 1); + + /* Get the the three different frequency components of the throughput (scaled by average + * throughput). Our "error" estimate (the amount of noise that might be present in the + * frequency band we're really interested in) is the average of the adjacent bands. */ + throughput_wave_component = mono_double_complex_scalar_div (hill_climbing_get_wave_component (worker, hc->samples, sample_count, hc->wave_period), average_throughput); + throughput_error_estimate = cabs (mono_double_complex_scalar_div (hill_climbing_get_wave_component (worker, hc->samples, sample_count, adjacent_period_1), average_throughput)); + + if (adjacent_period_2 <= sample_count) { + throughput_error_estimate = MAX (throughput_error_estimate, cabs (mono_double_complex_scalar_div (hill_climbing_get_wave_component ( + worker, hc->samples, sample_count, adjacent_period_2), average_throughput))); + } + + /* Do the same for the thread counts, so we have something to compare to. We don't + * measure thread count noise, because there is none; these are exact measurements. */ + thread_wave_component = mono_double_complex_scalar_div (hill_climbing_get_wave_component (worker, hc->thread_counts, sample_count, hc->wave_period), average_thread_count); + + /* Update our moving average of the throughput noise. We'll use this + * later as feedback to determine the new size of the thread wave. */ + if (hc->average_throughput_noise == 0) { + hc->average_throughput_noise = throughput_error_estimate; + } else { + hc->average_throughput_noise = (hc->throughput_error_smoothing_factor * throughput_error_estimate) + + ((1.0 + hc->throughput_error_smoothing_factor) * hc->average_throughput_noise); + } + + if (cabs (thread_wave_component) > 0) { + /* Adjust the throughput wave so it's centered around the target wave, + * and then calculate the adjusted throughput/thread ratio. */ + ratio = mono_double_complex_div (mono_double_complex_sub (throughput_wave_component, mono_double_complex_scalar_mul(thread_wave_component, hc->target_throughput_ratio)), thread_wave_component); + transition = TRANSITION_CLIMBING_MOVE; + } else { + ratio = mono_double_complex_make (0, 0); + transition = TRANSITION_STABILIZING; + } + + noise_for_confidence = MAX (hc->average_throughput_noise, throughput_error_estimate); + if (noise_for_confidence > 0) { + confidence = cabs (thread_wave_component) / noise_for_confidence / hc->target_signal_to_noise_ratio; + } else { + /* there is no noise! */ + confidence = 1.0; + } + } + } + + /* We use just the real part of the complex ratio we just calculated. If the throughput signal + * is exactly in phase with the thread signal, this will be the same as taking the magnitude of + * the complex move and moving that far up. If they're 180 degrees out of phase, we'll move + * backward (because this indicates that our changes are having the opposite of the intended effect). + * If they're 90 degrees out of phase, we won't move at all, because we can't tell wether we're + * having a negative or positive effect on throughput. */ + move = creal (ratio); + move = CLAMP (move, -1.0, 1.0); + + /* Apply our confidence multiplier. */ + move *= CLAMP (confidence, -1.0, 1.0); + + /* Now apply non-linear gain, such that values around zero are attenuated, while higher values + * are enhanced. This allows us to move quickly if we're far away from the target, but more slowly + * if we're getting close, giving us rapid ramp-up without wild oscillations around the target. */ + gain = hc->max_change_per_second * sample_duration; + move = pow (fabs (move), hc->gain_exponent) * (move >= 0.0 ? 1 : -1) * gain; + move = MIN (move, hc->max_change_per_sample); + + /* If the result was positive, and CPU is > 95%, refuse the move. */ + if (move > 0.0 && worker->cpu_usage > CPU_USAGE_HIGH) + move = 0.0; + + /* Apply the move to our control setting. */ + hc->current_control_setting += move; + + /* Calculate the new thread wave magnitude, which is based on the moving average we've been keeping of the + * throughput error. This average starts at zero, so we'll start with a nice safe little wave at first. */ + new_thread_wave_magnitude = (gint)(0.5 + (hc->current_control_setting * hc->average_throughput_noise + * hc->target_signal_to_noise_ratio * hc->thread_magnitude_multiplier * 2.0)); + new_thread_wave_magnitude = CLAMP (new_thread_wave_magnitude, 1, hc->max_thread_wave_magnitude); + + /* Make sure our control setting is within the MonoThreadPoolWorker's limits. */ + hc->current_control_setting = CLAMP (hc->current_control_setting, worker->limit_worker_min, worker->limit_worker_max - new_thread_wave_magnitude); + + /* Calculate the new thread count (control setting + square wave). */ + new_thread_count = (gint)(hc->current_control_setting + new_thread_wave_magnitude * ((hc->total_samples / (hc->wave_period / 2)) % 2)); + + /* Make sure the new thread count doesn't exceed the MonoThreadPoolWorker's limits. */ + new_thread_count = CLAMP (new_thread_count, worker->limit_worker_min, worker->limit_worker_max); + + if (new_thread_count != current_thread_count) + hill_climbing_change_thread_count (worker, new_thread_count, transition); + + if (creal (ratio) < 0.0 && new_thread_count == worker->limit_worker_min) + *adjustment_interval = (gint)(0.5 + hc->current_sample_interval * (10.0 * MAX (-1.0 * creal (ratio), 1.0))); + else + *adjustment_interval = hc->current_sample_interval; + + return new_thread_count; +} + +static gboolean +heuristic_should_adjust (MonoThreadPoolWorker *worker) +{ + if (worker->heuristic_last_dequeue > worker->heuristic_last_adjustment + worker->heuristic_adjustment_interval) { + ThreadPoolWorkerCounter counter; + counter = COUNTER_READ (worker); + if (counter._.working <= counter._.max_working) + return TRUE; + } + + return FALSE; +} + +static void +heuristic_adjust (MonoThreadPoolWorker *worker) +{ + if (mono_coop_mutex_trylock (&worker->heuristic_lock) == 0) { + gint32 completions = InterlockedExchange (&worker->heuristic_completions, 0); + gint64 sample_end = mono_msec_ticks (); + gint64 sample_duration = sample_end - worker->heuristic_sample_start; + + if (sample_duration >= worker->heuristic_adjustment_interval / 2) { + ThreadPoolWorkerCounter counter; + gint16 new_thread_count; + + counter = COUNTER_READ (worker); + new_thread_count = hill_climbing_update (worker, counter._.max_working, sample_duration, completions, &worker->heuristic_adjustment_interval); + + COUNTER_ATOMIC (worker, counter, { + counter._.max_working = new_thread_count; + }); + + if (new_thread_count > counter._.max_working) + worker_request (worker); + + worker->heuristic_sample_start = sample_end; + worker->heuristic_last_adjustment = mono_msec_ticks (); + } + + mono_coop_mutex_unlock (&worker->heuristic_lock); + } +} + +static void +heuristic_notify_work_completed (MonoThreadPoolWorker *worker) +{ + g_assert (worker); + + InterlockedIncrement (&worker->heuristic_completions); + worker->heuristic_last_dequeue = mono_msec_ticks (); + + if (heuristic_should_adjust (worker)) + heuristic_adjust (worker); +} + +gboolean +mono_threadpool_worker_notify_completed (MonoThreadPoolWorker *worker) +{ + ThreadPoolWorkerCounter counter; + + heuristic_notify_work_completed (worker); + + counter = COUNTER_READ (worker); + return counter._.working <= counter._.max_working; +} + +gint32 +mono_threadpool_worker_get_min (MonoThreadPoolWorker *worker) +{ + return worker->limit_worker_min; +} + +gboolean +mono_threadpool_worker_set_min (MonoThreadPoolWorker *worker, gint32 value) +{ + if (value <= 0 || value > worker->limit_worker_max) + return FALSE; + + worker->limit_worker_min = value; + return TRUE; +} + +gint32 +mono_threadpool_worker_get_max (MonoThreadPoolWorker *worker) +{ + return worker->limit_worker_max; +} + +gboolean +mono_threadpool_worker_set_max (MonoThreadPoolWorker *worker, gint32 value) +{ + gint32 cpu_count = mono_cpu_count (); + + if (value < worker->limit_worker_min || value < cpu_count) + return FALSE; + + worker->limit_worker_max = value; + return TRUE; +} + +void +mono_threadpool_worker_set_suspended (MonoThreadPoolWorker *worker, gboolean suspended) +{ + worker->suspended = suspended; + if (!suspended) + worker_request (worker); +} diff --git a/mono/metadata/threadpool-worker.h b/mono/metadata/threadpool-worker.h new file mode 100644 index 00000000000..b63df60c915 --- /dev/null +++ b/mono/metadata/threadpool-worker.h @@ -0,0 +1,34 @@ + +#ifndef _MONO_METADATA_THREADPOOL_WORKER_H +#define _MONO_METADATA_THREADPOOL_WORKER_H + +typedef struct MonoThreadPoolWorker MonoThreadPoolWorker; + +typedef void (*MonoThreadPoolWorkerCallback)(gpointer); + +void +mono_threadpool_worker_init (MonoThreadPoolWorker **worker); + +void +mono_threadpool_worker_cleanup (MonoThreadPoolWorker *worker); + +void +mono_threadpool_worker_enqueue (MonoThreadPoolWorker *worker, MonoThreadPoolWorkerCallback callback, gpointer data); + +gboolean +mono_threadpool_worker_notify_completed (MonoThreadPoolWorker *worker); + +gint32 +mono_threadpool_worker_get_min (MonoThreadPoolWorker *worker); +gboolean +mono_threadpool_worker_set_min (MonoThreadPoolWorker *worker, gint32 value); + +gint32 +mono_threadpool_worker_get_max (MonoThreadPoolWorker *worker); +gboolean +mono_threadpool_worker_set_max (MonoThreadPoolWorker *worker, gint32 value); + +void +mono_threadpool_worker_set_suspended (MonoThreadPoolWorker *worker, gboolean suspended); + +#endif /* _MONO_METADATA_THREADPOOL_WORKER_H */ diff --git a/mono/metadata/threadpool.c b/mono/metadata/threadpool.c new file mode 100644 index 00000000000..ab88e67f4b9 --- /dev/null +++ b/mono/metadata/threadpool.c @@ -0,0 +1,835 @@ +/* + * threadpool.c: Microsoft threadpool runtime support + * + * Author: + * Ludovic Henry (ludovic.henry@xamarin.com) + * + * Copyright 2015 Xamarin, Inc (http://www.xamarin.com) + * Licensed under the MIT license. See LICENSE file in the project root for full license information. + */ + +// +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. +// +// Files: +// - src/vm/comthreadpool.cpp +// - src/vm/win32threadpoolcpp +// - src/vm/threadpoolrequest.cpp +// - src/vm/hillclimbing.cpp +// +// Ported from C++ to C and adjusted to Mono runtime + +#include +#define _USE_MATH_DEFINES // needed by MSVC to define math constants +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +typedef struct { + MonoDomain *domain; + /* Number of outstanding jobs */ + gint32 outstanding_request; + /* Number of currently executing jobs */ + gint32 threadpool_jobs; + /* Signalled when threadpool_jobs + outstanding_request is 0 */ + /* Protected by threadpool->domains_lock */ + MonoCoopCond cleanup_cond; +} ThreadPoolDomain; + +typedef union { + struct { + gint16 starting; /* starting, but not yet in worker_callback */ + gint16 working; /* executing worker_callback */ + } _; + gint32 as_gint32; +} ThreadPoolCounter; + +typedef struct { + MonoRefCount ref; + + GPtrArray *domains; // ThreadPoolDomain* [] + MonoCoopMutex domains_lock; + + GPtrArray *threads; // MonoInternalThread* [] + MonoCoopMutex threads_lock; + MonoCoopCond threads_exit_cond; + + ThreadPoolCounter counters; + + gint32 limit_io_min; + gint32 limit_io_max; + + MonoThreadPoolWorker *worker; +} ThreadPool; + +static mono_lazy_init_t status = MONO_LAZY_INIT_STATUS_NOT_INITIALIZED; + +static ThreadPool* threadpool; + +#define COUNTER_CHECK(counter) \ + do { \ + g_assert (sizeof (ThreadPoolCounter) == sizeof (gint32)); \ + g_assert (counter._.starting >= 0); \ + g_assert (counter._.working >= 0); \ + } while (0) + +#define COUNTER_ATOMIC(threadpool,var,block) \ + do { \ + ThreadPoolCounter __old; \ + do { \ + g_assert (threadpool); \ + __old = COUNTER_READ (threadpool); \ + (var) = __old; \ + { block; } \ + COUNTER_CHECK (var); \ + } while (InterlockedCompareExchange (&threadpool->counters.as_gint32, (var).as_gint32, __old.as_gint32) != __old.as_gint32); \ + } while (0) + +static inline ThreadPoolCounter +COUNTER_READ (ThreadPool *threadpool) +{ + ThreadPoolCounter counter; + counter.as_gint32 = InterlockedRead (&threadpool->counters.as_gint32); + return counter; +} + +static inline void +domains_lock (void) +{ + mono_coop_mutex_lock (&threadpool->domains_lock); +} + +static inline void +domains_unlock (void) +{ + mono_coop_mutex_unlock (&threadpool->domains_lock); +} + +static void +destroy (gpointer unused) +{ + g_ptr_array_free (threadpool->domains, TRUE); + mono_coop_mutex_destroy (&threadpool->domains_lock); + + g_ptr_array_free (threadpool->threads, TRUE); + mono_coop_mutex_destroy (&threadpool->threads_lock); + mono_coop_cond_destroy (&threadpool->threads_exit_cond); + + g_free (threadpool); +} + +static void +initialize (void) +{ + g_assert (!threadpool); + threadpool = g_new0 (ThreadPool, 1); + g_assert (threadpool); + + mono_refcount_init (threadpool, destroy); + + threadpool->domains = g_ptr_array_new (); + mono_coop_mutex_init (&threadpool->domains_lock); + + threadpool->threads = g_ptr_array_new (); + mono_coop_mutex_init (&threadpool->threads_lock); + mono_coop_cond_init (&threadpool->threads_exit_cond); + + threadpool->limit_io_min = mono_cpu_count (); + threadpool->limit_io_max = CLAMP (threadpool->limit_io_min * 100, MIN (threadpool->limit_io_min, 200), MAX (threadpool->limit_io_min, 200)); + + mono_threadpool_worker_init (&threadpool->worker); +} + +static void +cleanup (void) +{ + guint i; + MonoInternalThread *current; + + /* we make the assumption along the code that we are + * cleaning up only if the runtime is shutting down */ + g_assert (mono_runtime_is_shutting_down ()); + + current = mono_thread_internal_current (); + + mono_coop_mutex_lock (&threadpool->threads_lock); + + /* stop all threadpool->threads */ + for (i = 0; i < threadpool->threads->len; ++i) { + MonoInternalThread *thread = (MonoInternalThread*) g_ptr_array_index (threadpool->threads, i); + if (thread != current) + mono_thread_internal_abort (thread); + } + + mono_coop_mutex_unlock (&threadpool->threads_lock); + + /* give a chance to the other threads to exit */ + mono_thread_info_yield (); + + mono_coop_mutex_lock (&threadpool->threads_lock); + + for (;;) { + ThreadPoolCounter counter; + + counter = COUNTER_READ (threadpool); + if (counter._.working == 0) + break; + + if (counter._.working == 1) { + if (threadpool->threads->len == 1 && g_ptr_array_index (threadpool->threads, 0) == current) { + /* We are waiting on ourselves */ + break; + } + } + + mono_coop_cond_wait (&threadpool->threads_exit_cond, &threadpool->threads_lock); + } + + mono_coop_mutex_unlock (&threadpool->threads_lock); + + mono_threadpool_worker_cleanup (threadpool->worker); + + mono_refcount_dec (threadpool); +} + +gboolean +mono_threadpool_enqueue_work_item (MonoDomain *domain, MonoObject *work_item, MonoError *error) +{ + static MonoClass *threadpool_class = NULL; + static MonoMethod *unsafe_queue_custom_work_item_method = NULL; + MonoDomain *current_domain; + MonoBoolean f; + gpointer args [2]; + + mono_error_init (error); + g_assert (work_item); + + if (!threadpool_class) + threadpool_class = mono_class_load_from_name (mono_defaults.corlib, "System.Threading", "ThreadPool"); + + if (!unsafe_queue_custom_work_item_method) + unsafe_queue_custom_work_item_method = mono_class_get_method_from_name (threadpool_class, "UnsafeQueueCustomWorkItem", 2); + g_assert (unsafe_queue_custom_work_item_method); + + f = FALSE; + + args [0] = (gpointer) work_item; + args [1] = (gpointer) &f; + + current_domain = mono_domain_get (); + if (current_domain == domain) { + mono_runtime_invoke_checked (unsafe_queue_custom_work_item_method, NULL, args, error); + return_val_if_nok (error, FALSE); + } else { + mono_thread_push_appdomain_ref (domain); + if (mono_domain_set (domain, FALSE)) { + mono_runtime_invoke_checked (unsafe_queue_custom_work_item_method, NULL, args, error); + if (!is_ok (error)) { + mono_thread_pop_appdomain_ref (); + return FALSE; + } + mono_domain_set (current_domain, TRUE); + } + mono_thread_pop_appdomain_ref (); + } + return TRUE; +} + +/* LOCKING: domains_lock must be held */ +static void +tpdomain_add (ThreadPoolDomain *tpdomain) +{ + guint i, len; + + g_assert (tpdomain); + + len = threadpool->domains->len; + for (i = 0; i < len; ++i) { + if (g_ptr_array_index (threadpool->domains, i) == tpdomain) + break; + } + + if (i == len) + g_ptr_array_add (threadpool->domains, tpdomain); +} + +/* LOCKING: domains_lock must be held. */ +static gboolean +tpdomain_remove (ThreadPoolDomain *tpdomain) +{ + g_assert (tpdomain); + return g_ptr_array_remove (threadpool->domains, tpdomain); +} + +/* LOCKING: domains_lock must be held */ +static ThreadPoolDomain * +tpdomain_get (MonoDomain *domain, gboolean create) +{ + guint i; + ThreadPoolDomain *tpdomain; + + g_assert (domain); + + for (i = 0; i < threadpool->domains->len; ++i) { + ThreadPoolDomain *tpdomain; + + tpdomain = (ThreadPoolDomain *)g_ptr_array_index (threadpool->domains, i); + if (tpdomain->domain == domain) + return tpdomain; + } + + if (!create) + return NULL; + + tpdomain = g_new0 (ThreadPoolDomain, 1); + tpdomain->domain = domain; + mono_coop_cond_init (&tpdomain->cleanup_cond); + + tpdomain_add (tpdomain); + + return tpdomain; +} + +static void +tpdomain_free (ThreadPoolDomain *tpdomain) +{ + g_free (tpdomain); +} + +/* LOCKING: domains_lock must be held */ +static ThreadPoolDomain * +tpdomain_get_next (ThreadPoolDomain *current) +{ + ThreadPoolDomain *tpdomain = NULL; + guint len; + + len = threadpool->domains->len; + if (len > 0) { + guint i, current_idx = -1; + if (current) { + for (i = 0; i < len; ++i) { + if (current == g_ptr_array_index (threadpool->domains, i)) { + current_idx = i; + break; + } + } + g_assert (current_idx != (guint)-1); + } + for (i = current_idx + 1; i < len + current_idx + 1; ++i) { + ThreadPoolDomain *tmp = (ThreadPoolDomain *)g_ptr_array_index (threadpool->domains, i % len); + if (tmp->outstanding_request > 0) { + tpdomain = tmp; + break; + } + } + } + + return tpdomain; +} + +static void +worker_callback (gpointer unused) +{ + MonoError error; + ThreadPoolDomain *tpdomain, *previous_tpdomain; + ThreadPoolCounter counter; + MonoInternalThread *thread; + + thread = mono_thread_internal_current (); + + COUNTER_ATOMIC (threadpool, counter, { + counter._.starting --; + counter._.working ++; + }); + + if (mono_runtime_is_shutting_down ()) { + COUNTER_ATOMIC (threadpool, counter, { + counter._.working --; + }); + + mono_refcount_dec (threadpool); + return; + } + + mono_coop_mutex_lock (&threadpool->threads_lock); + g_ptr_array_add (threadpool->threads, thread); + mono_coop_mutex_unlock (&threadpool->threads_lock); + + /* + * This is needed so there is always an lmf frame in the runtime invoke call below, + * so ThreadAbortExceptions are caught even if the thread is in native code. + */ + mono_defaults.threadpool_perform_wait_callback_method->save_lmf = TRUE; + + domains_lock (); + + previous_tpdomain = NULL; + + while (!mono_runtime_is_shutting_down ()) { + gboolean retire = FALSE; + + if ((thread->state & (ThreadState_AbortRequested | ThreadState_SuspendRequested)) != 0) { + domains_unlock (); + mono_thread_interruption_checkpoint (); + domains_lock (); + } + + tpdomain = tpdomain_get_next (previous_tpdomain); + if (!tpdomain) + break; + + tpdomain->outstanding_request --; + g_assert (tpdomain->outstanding_request >= 0); + + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker running in domain %p (outstanding requests %d)", + mono_native_thread_id_get (), tpdomain->domain, tpdomain->outstanding_request); + + g_assert (tpdomain->threadpool_jobs >= 0); + tpdomain->threadpool_jobs ++; + + domains_unlock (); + + mono_thread_clr_state (thread, (MonoThreadState)~ThreadState_Background); + if (!mono_thread_test_state (thread , ThreadState_Background)) + ves_icall_System_Threading_Thread_SetState (thread, ThreadState_Background); + + mono_thread_push_appdomain_ref (tpdomain->domain); + if (mono_domain_set (tpdomain->domain, FALSE)) { + MonoObject *exc = NULL, *res; + + res = mono_runtime_try_invoke (mono_defaults.threadpool_perform_wait_callback_method, NULL, NULL, &exc, &error); + if (exc || !mono_error_ok(&error)) { + if (exc == NULL) + exc = (MonoObject *) mono_error_convert_to_exception (&error); + else + mono_error_cleanup (&error); + mono_thread_internal_unhandled_exception (exc); + } else if (res && *(MonoBoolean*) mono_object_unbox (res) == FALSE) { + retire = TRUE; + } + + mono_domain_set (mono_get_root_domain (), TRUE); + } + mono_thread_pop_appdomain_ref (); + + domains_lock (); + + tpdomain->threadpool_jobs --; + g_assert (tpdomain->threadpool_jobs >= 0); + + if (tpdomain->outstanding_request + tpdomain->threadpool_jobs == 0 && mono_domain_is_unloading (tpdomain->domain)) { + gboolean removed; + + removed = tpdomain_remove (tpdomain); + g_assert (removed); + + mono_coop_cond_signal (&tpdomain->cleanup_cond); + tpdomain = NULL; + } + + if (retire) + break; + + previous_tpdomain = tpdomain; + } + + domains_unlock (); + + mono_coop_mutex_lock (&threadpool->threads_lock); + + COUNTER_ATOMIC (threadpool, counter, { + counter._.working --; + }); + + g_ptr_array_remove_fast (threadpool->threads, thread); + + mono_coop_cond_signal (&threadpool->threads_exit_cond); + + mono_coop_mutex_unlock (&threadpool->threads_lock); + + mono_refcount_dec (threadpool); +} + +void +mono_threadpool_cleanup (void) +{ +#ifndef DISABLE_SOCKETS + mono_threadpool_io_cleanup (); +#endif + mono_lazy_cleanup (&status, cleanup); +} + +MonoAsyncResult * +mono_threadpool_begin_invoke (MonoDomain *domain, MonoObject *target, MonoMethod *method, gpointer *params, MonoError *error) +{ + static MonoClass *async_call_klass = NULL; + MonoMethodMessage *message; + MonoAsyncResult *async_result; + MonoAsyncCall *async_call; + MonoDelegate *async_callback = NULL; + MonoObject *state = NULL; + + if (!async_call_klass) + async_call_klass = mono_class_load_from_name (mono_defaults.corlib, "System", "MonoAsyncCall"); + + mono_lazy_initialize (&status, initialize); + + mono_error_init (error); + + message = mono_method_call_message_new (method, params, mono_get_delegate_invoke (method->klass), (params != NULL) ? (&async_callback) : NULL, (params != NULL) ? (&state) : NULL, error); + return_val_if_nok (error, NULL); + + async_call = (MonoAsyncCall*) mono_object_new_checked (domain, async_call_klass, error); + return_val_if_nok (error, NULL); + + MONO_OBJECT_SETREF (async_call, msg, message); + MONO_OBJECT_SETREF (async_call, state, state); + + if (async_callback) { + MONO_OBJECT_SETREF (async_call, cb_method, mono_get_delegate_invoke (((MonoObject*) async_callback)->vtable->klass)); + MONO_OBJECT_SETREF (async_call, cb_target, async_callback); + } + + async_result = mono_async_result_new (domain, NULL, async_call->state, NULL, (MonoObject*) async_call, error); + return_val_if_nok (error, NULL); + MONO_OBJECT_SETREF (async_result, async_delegate, target); + + mono_threadpool_enqueue_work_item (domain, (MonoObject*) async_result, error); + return_val_if_nok (error, NULL); + + return async_result; +} + +MonoObject * +mono_threadpool_end_invoke (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc, MonoError *error) +{ + MonoAsyncCall *ac; + + mono_error_init (error); + g_assert (exc); + g_assert (out_args); + + *exc = NULL; + *out_args = NULL; + + /* check if already finished */ + mono_monitor_enter ((MonoObject*) ares); + + if (ares->endinvoke_called) { + mono_error_set_invalid_operation(error, "Delegate EndInvoke method called more than once"); + mono_monitor_exit ((MonoObject*) ares); + return NULL; + } + + ares->endinvoke_called = 1; + + /* wait until we are really finished */ + if (ares->completed) { + mono_monitor_exit ((MonoObject *) ares); + } else { + gpointer wait_event; + if (ares->handle) { + wait_event = mono_wait_handle_get_handle ((MonoWaitHandle*) ares->handle); + } else { + wait_event = mono_w32event_create (TRUE, FALSE); + g_assert(wait_event); + MonoWaitHandle *wait_handle = mono_wait_handle_new (mono_object_domain (ares), wait_event, error); + if (!is_ok (error)) { + CloseHandle (wait_event); + return NULL; + } + MONO_OBJECT_SETREF (ares, handle, (MonoObject*) wait_handle); + } + mono_monitor_exit ((MonoObject*) ares); + MONO_ENTER_GC_SAFE; +#ifdef HOST_WIN32 + WaitForSingleObjectEx (wait_event, INFINITE, TRUE); +#else + mono_w32handle_wait_one (wait_event, MONO_INFINITE_WAIT, TRUE); +#endif + MONO_EXIT_GC_SAFE; + } + + ac = (MonoAsyncCall*) ares->object_data; + g_assert (ac); + + *exc = ac->msg->exc; /* FIXME: GC add write barrier */ + *out_args = ac->out_args; + return ac->res; +} + +gboolean +mono_threadpool_remove_domain_jobs (MonoDomain *domain, int timeout) +{ + gint64 end; + ThreadPoolDomain *tpdomain; + gboolean ret; + + g_assert (domain); + g_assert (timeout >= -1); + + g_assert (mono_domain_is_unloading (domain)); + + if (timeout != -1) + end = mono_msec_ticks () + timeout; + +#ifndef DISABLE_SOCKETS + mono_threadpool_io_remove_domain_jobs (domain); + if (timeout != -1) { + if (mono_msec_ticks () > end) + return FALSE; + } +#endif + + /* + * Wait for all threads which execute jobs in the domain to exit. + * The is_unloading () check in worker_request () ensures that + * no new jobs are added after we enter the lock below. + */ + mono_lazy_initialize (&status, initialize); + domains_lock (); + + tpdomain = tpdomain_get (domain, FALSE); + if (!tpdomain) { + domains_unlock (); + return TRUE; + } + + ret = TRUE; + + while (tpdomain->outstanding_request + tpdomain->threadpool_jobs > 0) { + if (timeout == -1) { + mono_coop_cond_wait (&tpdomain->cleanup_cond, &threadpool->domains_lock); + } else { + gint64 now; + gint res; + + now = mono_msec_ticks(); + if (now > end) { + ret = FALSE; + break; + } + + res = mono_coop_cond_timedwait (&tpdomain->cleanup_cond, &threadpool->domains_lock, end - now); + if (res != 0) { + ret = FALSE; + break; + } + } + } + + /* Remove from the list the worker threads look at */ + tpdomain_remove (tpdomain); + + domains_unlock (); + + mono_coop_cond_destroy (&tpdomain->cleanup_cond); + tpdomain_free (tpdomain); + + return ret; +} + +void +mono_threadpool_suspend (void) +{ + if (threadpool) + mono_threadpool_worker_set_suspended (threadpool->worker, TRUE); +} + +void +mono_threadpool_resume (void) +{ + if (threadpool) + mono_threadpool_worker_set_suspended (threadpool->worker, FALSE); +} + +void +ves_icall_System_Threading_ThreadPool_GetAvailableThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads) +{ + ThreadPoolCounter counter; + + if (!worker_threads || !completion_port_threads) + return; + + mono_lazy_initialize (&status, initialize); + + counter = COUNTER_READ (threadpool); + + *worker_threads = MAX (0, mono_threadpool_worker_get_max (threadpool->worker) - counter._.working); + *completion_port_threads = threadpool->limit_io_max; +} + +void +ves_icall_System_Threading_ThreadPool_GetMinThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads) +{ + if (!worker_threads || !completion_port_threads) + return; + + mono_lazy_initialize (&status, initialize); + + *worker_threads = mono_threadpool_worker_get_min (threadpool->worker); + *completion_port_threads = threadpool->limit_io_min; +} + +void +ves_icall_System_Threading_ThreadPool_GetMaxThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads) +{ + if (!worker_threads || !completion_port_threads) + return; + + mono_lazy_initialize (&status, initialize); + + *worker_threads = mono_threadpool_worker_get_max (threadpool->worker); + *completion_port_threads = threadpool->limit_io_max; +} + +MonoBoolean +ves_icall_System_Threading_ThreadPool_SetMinThreadsNative (gint32 worker_threads, gint32 completion_port_threads) +{ + mono_lazy_initialize (&status, initialize); + + if (completion_port_threads <= 0 || completion_port_threads > threadpool->limit_io_max) + return FALSE; + + if (!mono_threadpool_worker_set_min (threadpool->worker, worker_threads)) + return FALSE; + + threadpool->limit_io_min = completion_port_threads; + + return TRUE; +} + +MonoBoolean +ves_icall_System_Threading_ThreadPool_SetMaxThreadsNative (gint32 worker_threads, gint32 completion_port_threads) +{ + gint cpu_count = mono_cpu_count (); + + mono_lazy_initialize (&status, initialize); + + if (completion_port_threads < threadpool->limit_io_min || completion_port_threads < cpu_count) + return FALSE; + + if (!mono_threadpool_worker_set_max (threadpool->worker, worker_threads)) + return FALSE; + + threadpool->limit_io_max = completion_port_threads; + + return TRUE; +} + +void +ves_icall_System_Threading_ThreadPool_InitializeVMTp (MonoBoolean *enable_worker_tracking) +{ + if (enable_worker_tracking) { + // TODO implement some kind of switch to have the possibily to use it + *enable_worker_tracking = FALSE; + } + + mono_lazy_initialize (&status, initialize); +} + +MonoBoolean +ves_icall_System_Threading_ThreadPool_NotifyWorkItemComplete (void) +{ + if (mono_domain_is_unloading (mono_domain_get ()) || mono_runtime_is_shutting_down ()) + return FALSE; + + return mono_threadpool_worker_notify_completed (threadpool->worker); +} + +void +ves_icall_System_Threading_ThreadPool_NotifyWorkItemProgressNative (void) +{ + mono_threadpool_worker_notify_completed (threadpool->worker); +} + +void +ves_icall_System_Threading_ThreadPool_ReportThreadStatus (MonoBoolean is_working) +{ + // TODO + MonoError error; + mono_error_set_not_implemented (&error, ""); + mono_error_set_pending_exception (&error); +} + +MonoBoolean +ves_icall_System_Threading_ThreadPool_RequestWorkerThread (void) +{ + MonoDomain *domain; + ThreadPoolDomain *tpdomain; + ThreadPoolCounter counter; + + domain = mono_domain_get (); + if (mono_domain_is_unloading (domain)) + return FALSE; + + domains_lock (); + + /* synchronize with mono_threadpool_remove_domain_jobs */ + if (mono_domain_is_unloading (domain)) { + domains_unlock (); + return FALSE; + } + + tpdomain = tpdomain_get (domain, TRUE); + g_assert (tpdomain); + + tpdomain->outstanding_request ++; + g_assert (tpdomain->outstanding_request >= 1); + + mono_refcount_inc (threadpool); + + COUNTER_ATOMIC (threadpool, counter, { + counter._.starting ++; + }); + + mono_threadpool_worker_enqueue (threadpool->worker, worker_callback, NULL); + + domains_unlock (); + + return TRUE; +} + +MonoBoolean G_GNUC_UNUSED +ves_icall_System_Threading_ThreadPool_PostQueuedCompletionStatus (MonoNativeOverlapped *native_overlapped) +{ + /* This copy the behavior of the current Mono implementation */ + MonoError error; + mono_error_set_not_implemented (&error, ""); + mono_error_set_pending_exception (&error); + return FALSE; +} + +MonoBoolean G_GNUC_UNUSED +ves_icall_System_Threading_ThreadPool_BindIOCompletionCallbackNative (gpointer file_handle) +{ + /* This copy the behavior of the current Mono implementation */ + return TRUE; +} + +MonoBoolean G_GNUC_UNUSED +ves_icall_System_Threading_ThreadPool_IsThreadPoolHosted (void) +{ + return FALSE; +} diff --git a/mono/metadata/threadpool-ms.h b/mono/metadata/threadpool.h similarity index 72% rename from mono/metadata/threadpool-ms.h rename to mono/metadata/threadpool.h index 1603e3a15d9..df17997b16c 100644 --- a/mono/metadata/threadpool-ms.h +++ b/mono/metadata/threadpool.h @@ -1,5 +1,5 @@ -#ifndef _MONO_THREADPOOL_MICROSOFT_H_ -#define _MONO_THREADPOOL_MICROSOFT_H_ +#ifndef _MONO_METADATA_THREADPOOL_H_ +#define _MONO_METADATA_THREADPOOL_H_ #include #include @@ -12,20 +12,20 @@ typedef struct _MonoNativeOverlapped MonoNativeOverlapped; void -mono_threadpool_ms_cleanup (void); +mono_threadpool_cleanup (void); MonoAsyncResult * -mono_threadpool_ms_begin_invoke (MonoDomain *domain, MonoObject *target, MonoMethod *method, gpointer *params, MonoError *error); +mono_threadpool_begin_invoke (MonoDomain *domain, MonoObject *target, MonoMethod *method, gpointer *params, MonoError *error); MonoObject * -mono_threadpool_ms_end_invoke (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc, MonoError *error); +mono_threadpool_end_invoke (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc, MonoError *error); gboolean -mono_threadpool_ms_remove_domain_jobs (MonoDomain *domain, int timeout); +mono_threadpool_remove_domain_jobs (MonoDomain *domain, int timeout); void -mono_threadpool_ms_suspend (void); +mono_threadpool_suspend (void); void -mono_threadpool_ms_resume (void); +mono_threadpool_resume (void); void ves_icall_System_Threading_ThreadPool_GetAvailableThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads); @@ -60,6 +60,6 @@ ves_icall_System_Threading_ThreadPool_IsThreadPoolHosted (void); /* Internals */ gboolean -mono_threadpool_ms_enqueue_work_item (MonoDomain *domain, MonoObject *work_item, MonoError *error); +mono_threadpool_enqueue_work_item (MonoDomain *domain, MonoObject *work_item, MonoError *error); -#endif // _MONO_THREADPOOL_MICROSOFT_H_ +#endif // _MONO_METADATA_THREADPOOL_H_ diff --git a/mono/metadata/w32process-win32.c b/mono/metadata/w32process-win32.c index f34b0373a8d..d7276546f5e 100644 --- a/mono/metadata/w32process-win32.c +++ b/mono/metadata/w32process-win32.c @@ -25,7 +25,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/mono/mini/debugger-agent.c b/mono/mini/debugger-agent.c index 171bae152e4..9b188eeb2af 100644 --- a/mono/mini/debugger-agent.c +++ b/mono/mini/debugger-agent.c @@ -58,7 +58,7 @@ #include #include #include -#include +#include #include #include #include @@ -2774,7 +2774,7 @@ suspend_vm (void) /* * Suspend creation of new threadpool threads, since they cannot run */ - mono_threadpool_ms_suspend (); + mono_threadpool_suspend (); mono_loader_unlock (); } @@ -2812,7 +2812,7 @@ resume_vm (void) //g_assert (err == 0); if (suspend_count == 0) - mono_threadpool_ms_resume (); + mono_threadpool_resume (); mono_loader_unlock (); } diff --git a/mono/utils/mono-lazy-init.h b/mono/utils/mono-lazy-init.h index 194eadc15c9..7deca1275f5 100644 --- a/mono/utils/mono-lazy-init.h +++ b/mono/utils/mono-lazy-init.h @@ -20,7 +20,7 @@ /* * These functions should be used if you want some form of lazy initialization. You can have a look at the - * threadpool-ms for a more detailed example. + * threadpool for a more detailed example. * * The idea is that a module can be in 5 different states: * - not initialized: it is the first state it starts in diff --git a/msvc/libmonoruntime.vcxproj b/msvc/libmonoruntime.vcxproj index b73ba7d0cf2..f0e8db4e9a8 100644 --- a/msvc/libmonoruntime.vcxproj +++ b/msvc/libmonoruntime.vcxproj @@ -91,8 +91,9 @@ - - + + + @@ -154,8 +155,9 @@ - - + + + diff --git a/msvc/libmonoruntime.vcxproj.filters b/msvc/libmonoruntime.vcxproj.filters index db52a544a49..1154e5e8fc7 100644 --- a/msvc/libmonoruntime.vcxproj.filters +++ b/msvc/libmonoruntime.vcxproj.filters @@ -163,10 +163,13 @@ Source Files - + Source Files - + + Source Files + + Source Files @@ -465,13 +468,16 @@ Header Files - + + Header Files + + Header Files Header Files - + Header Files -- 2.25.1