From: Ludovic Henry Date: Fri, 9 Dec 2016 20:40:28 +0000 (-0500) Subject: [threadpool] Split domain and worker management (#4117) X-Git-Url: http://wien.tomnetworks.com/gitweb/?p=mono.git;a=commitdiff_plain;h=5e845a3bc96ed7dd9058c7d47ad1dee28320bfc3 [threadpool] Split domain and worker management (#4117) * [threadpool] Remove `-ms` suffix * [threadpool] Split domain and worker management This will allow us to use native threadpool more easily: we will simply have to implement a threadpool worker with the specific API. On windows, we will use the Win32 Threadpool, and on OSX we will explore using GCD (Grand Central Dispatch). --- diff --git a/mono/metadata/Makefile.am b/mono/metadata/Makefile.am index db8663c6e59..92b99cd28bc 100644 --- a/mono/metadata/Makefile.am +++ b/mono/metadata/Makefile.am @@ -230,10 +230,12 @@ common_sources = \ tabledefs.h \ threads.c \ threads-types.h \ - threadpool-ms.c \ - threadpool-ms.h \ - threadpool-ms-io.c \ - threadpool-ms-io.h \ + threadpool.c \ + threadpool.h \ + threadpool-worker-default.c \ + threadpool-worker.h \ + threadpool-io.c \ + threadpool-io.h \ verify.c \ verify-internals.h \ wrapper-types.h \ @@ -342,4 +344,4 @@ libmonoruntimeinclude_HEADERS = \ verify.h EXTRA_DIST = $(win32_sources) $(unix_sources) $(null_sources) runtime.h \ - threadpool-ms-io-poll.c threadpool-ms-io-epoll.c threadpool-ms-io-kqueue.c sgen-dynarray.h + threadpool-io-poll.c threadpool-io-epoll.c threadpool-io-kqueue.c sgen-dynarray.h diff --git a/mono/metadata/appdomain.c b/mono/metadata/appdomain.c index a571f676b45..9e23de19060 100644 --- a/mono/metadata/appdomain.c +++ b/mono/metadata/appdomain.c @@ -42,7 +42,7 @@ #include #include #include -#include +#include #include #include #include @@ -2418,7 +2418,7 @@ unload_thread_main (void *arg) goto failure; } - if (!mono_threadpool_ms_remove_domain_jobs (domain, -1)) { + if (!mono_threadpool_remove_domain_jobs (domain, -1)) { data->failure_reason = g_strdup_printf ("Cleanup of threadpool jobs of domain %s timed out.", domain->friendly_name); goto failure; } diff --git a/mono/metadata/console-unix.c b/mono/metadata/console-unix.c index 10272322966..7d1e9a1ec00 100644 --- a/mono/metadata/console-unix.c +++ b/mono/metadata/console-unix.c @@ -34,7 +34,7 @@ #include #include #include -#include +#include #include #include #include @@ -258,7 +258,7 @@ do_console_cancel_event (void) method = mono_class_get_method_from_name (klass, "BeginInvoke", -1); g_assert (method != NULL); - mono_threadpool_ms_begin_invoke (domain, (MonoObject*) load_value, method, NULL, &error); + mono_threadpool_begin_invoke (domain, (MonoObject*) load_value, method, NULL, &error); if (!is_ok (&error)) { g_warning ("Couldn't invoke System.Console cancel handler due to %s", mono_error_get_message (&error)); mono_error_cleanup (&error); diff --git a/mono/metadata/gc.c b/mono/metadata/gc.c index 53cce47edad..ad9db931df4 100644 --- a/mono/metadata/gc.c +++ b/mono/metadata/gc.c @@ -24,7 +24,7 @@ #include #include #include -#include +#include #include #include #include @@ -556,7 +556,7 @@ mono_domain_finalize (MonoDomain *domain, guint32 timeout) } if (domain == mono_get_root_domain ()) { - mono_threadpool_ms_cleanup (); + mono_threadpool_cleanup (); mono_gc_finalize_threadpool_threads (); } diff --git a/mono/metadata/icall.c b/mono/metadata/icall.c index 946803d692d..c11cbad5228 100644 --- a/mono/metadata/icall.c +++ b/mono/metadata/icall.c @@ -36,8 +36,8 @@ #include #include #include -#include -#include +#include +#include #include #include #include diff --git a/mono/metadata/marshal.c b/mono/metadata/marshal.c index 5eb7a8a9902..e376d860f85 100644 --- a/mono/metadata/marshal.c +++ b/mono/metadata/marshal.c @@ -40,7 +40,7 @@ #include "mono/metadata/cominterop.h" #include "mono/metadata/remoting.h" #include "mono/metadata/reflection-internals.h" -#include "mono/metadata/threadpool-ms.h" +#include "mono/metadata/threadpool.h" #include "mono/metadata/handle.h" #include "mono/utils/mono-counters.h" #include "mono/utils/mono-tls.h" @@ -2451,7 +2451,7 @@ mono_delegate_begin_invoke (MonoDelegate *delegate, gpointer *params) method = mono_get_delegate_invoke (klass); g_assert (method); - MonoAsyncResult *result = mono_threadpool_ms_begin_invoke (mono_domain_get (), (MonoObject*) delegate, method, params, &error); + MonoAsyncResult *result = mono_threadpool_begin_invoke (mono_domain_get (), (MonoObject*) delegate, method, params, &error); mono_error_set_pending_exception (&error); return result; } @@ -3210,7 +3210,7 @@ mono_delegate_end_invoke (MonoDelegate *delegate, gpointer *params) } else #endif { - res = mono_threadpool_ms_end_invoke (ares, &out_args, &exc, &error); + res = mono_threadpool_end_invoke (ares, &out_args, &exc, &error); if (mono_error_set_pending_exception (&error)) return NULL; } diff --git a/mono/metadata/runtime.c b/mono/metadata/runtime.c index 078b4cef997..99621d5cca0 100644 --- a/mono/metadata/runtime.c +++ b/mono/metadata/runtime.c @@ -18,7 +18,7 @@ #include #include #include -#include +#include #include #include @@ -110,7 +110,7 @@ mono_runtime_try_shutdown (void) mono_runtime_set_shutting_down (); /* This will kill the tp threads which cannot be suspended */ - mono_threadpool_ms_cleanup (); + mono_threadpool_cleanup (); /*TODO move the follow to here: mono_thread_suspend_all_other_threads (); OR mono_thread_wait_all_other_threads diff --git a/mono/metadata/socket-io.c b/mono/metadata/socket-io.c index 142c3c53ff2..606eff3692d 100644 --- a/mono/metadata/socket-io.c +++ b/mono/metadata/socket-io.c @@ -54,7 +54,7 @@ #include #include #include -#include +#include #include /* FIXME change this code to not mess so much with the internals */ #include @@ -646,7 +646,7 @@ ves_icall_System_Net_Sockets_Socket_Close_internal (SOCKET sock, gint32 *werror) /* Clear any pending work item from this socket if the underlying * polling system does not notify when the socket is closed */ - mono_threadpool_ms_io_remove_socket (GPOINTER_TO_INT (sock)); + mono_threadpool_io_remove_socket (GPOINTER_TO_INT (sock)); MONO_ENTER_GC_SAFE; closesocket (sock); diff --git a/mono/metadata/threadpool-io-epoll.c b/mono/metadata/threadpool-io-epoll.c new file mode 100644 index 00000000000..2bc99e105b9 --- /dev/null +++ b/mono/metadata/threadpool-io-epoll.c @@ -0,0 +1,130 @@ + +#if defined(HAVE_EPOLL) + +#include + +#if defined(HOST_WIN32) +/* We assume that epoll is not available on windows */ +#error +#endif + +#define EPOLL_NEVENTS 128 + +static gint epoll_fd; +static struct epoll_event *epoll_events; + +static gboolean +epoll_init (gint wakeup_pipe_fd) +{ + struct epoll_event event; + +#ifdef EPOOL_CLOEXEC + epoll_fd = epoll_create1 (EPOLL_CLOEXEC); +#else + epoll_fd = epoll_create (256); + fcntl (epoll_fd, F_SETFD, FD_CLOEXEC); +#endif + + if (epoll_fd == -1) { +#ifdef EPOOL_CLOEXEC + g_error ("epoll_init: epoll (EPOLL_CLOEXEC) failed, error (%d) %s\n", errno, g_strerror (errno)); +#else + g_error ("epoll_init: epoll (256) failed, error (%d) %s\n", errno, g_strerror (errno)); +#endif + return FALSE; + } + + event.events = EPOLLIN; + event.data.fd = wakeup_pipe_fd; + if (epoll_ctl (epoll_fd, EPOLL_CTL_ADD, event.data.fd, &event) == -1) { + g_error ("epoll_init: epoll_ctl () failed, error (%d) %s", errno, g_strerror (errno)); + close (epoll_fd); + return FALSE; + } + + epoll_events = g_new0 (struct epoll_event, EPOLL_NEVENTS); + + return TRUE; +} + +static void +epoll_register_fd (gint fd, gint events, gboolean is_new) +{ + struct epoll_event event; + +#ifndef EPOLLONESHOT +/* it was only defined on android in May 2013 */ +#define EPOLLONESHOT 0x40000000 +#endif + + event.data.fd = fd; + event.events = EPOLLONESHOT; + if ((events & EVENT_IN) != 0) + event.events |= EPOLLIN; + if ((events & EVENT_OUT) != 0) + event.events |= EPOLLOUT; + + if (epoll_ctl (epoll_fd, is_new ? EPOLL_CTL_ADD : EPOLL_CTL_MOD, event.data.fd, &event) == -1) + g_error ("epoll_register_fd: epoll_ctl(%s) failed, error (%d) %s", is_new ? "EPOLL_CTL_ADD" : "EPOLL_CTL_MOD", errno, g_strerror (errno)); +} + +static void +epoll_remove_fd (gint fd) +{ + if (epoll_ctl (epoll_fd, EPOLL_CTL_DEL, fd, NULL) == -1) + g_error ("epoll_remove_fd: epoll_ctl (EPOLL_CTL_DEL) failed, error (%d) %s", errno, g_strerror (errno)); +} + +static gint +epoll_event_wait (void (*callback) (gint fd, gint events, gpointer user_data), gpointer user_data) +{ + gint i, ready; + + memset (epoll_events, 0, sizeof (struct epoll_event) * EPOLL_NEVENTS); + + mono_gc_set_skip_thread (TRUE); + + MONO_ENTER_GC_SAFE; + ready = epoll_wait (epoll_fd, epoll_events, EPOLL_NEVENTS, -1); + MONO_EXIT_GC_SAFE; + + mono_gc_set_skip_thread (FALSE); + + if (ready == -1) { + switch (errno) { + case EINTR: + mono_thread_internal_check_for_interruption_critical (mono_thread_internal_current ()); + ready = 0; + break; + default: + g_error ("epoll_event_wait: epoll_wait () failed, error (%d) %s", errno, g_strerror (errno)); + break; + } + } + + if (ready == -1) + return -1; + + for (i = 0; i < ready; ++i) { + gint fd, events = 0; + + fd = epoll_events [i].data.fd; + if (epoll_events [i].events & (EPOLLIN | EPOLLERR | EPOLLHUP)) + events |= EVENT_IN; + if (epoll_events [i].events & (EPOLLOUT | EPOLLERR | EPOLLHUP)) + events |= EVENT_OUT; + + callback (fd, events, user_data); + } + + return 0; +} + +static ThreadPoolIOBackend backend_epoll = { + .init = epoll_init, + .register_fd = epoll_register_fd, + .remove_fd = epoll_remove_fd, + .event_wait = epoll_event_wait, +}; + +#endif diff --git a/mono/metadata/threadpool-io-kqueue.c b/mono/metadata/threadpool-io-kqueue.c new file mode 100644 index 00000000000..4422b668e8f --- /dev/null +++ b/mono/metadata/threadpool-io-kqueue.c @@ -0,0 +1,127 @@ + +#if defined(HAVE_KQUEUE) + +#include +#include +#include + +#if defined(HOST_WIN32) +/* We assume that kqueue is not available on windows */ +#error +#endif + +#define KQUEUE_NEVENTS 128 + +static gint kqueue_fd; +static struct kevent *kqueue_events; + +static gint +KQUEUE_INIT_FD (gint fd, gint events, gint flags) +{ + struct kevent event; + EV_SET (&event, fd, events, flags, 0, 0, 0); + return kevent (kqueue_fd, &event, 1, NULL, 0, NULL); +} + +static gboolean +kqueue_init (gint wakeup_pipe_fd) +{ + kqueue_fd = kqueue (); + if (kqueue_fd == -1) { + g_error ("kqueue_init: kqueue () failed, error (%d) %s", errno, g_strerror (errno)); + return FALSE; + } + + if (KQUEUE_INIT_FD (wakeup_pipe_fd, EVFILT_READ, EV_ADD | EV_ENABLE) == -1) { + g_error ("kqueue_init: kevent () failed, error (%d) %s", errno, g_strerror (errno)); + close (kqueue_fd); + return FALSE; + } + + kqueue_events = g_new0 (struct kevent, KQUEUE_NEVENTS); + + return TRUE; +} + +static void +kqueue_register_fd (gint fd, gint events, gboolean is_new) +{ + if (events & EVENT_IN) { + if (KQUEUE_INIT_FD (fd, EVFILT_READ, EV_ADD | EV_ENABLE) == -1) + g_error ("kqueue_register_fd: kevent(read,enable) failed, error (%d) %s", errno, g_strerror (errno)); + } else { + if (KQUEUE_INIT_FD (fd, EVFILT_READ, EV_ADD | EV_DISABLE) == -1) + g_error ("kqueue_register_fd: kevent(read,disable) failed, error (%d) %s", errno, g_strerror (errno)); + } + if (events & EVENT_OUT) { + if (KQUEUE_INIT_FD (fd, EVFILT_WRITE, EV_ADD | EV_ENABLE) == -1) + g_error ("kqueue_register_fd: kevent(write,enable) failed, error (%d) %s", errno, g_strerror (errno)); + } else { + if (KQUEUE_INIT_FD (fd, EVFILT_WRITE, EV_ADD | EV_DISABLE) == -1) + g_error ("kqueue_register_fd: kevent(write,disable) failed, error (%d) %s", errno, g_strerror (errno)); + } +} + +static void +kqueue_remove_fd (gint fd) +{ + /* FIXME: a race between closing and adding operation in the Socket managed code trigger a ENOENT error */ + if (KQUEUE_INIT_FD (fd, EVFILT_READ, EV_DELETE) == -1) + g_error ("kqueue_register_fd: kevent(read,delete) failed, error (%d) %s", errno, g_strerror (errno)); + if (KQUEUE_INIT_FD (fd, EVFILT_WRITE, EV_DELETE) == -1) + g_error ("kqueue_register_fd: kevent(write,delete) failed, error (%d) %s", errno, g_strerror (errno)); +} + +static gint +kqueue_event_wait (void (*callback) (gint fd, gint events, gpointer user_data), gpointer user_data) +{ + gint i, ready; + + memset (kqueue_events, 0, sizeof (struct kevent) * KQUEUE_NEVENTS); + + mono_gc_set_skip_thread (TRUE); + + MONO_ENTER_GC_SAFE; + ready = kevent (kqueue_fd, NULL, 0, kqueue_events, KQUEUE_NEVENTS, NULL); + MONO_EXIT_GC_SAFE; + + mono_gc_set_skip_thread (FALSE); + + if (ready == -1) { + switch (errno) { + case EINTR: + mono_thread_internal_check_for_interruption_critical (mono_thread_internal_current ()); + ready = 0; + break; + default: + g_error ("kqueue_event_wait: kevent () failed, error (%d) %s", errno, g_strerror (errno)); + break; + } + } + + if (ready == -1) + return -1; + + for (i = 0; i < ready; ++i) { + gint fd, events = 0; + + fd = kqueue_events [i].ident; + if (kqueue_events [i].filter == EVFILT_READ || (kqueue_events [i].flags & EV_ERROR) != 0) + events |= EVENT_IN; + if (kqueue_events [i].filter == EVFILT_WRITE || (kqueue_events [i].flags & EV_ERROR) != 0) + events |= EVENT_OUT; + + callback (fd, events, user_data); + } + + return 0; +} + +static ThreadPoolIOBackend backend_kqueue = { + .init = kqueue_init, + .register_fd = kqueue_register_fd, + .remove_fd = kqueue_remove_fd, + .event_wait = kqueue_event_wait, +}; + +#endif diff --git a/mono/metadata/threadpool-io-poll.c b/mono/metadata/threadpool-io-poll.c new file mode 100644 index 00000000000..3d32130d3b5 --- /dev/null +++ b/mono/metadata/threadpool-io-poll.c @@ -0,0 +1,218 @@ + +#include "utils/mono-poll.h" + +static mono_pollfd *poll_fds; +static guint poll_fds_capacity; +static guint poll_fds_size; + +static inline void +POLL_INIT_FD (mono_pollfd *poll_fd, gint fd, gint events) +{ + poll_fd->fd = fd; + poll_fd->events = events; + poll_fd->revents = 0; +} + +static gboolean +poll_init (gint wakeup_pipe_fd) +{ + g_assert (wakeup_pipe_fd >= 0); + + poll_fds_size = 1; + poll_fds_capacity = 64; + + poll_fds = g_new0 (mono_pollfd, poll_fds_capacity); + + POLL_INIT_FD (&poll_fds [0], wakeup_pipe_fd, MONO_POLLIN); + + return TRUE; +} + +static void +poll_register_fd (gint fd, gint events, gboolean is_new) +{ + gint i; + gint poll_event; + + g_assert (fd >= 0); + g_assert (poll_fds_size <= poll_fds_capacity); + + g_assert ((events & ~(EVENT_IN | EVENT_OUT)) == 0); + + poll_event = 0; + if (events & EVENT_IN) + poll_event |= MONO_POLLIN; + if (events & EVENT_OUT) + poll_event |= MONO_POLLOUT; + + for (i = 0; i < poll_fds_size; ++i) { + if (poll_fds [i].fd == fd) { + g_assert (!is_new); + POLL_INIT_FD (&poll_fds [i], fd, poll_event); + return; + } + } + + g_assert (is_new); + + for (i = 0; i < poll_fds_size; ++i) { + if (poll_fds [i].fd == -1) { + POLL_INIT_FD (&poll_fds [i], fd, poll_event); + return; + } + } + + poll_fds_size += 1; + + if (poll_fds_size > poll_fds_capacity) { + poll_fds_capacity *= 2; + g_assert (poll_fds_size <= poll_fds_capacity); + + poll_fds = (mono_pollfd *)g_renew (mono_pollfd, poll_fds, poll_fds_capacity); + } + + POLL_INIT_FD (&poll_fds [poll_fds_size - 1], fd, poll_event); +} + +static void +poll_remove_fd (gint fd) +{ + gint i; + + g_assert (fd >= 0); + + for (i = 0; i < poll_fds_size; ++i) { + if (poll_fds [i].fd == fd) { + POLL_INIT_FD (&poll_fds [i], -1, 0); + break; + } + } + + /* if we don't find the fd in poll_fds, + * it means we try to delete it twice */ + g_assert (i < poll_fds_size); + + /* if we find it again, it means we added + * it twice */ + for (; i < poll_fds_size; ++i) + g_assert (poll_fds [i].fd != fd); + + /* reduce the value of poll_fds_size so we + * do not keep it too big */ + while (poll_fds_size > 1 && poll_fds [poll_fds_size - 1].fd == -1) + poll_fds_size -= 1; +} + +static inline gint +poll_mark_bad_fds (mono_pollfd *poll_fds, gint poll_fds_size) +{ + gint i, ready = 0; + + for (i = 0; i < poll_fds_size; i++) { + if (poll_fds [i].fd == -1) + continue; + + switch (mono_poll (&poll_fds [i], 1, 0)) { + case 1: + ready++; + break; + case -1: + if (errno == EBADF) + { + poll_fds [i].revents |= MONO_POLLNVAL; + ready++; + } + break; + } + } + + return ready; +} + +static gint +poll_event_wait (void (*callback) (gint fd, gint events, gpointer user_data), gpointer user_data) +{ + gint i, ready; + + for (i = 0; i < poll_fds_size; ++i) + poll_fds [i].revents = 0; + + mono_gc_set_skip_thread (TRUE); + + MONO_ENTER_GC_SAFE; + ready = mono_poll (poll_fds, poll_fds_size, -1); + MONO_EXIT_GC_SAFE; + + mono_gc_set_skip_thread (FALSE); + + if (ready == -1) { + /* + * Apart from EINTR, we only check EBADF, for the rest: + * EINVAL: mono_poll() 'protects' us from descriptor + * numbers above the limit if using select() by marking + * then as POLLERR. If a system poll() is being + * used, the number of descriptor we're passing will not + * be over sysconf(_SC_OPEN_MAX), as the error would have + * happened when opening. + * + * EFAULT: we own the memory pointed by pfds. + * ENOMEM: we're doomed anyway + * + */ + switch (errno) + { + case EINTR: + { + mono_thread_internal_check_for_interruption_critical (mono_thread_internal_current ()); + ready = 0; + break; + } + case EBADF: + { + ready = poll_mark_bad_fds (poll_fds, poll_fds_size); + break; + } + default: + g_error ("poll_event_wait: mono_poll () failed, error (%d) %s", errno, g_strerror (errno)); + break; + } + } + + if (ready == -1) + return -1; + if (ready == 0) + return 0; + + g_assert (ready > 0); + + for (i = 0; i < poll_fds_size; ++i) { + gint fd, events = 0; + + if (poll_fds [i].fd == -1) + continue; + if (poll_fds [i].revents == 0) + continue; + + fd = poll_fds [i].fd; + if (poll_fds [i].revents & (MONO_POLLIN | MONO_POLLERR | MONO_POLLHUP | MONO_POLLNVAL)) + events |= EVENT_IN; + if (poll_fds [i].revents & (MONO_POLLOUT | MONO_POLLERR | MONO_POLLHUP | MONO_POLLNVAL)) + events |= EVENT_OUT; + if (poll_fds [i].revents & (MONO_POLLERR | MONO_POLLHUP | MONO_POLLNVAL)) + events |= EVENT_ERR; + + callback (fd, events, user_data); + + if (--ready == 0) + break; + } + + return 0; +} + +static ThreadPoolIOBackend backend_poll = { + .init = poll_init, + .register_fd = poll_register_fd, + .remove_fd = poll_remove_fd, + .event_wait = poll_event_wait, +}; diff --git a/mono/metadata/threadpool-io.c b/mono/metadata/threadpool-io.c new file mode 100644 index 00000000000..c7986ab0d60 --- /dev/null +++ b/mono/metadata/threadpool-io.c @@ -0,0 +1,675 @@ +/* + * threadpool-io.c: Microsoft IO threadpool runtime support + * + * Author: + * Ludovic Henry (ludovic.henry@xamarin.com) + * + * Copyright 2015 Xamarin, Inc (http://www.xamarin.com) + * Licensed under the MIT license. See LICENSE file in the project root for full license information. + */ + +#include + +#ifndef DISABLE_SOCKETS + +#include + +#if defined(HOST_WIN32) +#include +#else +#include +#include +#endif + +#include +#include +#include +#include +#include +#include +#include +#include + +typedef struct { + gboolean (*init) (gint wakeup_pipe_fd); + void (*register_fd) (gint fd, gint events, gboolean is_new); + void (*remove_fd) (gint fd); + gint (*event_wait) (void (*callback) (gint fd, gint events, gpointer user_data), gpointer user_data); +} ThreadPoolIOBackend; + +/* Keep in sync with System.IOOperation in mcs/class/System/System/IOSelector.cs */ +enum MonoIOOperation { + EVENT_IN = 1 << 0, + EVENT_OUT = 1 << 1, + EVENT_ERR = 1 << 2, /* not in managed */ +}; + +#include "threadpool-io-epoll.c" +#include "threadpool-io-kqueue.c" +#include "threadpool-io-poll.c" + +#define UPDATES_CAPACITY 128 + +/* Keep in sync with System.IOSelectorJob in mcs/class/System/System/IOSelector.cs */ +struct _MonoIOSelectorJob { + MonoObject object; + gint32 operation; + MonoObject *callback; + MonoObject *state; +}; + +typedef enum { + UPDATE_EMPTY = 0, + UPDATE_ADD, + UPDATE_REMOVE_SOCKET, + UPDATE_REMOVE_DOMAIN, +} ThreadPoolIOUpdateType; + +typedef struct { + gint fd; + MonoIOSelectorJob *job; +} ThreadPoolIOUpdate_Add; + +typedef struct { + gint fd; +} ThreadPoolIOUpdate_RemoveSocket; + +typedef struct { + MonoDomain *domain; +} ThreadPoolIOUpdate_RemoveDomain; + +typedef struct { + ThreadPoolIOUpdateType type; + union { + ThreadPoolIOUpdate_Add add; + ThreadPoolIOUpdate_RemoveSocket remove_socket; + ThreadPoolIOUpdate_RemoveDomain remove_domain; + } data; +} ThreadPoolIOUpdate; + +typedef struct { + ThreadPoolIOBackend backend; + + ThreadPoolIOUpdate updates [UPDATES_CAPACITY]; + gint updates_size; + MonoCoopMutex updates_lock; + MonoCoopCond updates_cond; + +#if !defined(HOST_WIN32) + gint wakeup_pipes [2]; +#else + SOCKET wakeup_pipes [2]; +#endif +} ThreadPoolIO; + +static mono_lazy_init_t io_status = MONO_LAZY_INIT_STATUS_NOT_INITIALIZED; + +static gboolean io_selector_running = FALSE; + +static ThreadPoolIO* threadpool_io; + +static MonoIOSelectorJob* +get_job_for_event (MonoMList **list, gint32 event) +{ + MonoMList *current; + + g_assert (list); + + for (current = *list; current; current = mono_mlist_next (current)) { + MonoIOSelectorJob *job = (MonoIOSelectorJob*) mono_mlist_get_data (current); + if (job->operation == event) { + *list = mono_mlist_remove_item (*list, current); + return job; + } + } + + return NULL; +} + +static gint +get_operations_for_jobs (MonoMList *list) +{ + MonoMList *current; + gint operations = 0; + + for (current = list; current; current = mono_mlist_next (current)) + operations |= ((MonoIOSelectorJob*) mono_mlist_get_data (current))->operation; + + return operations; +} + +static void +selector_thread_wakeup (void) +{ + gchar msg = 'c'; + gint written; + + for (;;) { +#if !defined(HOST_WIN32) + written = write (threadpool_io->wakeup_pipes [1], &msg, 1); + if (written == 1) + break; + if (written == -1) { + g_warning ("selector_thread_wakeup: write () failed, error (%d) %s\n", errno, g_strerror (errno)); + break; + } +#else + written = send (threadpool_io->wakeup_pipes [1], &msg, 1, 0); + if (written == 1) + break; + if (written == SOCKET_ERROR) { + g_warning ("selector_thread_wakeup: write () failed, error (%d)\n", WSAGetLastError ()); + break; + } +#endif + } +} + +static void +selector_thread_wakeup_drain_pipes (void) +{ + gchar buffer [128]; + gint received; + + for (;;) { +#if !defined(HOST_WIN32) + received = read (threadpool_io->wakeup_pipes [0], buffer, sizeof (buffer)); + if (received == 0) + break; + if (received == -1) { + if (errno != EINTR && errno != EAGAIN) + g_warning ("selector_thread_wakeup_drain_pipes: read () failed, error (%d) %s\n", errno, g_strerror (errno)); + break; + } +#else + received = recv (threadpool_io->wakeup_pipes [0], buffer, sizeof (buffer), 0); + if (received == 0) + break; + if (received == SOCKET_ERROR) { + if (WSAGetLastError () != WSAEINTR && WSAGetLastError () != WSAEWOULDBLOCK) + g_warning ("selector_thread_wakeup_drain_pipes: recv () failed, error (%d) %s\n", WSAGetLastError ()); + break; + } +#endif + } +} + +typedef struct { + MonoDomain *domain; + MonoGHashTable *states; +} FilterSockaresForDomainData; + +static void +filter_jobs_for_domain (gpointer key, gpointer value, gpointer user_data) +{ + FilterSockaresForDomainData *data; + MonoMList *list = (MonoMList *)value, *element; + MonoDomain *domain; + MonoGHashTable *states; + + g_assert (user_data); + data = (FilterSockaresForDomainData *)user_data; + domain = data->domain; + states = data->states; + + for (element = list; element; element = mono_mlist_next (element)) { + MonoIOSelectorJob *job = (MonoIOSelectorJob*) mono_mlist_get_data (element); + if (mono_object_domain (job) == domain) + mono_mlist_set_data (element, NULL); + } + + /* we skip all the first elements which are NULL */ + for (; list; list = mono_mlist_next (list)) { + if (mono_mlist_get_data (list)) + break; + } + + if (list) { + g_assert (mono_mlist_get_data (list)); + + /* we delete all the NULL elements after the first one */ + for (element = list; element;) { + MonoMList *next; + if (!(next = mono_mlist_next (element))) + break; + if (mono_mlist_get_data (next)) + element = next; + else + mono_mlist_set_next (element, mono_mlist_next (next)); + } + } + + mono_g_hash_table_replace (states, key, list); +} + +static void +wait_callback (gint fd, gint events, gpointer user_data) +{ + MonoError error; + + if (mono_runtime_is_shutting_down ()) + return; + + if (fd == threadpool_io->wakeup_pipes [0]) { + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: wke"); + selector_thread_wakeup_drain_pipes (); + } else { + MonoGHashTable *states; + MonoMList *list = NULL; + gpointer k; + gboolean remove_fd = FALSE; + gint operations; + + g_assert (user_data); + states = (MonoGHashTable *)user_data; + + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: cal fd %3d, events = %2s | %2s | %3s", + fd, (events & EVENT_IN) ? "RD" : "..", (events & EVENT_OUT) ? "WR" : "..", (events & EVENT_ERR) ? "ERR" : "..."); + + if (!mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list)) + g_error ("wait_callback: fd %d not found in states table", fd); + + if (list && (events & EVENT_IN) != 0) { + MonoIOSelectorJob *job = get_job_for_event (&list, EVENT_IN); + if (job) { + mono_threadpool_enqueue_work_item (((MonoObject*) job)->vtable->domain, (MonoObject*) job, &error); + mono_error_assert_ok (&error); + } + + } + if (list && (events & EVENT_OUT) != 0) { + MonoIOSelectorJob *job = get_job_for_event (&list, EVENT_OUT); + if (job) { + mono_threadpool_enqueue_work_item (((MonoObject*) job)->vtable->domain, (MonoObject*) job, &error); + mono_error_assert_ok (&error); + } + } + + remove_fd = (events & EVENT_ERR) == EVENT_ERR; + if (!remove_fd) { + mono_g_hash_table_replace (states, GINT_TO_POINTER (fd), list); + + operations = get_operations_for_jobs (list); + + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: res fd %3d, events = %2s | %2s | %3s", + fd, (operations & EVENT_IN) ? "RD" : "..", (operations & EVENT_OUT) ? "WR" : "..", (operations & EVENT_ERR) ? "ERR" : "..."); + + threadpool_io->backend.register_fd (fd, operations, FALSE); + } else { + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: err fd %d", fd); + + mono_g_hash_table_remove (states, GINT_TO_POINTER (fd)); + + threadpool_io->backend.remove_fd (fd); + } + } +} + +static void +selector_thread (gpointer data) +{ + MonoError error; + MonoGHashTable *states; + + io_selector_running = TRUE; + + if (mono_runtime_is_shutting_down ()) { + io_selector_running = FALSE; + return; + } + + states = mono_g_hash_table_new_type (g_direct_hash, g_direct_equal, MONO_HASH_VALUE_GC, MONO_ROOT_SOURCE_THREAD_POOL, "i/o thread pool states table"); + + for (;;) { + gint i, j; + gint res; + + mono_coop_mutex_lock (&threadpool_io->updates_lock); + + for (i = 0; i < threadpool_io->updates_size; ++i) { + ThreadPoolIOUpdate *update = &threadpool_io->updates [i]; + + switch (update->type) { + case UPDATE_EMPTY: + break; + case UPDATE_ADD: { + gint fd; + gint operations; + gpointer k; + gboolean exists; + MonoMList *list = NULL; + MonoIOSelectorJob *job; + + fd = update->data.add.fd; + g_assert (fd >= 0); + + job = update->data.add.job; + g_assert (job); + + exists = mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list); + list = mono_mlist_append_checked (list, (MonoObject*) job, &error); + mono_error_assert_ok (&error); + mono_g_hash_table_replace (states, GINT_TO_POINTER (fd), list); + + operations = get_operations_for_jobs (list); + + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: %3s fd %3d, operations = %2s | %2s | %3s", + exists ? "mod" : "add", fd, (operations & EVENT_IN) ? "RD" : "..", (operations & EVENT_OUT) ? "WR" : "..", (operations & EVENT_ERR) ? "ERR" : "..."); + + threadpool_io->backend.register_fd (fd, operations, !exists); + + break; + } + case UPDATE_REMOVE_SOCKET: { + gint fd; + gpointer k; + MonoMList *list = NULL; + + fd = update->data.remove_socket.fd; + g_assert (fd >= 0); + + if (mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list)) { + mono_g_hash_table_remove (states, GINT_TO_POINTER (fd)); + + for (j = i + 1; j < threadpool_io->updates_size; ++j) { + ThreadPoolIOUpdate *update = &threadpool_io->updates [j]; + if (update->type == UPDATE_ADD && update->data.add.fd == fd) + memset (update, 0, sizeof (ThreadPoolIOUpdate)); + } + + for (; list; list = mono_mlist_remove_item (list, list)) { + mono_threadpool_enqueue_work_item (mono_object_domain (mono_mlist_get_data (list)), mono_mlist_get_data (list), &error); + mono_error_assert_ok (&error); + } + + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: del fd %3d", fd); + threadpool_io->backend.remove_fd (fd); + } + + break; + } + case UPDATE_REMOVE_DOMAIN: { + MonoDomain *domain; + + domain = update->data.remove_domain.domain; + g_assert (domain); + + FilterSockaresForDomainData user_data = { .domain = domain, .states = states }; + mono_g_hash_table_foreach (states, filter_jobs_for_domain, &user_data); + + for (j = i + 1; j < threadpool_io->updates_size; ++j) { + ThreadPoolIOUpdate *update = &threadpool_io->updates [j]; + if (update->type == UPDATE_ADD && mono_object_domain (update->data.add.job) == domain) + memset (update, 0, sizeof (ThreadPoolIOUpdate)); + } + + break; + } + default: + g_assert_not_reached (); + } + } + + mono_coop_cond_broadcast (&threadpool_io->updates_cond); + + if (threadpool_io->updates_size > 0) { + threadpool_io->updates_size = 0; + memset (&threadpool_io->updates, 0, UPDATES_CAPACITY * sizeof (ThreadPoolIOUpdate)); + } + + mono_coop_mutex_unlock (&threadpool_io->updates_lock); + + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: wai"); + + res = threadpool_io->backend.event_wait (wait_callback, states); + + if (res == -1 || mono_runtime_is_shutting_down ()) + break; + } + + mono_g_hash_table_destroy (states); + + io_selector_running = FALSE; +} + +/* Locking: threadpool_io->updates_lock must be held */ +static ThreadPoolIOUpdate* +update_get_new (void) +{ + ThreadPoolIOUpdate *update = NULL; + g_assert (threadpool_io->updates_size <= UPDATES_CAPACITY); + + while (threadpool_io->updates_size == UPDATES_CAPACITY) { + /* we wait for updates to be applied in the selector_thread and we loop + * as long as none are available. if it happends too much, then we need + * to increase UPDATES_CAPACITY */ + mono_coop_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock); + } + + g_assert (threadpool_io->updates_size < UPDATES_CAPACITY); + + update = &threadpool_io->updates [threadpool_io->updates_size ++]; + + return update; +} + +static void +wakeup_pipes_init (void) +{ +#if !defined(HOST_WIN32) + if (pipe (threadpool_io->wakeup_pipes) == -1) + g_error ("wakeup_pipes_init: pipe () failed, error (%d) %s\n", errno, g_strerror (errno)); + if (fcntl (threadpool_io->wakeup_pipes [0], F_SETFL, O_NONBLOCK) == -1) + g_error ("wakeup_pipes_init: fcntl () failed, error (%d) %s\n", errno, g_strerror (errno)); +#else + struct sockaddr_in client; + struct sockaddr_in server; + SOCKET server_sock; + gulong arg; + gint size; + + server_sock = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); + g_assert (server_sock != INVALID_SOCKET); + threadpool_io->wakeup_pipes [1] = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); + g_assert (threadpool_io->wakeup_pipes [1] != INVALID_SOCKET); + + server.sin_family = AF_INET; + server.sin_addr.s_addr = inet_addr ("127.0.0.1"); + server.sin_port = 0; + if (bind (server_sock, (SOCKADDR*) &server, sizeof (server)) == SOCKET_ERROR) { + closesocket (server_sock); + g_error ("wakeup_pipes_init: bind () failed, error (%d)\n", WSAGetLastError ()); + } + + size = sizeof (server); + if (getsockname (server_sock, (SOCKADDR*) &server, &size) == SOCKET_ERROR) { + closesocket (server_sock); + g_error ("wakeup_pipes_init: getsockname () failed, error (%d)\n", WSAGetLastError ()); + } + if (listen (server_sock, 1024) == SOCKET_ERROR) { + closesocket (server_sock); + g_error ("wakeup_pipes_init: listen () failed, error (%d)\n", WSAGetLastError ()); + } + if (connect ((SOCKET) threadpool_io->wakeup_pipes [1], (SOCKADDR*) &server, sizeof (server)) == SOCKET_ERROR) { + closesocket (server_sock); + g_error ("wakeup_pipes_init: connect () failed, error (%d)\n", WSAGetLastError ()); + } + + size = sizeof (client); + threadpool_io->wakeup_pipes [0] = accept (server_sock, (SOCKADDR *) &client, &size); + g_assert (threadpool_io->wakeup_pipes [0] != INVALID_SOCKET); + + arg = 1; + if (ioctlsocket (threadpool_io->wakeup_pipes [0], FIONBIO, &arg) == SOCKET_ERROR) { + closesocket (threadpool_io->wakeup_pipes [0]); + closesocket (server_sock); + g_error ("wakeup_pipes_init: ioctlsocket () failed, error (%d)\n", WSAGetLastError ()); + } + + closesocket (server_sock); +#endif +} + +static void +initialize (void) +{ + g_assert (!threadpool_io); + threadpool_io = g_new0 (ThreadPoolIO, 1); + g_assert (threadpool_io); + + mono_coop_mutex_init (&threadpool_io->updates_lock); + mono_coop_cond_init (&threadpool_io->updates_cond); + mono_gc_register_root ((char *)&threadpool_io->updates [0], sizeof (threadpool_io->updates), MONO_GC_DESCRIPTOR_NULL, MONO_ROOT_SOURCE_THREAD_POOL, "i/o thread pool updates list"); + + threadpool_io->updates_size = 0; + + threadpool_io->backend = backend_poll; + if (g_getenv ("MONO_ENABLE_AIO") != NULL) { +#if defined(HAVE_EPOLL) + threadpool_io->backend = backend_epoll; +#elif defined(HAVE_KQUEUE) + threadpool_io->backend = backend_kqueue; +#endif + } + + wakeup_pipes_init (); + + if (!threadpool_io->backend.init (threadpool_io->wakeup_pipes [0])) + g_error ("initialize: backend->init () failed"); + + MonoError error; + if (!mono_thread_create_internal (mono_get_root_domain (), selector_thread, NULL, TRUE, SMALL_STACK, &error)) + g_error ("initialize: mono_thread_create_internal () failed due to %s", mono_error_get_message (&error)); +} + +static void +cleanup (void) +{ + /* we make the assumption along the code that we are + * cleaning up only if the runtime is shutting down */ + g_assert (mono_runtime_is_shutting_down ()); + + selector_thread_wakeup (); + while (io_selector_running) + mono_thread_info_usleep (1000); +} + +void +mono_threadpool_io_cleanup (void) +{ + mono_lazy_cleanup (&io_status, cleanup); +} + +void +ves_icall_System_IOSelector_Add (gpointer handle, MonoIOSelectorJob *job) +{ + ThreadPoolIOUpdate *update; + + g_assert (handle); + + g_assert ((job->operation == EVENT_IN) ^ (job->operation == EVENT_OUT)); + g_assert (job->callback); + + if (mono_runtime_is_shutting_down ()) + return; + if (mono_domain_is_unloading (mono_object_domain (job))) + return; + + mono_lazy_initialize (&io_status, initialize); + + mono_coop_mutex_lock (&threadpool_io->updates_lock); + + update = update_get_new (); + update->type = UPDATE_ADD; + update->data.add.fd = GPOINTER_TO_INT (handle); + update->data.add.job = job; + mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */ + + selector_thread_wakeup (); + + mono_coop_mutex_unlock (&threadpool_io->updates_lock); +} + +void +ves_icall_System_IOSelector_Remove (gpointer handle) +{ + mono_threadpool_io_remove_socket (GPOINTER_TO_INT (handle)); +} + +void +mono_threadpool_io_remove_socket (int fd) +{ + ThreadPoolIOUpdate *update; + + if (!mono_lazy_is_initialized (&io_status)) + return; + + mono_coop_mutex_lock (&threadpool_io->updates_lock); + + update = update_get_new (); + update->type = UPDATE_REMOVE_SOCKET; + update->data.add.fd = fd; + mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */ + + selector_thread_wakeup (); + + mono_coop_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock); + + mono_coop_mutex_unlock (&threadpool_io->updates_lock); +} + +void +mono_threadpool_io_remove_domain_jobs (MonoDomain *domain) +{ + ThreadPoolIOUpdate *update; + + if (!mono_lazy_is_initialized (&io_status)) + return; + + mono_coop_mutex_lock (&threadpool_io->updates_lock); + + update = update_get_new (); + update->type = UPDATE_REMOVE_DOMAIN; + update->data.remove_domain.domain = domain; + mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */ + + selector_thread_wakeup (); + + mono_coop_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock); + + mono_coop_mutex_unlock (&threadpool_io->updates_lock); +} + +#else + +void +ves_icall_System_IOSelector_Add (gpointer handle, MonoIOSelectorJob *job) +{ + g_assert_not_reached (); +} + +void +ves_icall_System_IOSelector_Remove (gpointer handle) +{ + g_assert_not_reached (); +} + +void +mono_threadpool_io_cleanup (void) +{ + g_assert_not_reached (); +} + +void +mono_threadpool_io_remove_socket (int fd) +{ + g_assert_not_reached (); +} + +void +mono_threadpool_io_remove_domain_jobs (MonoDomain *domain) +{ + g_assert_not_reached (); +} + +#endif diff --git a/mono/metadata/threadpool-io.h b/mono/metadata/threadpool-io.h new file mode 100644 index 00000000000..0936ee016cd --- /dev/null +++ b/mono/metadata/threadpool-io.h @@ -0,0 +1,26 @@ + +#ifndef _MONO_METADATA_THREADPOOL_IO_H_ +#define _MONO_METADATA_THREADPOOL_IO_H_ + +#include +#include + +#include +#include + +typedef struct _MonoIOSelectorJob MonoIOSelectorJob; + +void +ves_icall_System_IOSelector_Add (gpointer handle, MonoIOSelectorJob *job); + +void +ves_icall_System_IOSelector_Remove (gpointer handle); + +void +mono_threadpool_io_remove_socket (int fd); +void +mono_threadpool_io_remove_domain_jobs (MonoDomain *domain); +void +mono_threadpool_io_cleanup (void); + +#endif /* _MONO_METADATA_THREADPOOL_IO_H_ */ diff --git a/mono/metadata/threadpool-ms-io-epoll.c b/mono/metadata/threadpool-ms-io-epoll.c deleted file mode 100644 index 2bc99e105b9..00000000000 --- a/mono/metadata/threadpool-ms-io-epoll.c +++ /dev/null @@ -1,130 +0,0 @@ - -#if defined(HAVE_EPOLL) - -#include - -#if defined(HOST_WIN32) -/* We assume that epoll is not available on windows */ -#error -#endif - -#define EPOLL_NEVENTS 128 - -static gint epoll_fd; -static struct epoll_event *epoll_events; - -static gboolean -epoll_init (gint wakeup_pipe_fd) -{ - struct epoll_event event; - -#ifdef EPOOL_CLOEXEC - epoll_fd = epoll_create1 (EPOLL_CLOEXEC); -#else - epoll_fd = epoll_create (256); - fcntl (epoll_fd, F_SETFD, FD_CLOEXEC); -#endif - - if (epoll_fd == -1) { -#ifdef EPOOL_CLOEXEC - g_error ("epoll_init: epoll (EPOLL_CLOEXEC) failed, error (%d) %s\n", errno, g_strerror (errno)); -#else - g_error ("epoll_init: epoll (256) failed, error (%d) %s\n", errno, g_strerror (errno)); -#endif - return FALSE; - } - - event.events = EPOLLIN; - event.data.fd = wakeup_pipe_fd; - if (epoll_ctl (epoll_fd, EPOLL_CTL_ADD, event.data.fd, &event) == -1) { - g_error ("epoll_init: epoll_ctl () failed, error (%d) %s", errno, g_strerror (errno)); - close (epoll_fd); - return FALSE; - } - - epoll_events = g_new0 (struct epoll_event, EPOLL_NEVENTS); - - return TRUE; -} - -static void -epoll_register_fd (gint fd, gint events, gboolean is_new) -{ - struct epoll_event event; - -#ifndef EPOLLONESHOT -/* it was only defined on android in May 2013 */ -#define EPOLLONESHOT 0x40000000 -#endif - - event.data.fd = fd; - event.events = EPOLLONESHOT; - if ((events & EVENT_IN) != 0) - event.events |= EPOLLIN; - if ((events & EVENT_OUT) != 0) - event.events |= EPOLLOUT; - - if (epoll_ctl (epoll_fd, is_new ? EPOLL_CTL_ADD : EPOLL_CTL_MOD, event.data.fd, &event) == -1) - g_error ("epoll_register_fd: epoll_ctl(%s) failed, error (%d) %s", is_new ? "EPOLL_CTL_ADD" : "EPOLL_CTL_MOD", errno, g_strerror (errno)); -} - -static void -epoll_remove_fd (gint fd) -{ - if (epoll_ctl (epoll_fd, EPOLL_CTL_DEL, fd, NULL) == -1) - g_error ("epoll_remove_fd: epoll_ctl (EPOLL_CTL_DEL) failed, error (%d) %s", errno, g_strerror (errno)); -} - -static gint -epoll_event_wait (void (*callback) (gint fd, gint events, gpointer user_data), gpointer user_data) -{ - gint i, ready; - - memset (epoll_events, 0, sizeof (struct epoll_event) * EPOLL_NEVENTS); - - mono_gc_set_skip_thread (TRUE); - - MONO_ENTER_GC_SAFE; - ready = epoll_wait (epoll_fd, epoll_events, EPOLL_NEVENTS, -1); - MONO_EXIT_GC_SAFE; - - mono_gc_set_skip_thread (FALSE); - - if (ready == -1) { - switch (errno) { - case EINTR: - mono_thread_internal_check_for_interruption_critical (mono_thread_internal_current ()); - ready = 0; - break; - default: - g_error ("epoll_event_wait: epoll_wait () failed, error (%d) %s", errno, g_strerror (errno)); - break; - } - } - - if (ready == -1) - return -1; - - for (i = 0; i < ready; ++i) { - gint fd, events = 0; - - fd = epoll_events [i].data.fd; - if (epoll_events [i].events & (EPOLLIN | EPOLLERR | EPOLLHUP)) - events |= EVENT_IN; - if (epoll_events [i].events & (EPOLLOUT | EPOLLERR | EPOLLHUP)) - events |= EVENT_OUT; - - callback (fd, events, user_data); - } - - return 0; -} - -static ThreadPoolIOBackend backend_epoll = { - .init = epoll_init, - .register_fd = epoll_register_fd, - .remove_fd = epoll_remove_fd, - .event_wait = epoll_event_wait, -}; - -#endif diff --git a/mono/metadata/threadpool-ms-io-kqueue.c b/mono/metadata/threadpool-ms-io-kqueue.c deleted file mode 100644 index 4422b668e8f..00000000000 --- a/mono/metadata/threadpool-ms-io-kqueue.c +++ /dev/null @@ -1,127 +0,0 @@ - -#if defined(HAVE_KQUEUE) - -#include -#include -#include - -#if defined(HOST_WIN32) -/* We assume that kqueue is not available on windows */ -#error -#endif - -#define KQUEUE_NEVENTS 128 - -static gint kqueue_fd; -static struct kevent *kqueue_events; - -static gint -KQUEUE_INIT_FD (gint fd, gint events, gint flags) -{ - struct kevent event; - EV_SET (&event, fd, events, flags, 0, 0, 0); - return kevent (kqueue_fd, &event, 1, NULL, 0, NULL); -} - -static gboolean -kqueue_init (gint wakeup_pipe_fd) -{ - kqueue_fd = kqueue (); - if (kqueue_fd == -1) { - g_error ("kqueue_init: kqueue () failed, error (%d) %s", errno, g_strerror (errno)); - return FALSE; - } - - if (KQUEUE_INIT_FD (wakeup_pipe_fd, EVFILT_READ, EV_ADD | EV_ENABLE) == -1) { - g_error ("kqueue_init: kevent () failed, error (%d) %s", errno, g_strerror (errno)); - close (kqueue_fd); - return FALSE; - } - - kqueue_events = g_new0 (struct kevent, KQUEUE_NEVENTS); - - return TRUE; -} - -static void -kqueue_register_fd (gint fd, gint events, gboolean is_new) -{ - if (events & EVENT_IN) { - if (KQUEUE_INIT_FD (fd, EVFILT_READ, EV_ADD | EV_ENABLE) == -1) - g_error ("kqueue_register_fd: kevent(read,enable) failed, error (%d) %s", errno, g_strerror (errno)); - } else { - if (KQUEUE_INIT_FD (fd, EVFILT_READ, EV_ADD | EV_DISABLE) == -1) - g_error ("kqueue_register_fd: kevent(read,disable) failed, error (%d) %s", errno, g_strerror (errno)); - } - if (events & EVENT_OUT) { - if (KQUEUE_INIT_FD (fd, EVFILT_WRITE, EV_ADD | EV_ENABLE) == -1) - g_error ("kqueue_register_fd: kevent(write,enable) failed, error (%d) %s", errno, g_strerror (errno)); - } else { - if (KQUEUE_INIT_FD (fd, EVFILT_WRITE, EV_ADD | EV_DISABLE) == -1) - g_error ("kqueue_register_fd: kevent(write,disable) failed, error (%d) %s", errno, g_strerror (errno)); - } -} - -static void -kqueue_remove_fd (gint fd) -{ - /* FIXME: a race between closing and adding operation in the Socket managed code trigger a ENOENT error */ - if (KQUEUE_INIT_FD (fd, EVFILT_READ, EV_DELETE) == -1) - g_error ("kqueue_register_fd: kevent(read,delete) failed, error (%d) %s", errno, g_strerror (errno)); - if (KQUEUE_INIT_FD (fd, EVFILT_WRITE, EV_DELETE) == -1) - g_error ("kqueue_register_fd: kevent(write,delete) failed, error (%d) %s", errno, g_strerror (errno)); -} - -static gint -kqueue_event_wait (void (*callback) (gint fd, gint events, gpointer user_data), gpointer user_data) -{ - gint i, ready; - - memset (kqueue_events, 0, sizeof (struct kevent) * KQUEUE_NEVENTS); - - mono_gc_set_skip_thread (TRUE); - - MONO_ENTER_GC_SAFE; - ready = kevent (kqueue_fd, NULL, 0, kqueue_events, KQUEUE_NEVENTS, NULL); - MONO_EXIT_GC_SAFE; - - mono_gc_set_skip_thread (FALSE); - - if (ready == -1) { - switch (errno) { - case EINTR: - mono_thread_internal_check_for_interruption_critical (mono_thread_internal_current ()); - ready = 0; - break; - default: - g_error ("kqueue_event_wait: kevent () failed, error (%d) %s", errno, g_strerror (errno)); - break; - } - } - - if (ready == -1) - return -1; - - for (i = 0; i < ready; ++i) { - gint fd, events = 0; - - fd = kqueue_events [i].ident; - if (kqueue_events [i].filter == EVFILT_READ || (kqueue_events [i].flags & EV_ERROR) != 0) - events |= EVENT_IN; - if (kqueue_events [i].filter == EVFILT_WRITE || (kqueue_events [i].flags & EV_ERROR) != 0) - events |= EVENT_OUT; - - callback (fd, events, user_data); - } - - return 0; -} - -static ThreadPoolIOBackend backend_kqueue = { - .init = kqueue_init, - .register_fd = kqueue_register_fd, - .remove_fd = kqueue_remove_fd, - .event_wait = kqueue_event_wait, -}; - -#endif diff --git a/mono/metadata/threadpool-ms-io-poll.c b/mono/metadata/threadpool-ms-io-poll.c deleted file mode 100644 index 3d32130d3b5..00000000000 --- a/mono/metadata/threadpool-ms-io-poll.c +++ /dev/null @@ -1,218 +0,0 @@ - -#include "utils/mono-poll.h" - -static mono_pollfd *poll_fds; -static guint poll_fds_capacity; -static guint poll_fds_size; - -static inline void -POLL_INIT_FD (mono_pollfd *poll_fd, gint fd, gint events) -{ - poll_fd->fd = fd; - poll_fd->events = events; - poll_fd->revents = 0; -} - -static gboolean -poll_init (gint wakeup_pipe_fd) -{ - g_assert (wakeup_pipe_fd >= 0); - - poll_fds_size = 1; - poll_fds_capacity = 64; - - poll_fds = g_new0 (mono_pollfd, poll_fds_capacity); - - POLL_INIT_FD (&poll_fds [0], wakeup_pipe_fd, MONO_POLLIN); - - return TRUE; -} - -static void -poll_register_fd (gint fd, gint events, gboolean is_new) -{ - gint i; - gint poll_event; - - g_assert (fd >= 0); - g_assert (poll_fds_size <= poll_fds_capacity); - - g_assert ((events & ~(EVENT_IN | EVENT_OUT)) == 0); - - poll_event = 0; - if (events & EVENT_IN) - poll_event |= MONO_POLLIN; - if (events & EVENT_OUT) - poll_event |= MONO_POLLOUT; - - for (i = 0; i < poll_fds_size; ++i) { - if (poll_fds [i].fd == fd) { - g_assert (!is_new); - POLL_INIT_FD (&poll_fds [i], fd, poll_event); - return; - } - } - - g_assert (is_new); - - for (i = 0; i < poll_fds_size; ++i) { - if (poll_fds [i].fd == -1) { - POLL_INIT_FD (&poll_fds [i], fd, poll_event); - return; - } - } - - poll_fds_size += 1; - - if (poll_fds_size > poll_fds_capacity) { - poll_fds_capacity *= 2; - g_assert (poll_fds_size <= poll_fds_capacity); - - poll_fds = (mono_pollfd *)g_renew (mono_pollfd, poll_fds, poll_fds_capacity); - } - - POLL_INIT_FD (&poll_fds [poll_fds_size - 1], fd, poll_event); -} - -static void -poll_remove_fd (gint fd) -{ - gint i; - - g_assert (fd >= 0); - - for (i = 0; i < poll_fds_size; ++i) { - if (poll_fds [i].fd == fd) { - POLL_INIT_FD (&poll_fds [i], -1, 0); - break; - } - } - - /* if we don't find the fd in poll_fds, - * it means we try to delete it twice */ - g_assert (i < poll_fds_size); - - /* if we find it again, it means we added - * it twice */ - for (; i < poll_fds_size; ++i) - g_assert (poll_fds [i].fd != fd); - - /* reduce the value of poll_fds_size so we - * do not keep it too big */ - while (poll_fds_size > 1 && poll_fds [poll_fds_size - 1].fd == -1) - poll_fds_size -= 1; -} - -static inline gint -poll_mark_bad_fds (mono_pollfd *poll_fds, gint poll_fds_size) -{ - gint i, ready = 0; - - for (i = 0; i < poll_fds_size; i++) { - if (poll_fds [i].fd == -1) - continue; - - switch (mono_poll (&poll_fds [i], 1, 0)) { - case 1: - ready++; - break; - case -1: - if (errno == EBADF) - { - poll_fds [i].revents |= MONO_POLLNVAL; - ready++; - } - break; - } - } - - return ready; -} - -static gint -poll_event_wait (void (*callback) (gint fd, gint events, gpointer user_data), gpointer user_data) -{ - gint i, ready; - - for (i = 0; i < poll_fds_size; ++i) - poll_fds [i].revents = 0; - - mono_gc_set_skip_thread (TRUE); - - MONO_ENTER_GC_SAFE; - ready = mono_poll (poll_fds, poll_fds_size, -1); - MONO_EXIT_GC_SAFE; - - mono_gc_set_skip_thread (FALSE); - - if (ready == -1) { - /* - * Apart from EINTR, we only check EBADF, for the rest: - * EINVAL: mono_poll() 'protects' us from descriptor - * numbers above the limit if using select() by marking - * then as POLLERR. If a system poll() is being - * used, the number of descriptor we're passing will not - * be over sysconf(_SC_OPEN_MAX), as the error would have - * happened when opening. - * - * EFAULT: we own the memory pointed by pfds. - * ENOMEM: we're doomed anyway - * - */ - switch (errno) - { - case EINTR: - { - mono_thread_internal_check_for_interruption_critical (mono_thread_internal_current ()); - ready = 0; - break; - } - case EBADF: - { - ready = poll_mark_bad_fds (poll_fds, poll_fds_size); - break; - } - default: - g_error ("poll_event_wait: mono_poll () failed, error (%d) %s", errno, g_strerror (errno)); - break; - } - } - - if (ready == -1) - return -1; - if (ready == 0) - return 0; - - g_assert (ready > 0); - - for (i = 0; i < poll_fds_size; ++i) { - gint fd, events = 0; - - if (poll_fds [i].fd == -1) - continue; - if (poll_fds [i].revents == 0) - continue; - - fd = poll_fds [i].fd; - if (poll_fds [i].revents & (MONO_POLLIN | MONO_POLLERR | MONO_POLLHUP | MONO_POLLNVAL)) - events |= EVENT_IN; - if (poll_fds [i].revents & (MONO_POLLOUT | MONO_POLLERR | MONO_POLLHUP | MONO_POLLNVAL)) - events |= EVENT_OUT; - if (poll_fds [i].revents & (MONO_POLLERR | MONO_POLLHUP | MONO_POLLNVAL)) - events |= EVENT_ERR; - - callback (fd, events, user_data); - - if (--ready == 0) - break; - } - - return 0; -} - -static ThreadPoolIOBackend backend_poll = { - .init = poll_init, - .register_fd = poll_register_fd, - .remove_fd = poll_remove_fd, - .event_wait = poll_event_wait, -}; diff --git a/mono/metadata/threadpool-ms-io.c b/mono/metadata/threadpool-ms-io.c deleted file mode 100644 index 7cdaf5be684..00000000000 --- a/mono/metadata/threadpool-ms-io.c +++ /dev/null @@ -1,675 +0,0 @@ -/* - * threadpool-ms-io.c: Microsoft IO threadpool runtime support - * - * Author: - * Ludovic Henry (ludovic.henry@xamarin.com) - * - * Copyright 2015 Xamarin, Inc (http://www.xamarin.com) - * Licensed under the MIT license. See LICENSE file in the project root for full license information. - */ - -#include - -#ifndef DISABLE_SOCKETS - -#include - -#if defined(HOST_WIN32) -#include -#else -#include -#include -#endif - -#include -#include -#include -#include -#include -#include -#include -#include - -typedef struct { - gboolean (*init) (gint wakeup_pipe_fd); - void (*register_fd) (gint fd, gint events, gboolean is_new); - void (*remove_fd) (gint fd); - gint (*event_wait) (void (*callback) (gint fd, gint events, gpointer user_data), gpointer user_data); -} ThreadPoolIOBackend; - -/* Keep in sync with System.IOOperation in mcs/class/System/System/IOSelector.cs */ -enum MonoIOOperation { - EVENT_IN = 1 << 0, - EVENT_OUT = 1 << 1, - EVENT_ERR = 1 << 2, /* not in managed */ -}; - -#include "threadpool-ms-io-epoll.c" -#include "threadpool-ms-io-kqueue.c" -#include "threadpool-ms-io-poll.c" - -#define UPDATES_CAPACITY 128 - -/* Keep in sync with System.IOSelectorJob in mcs/class/System/System/IOSelector.cs */ -struct _MonoIOSelectorJob { - MonoObject object; - gint32 operation; - MonoObject *callback; - MonoObject *state; -}; - -typedef enum { - UPDATE_EMPTY = 0, - UPDATE_ADD, - UPDATE_REMOVE_SOCKET, - UPDATE_REMOVE_DOMAIN, -} ThreadPoolIOUpdateType; - -typedef struct { - gint fd; - MonoIOSelectorJob *job; -} ThreadPoolIOUpdate_Add; - -typedef struct { - gint fd; -} ThreadPoolIOUpdate_RemoveSocket; - -typedef struct { - MonoDomain *domain; -} ThreadPoolIOUpdate_RemoveDomain; - -typedef struct { - ThreadPoolIOUpdateType type; - union { - ThreadPoolIOUpdate_Add add; - ThreadPoolIOUpdate_RemoveSocket remove_socket; - ThreadPoolIOUpdate_RemoveDomain remove_domain; - } data; -} ThreadPoolIOUpdate; - -typedef struct { - ThreadPoolIOBackend backend; - - ThreadPoolIOUpdate updates [UPDATES_CAPACITY]; - gint updates_size; - MonoCoopMutex updates_lock; - MonoCoopCond updates_cond; - -#if !defined(HOST_WIN32) - gint wakeup_pipes [2]; -#else - SOCKET wakeup_pipes [2]; -#endif -} ThreadPoolIO; - -static mono_lazy_init_t io_status = MONO_LAZY_INIT_STATUS_NOT_INITIALIZED; - -static gboolean io_selector_running = FALSE; - -static ThreadPoolIO* threadpool_io; - -static MonoIOSelectorJob* -get_job_for_event (MonoMList **list, gint32 event) -{ - MonoMList *current; - - g_assert (list); - - for (current = *list; current; current = mono_mlist_next (current)) { - MonoIOSelectorJob *job = (MonoIOSelectorJob*) mono_mlist_get_data (current); - if (job->operation == event) { - *list = mono_mlist_remove_item (*list, current); - return job; - } - } - - return NULL; -} - -static gint -get_operations_for_jobs (MonoMList *list) -{ - MonoMList *current; - gint operations = 0; - - for (current = list; current; current = mono_mlist_next (current)) - operations |= ((MonoIOSelectorJob*) mono_mlist_get_data (current))->operation; - - return operations; -} - -static void -selector_thread_wakeup (void) -{ - gchar msg = 'c'; - gint written; - - for (;;) { -#if !defined(HOST_WIN32) - written = write (threadpool_io->wakeup_pipes [1], &msg, 1); - if (written == 1) - break; - if (written == -1) { - g_warning ("selector_thread_wakeup: write () failed, error (%d) %s\n", errno, g_strerror (errno)); - break; - } -#else - written = send (threadpool_io->wakeup_pipes [1], &msg, 1, 0); - if (written == 1) - break; - if (written == SOCKET_ERROR) { - g_warning ("selector_thread_wakeup: write () failed, error (%d)\n", WSAGetLastError ()); - break; - } -#endif - } -} - -static void -selector_thread_wakeup_drain_pipes (void) -{ - gchar buffer [128]; - gint received; - - for (;;) { -#if !defined(HOST_WIN32) - received = read (threadpool_io->wakeup_pipes [0], buffer, sizeof (buffer)); - if (received == 0) - break; - if (received == -1) { - if (errno != EINTR && errno != EAGAIN) - g_warning ("selector_thread_wakeup_drain_pipes: read () failed, error (%d) %s\n", errno, g_strerror (errno)); - break; - } -#else - received = recv (threadpool_io->wakeup_pipes [0], buffer, sizeof (buffer), 0); - if (received == 0) - break; - if (received == SOCKET_ERROR) { - if (WSAGetLastError () != WSAEINTR && WSAGetLastError () != WSAEWOULDBLOCK) - g_warning ("selector_thread_wakeup_drain_pipes: recv () failed, error (%d) %s\n", WSAGetLastError ()); - break; - } -#endif - } -} - -typedef struct { - MonoDomain *domain; - MonoGHashTable *states; -} FilterSockaresForDomainData; - -static void -filter_jobs_for_domain (gpointer key, gpointer value, gpointer user_data) -{ - FilterSockaresForDomainData *data; - MonoMList *list = (MonoMList *)value, *element; - MonoDomain *domain; - MonoGHashTable *states; - - g_assert (user_data); - data = (FilterSockaresForDomainData *)user_data; - domain = data->domain; - states = data->states; - - for (element = list; element; element = mono_mlist_next (element)) { - MonoIOSelectorJob *job = (MonoIOSelectorJob*) mono_mlist_get_data (element); - if (mono_object_domain (job) == domain) - mono_mlist_set_data (element, NULL); - } - - /* we skip all the first elements which are NULL */ - for (; list; list = mono_mlist_next (list)) { - if (mono_mlist_get_data (list)) - break; - } - - if (list) { - g_assert (mono_mlist_get_data (list)); - - /* we delete all the NULL elements after the first one */ - for (element = list; element;) { - MonoMList *next; - if (!(next = mono_mlist_next (element))) - break; - if (mono_mlist_get_data (next)) - element = next; - else - mono_mlist_set_next (element, mono_mlist_next (next)); - } - } - - mono_g_hash_table_replace (states, key, list); -} - -static void -wait_callback (gint fd, gint events, gpointer user_data) -{ - MonoError error; - - if (mono_runtime_is_shutting_down ()) - return; - - if (fd == threadpool_io->wakeup_pipes [0]) { - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: wke"); - selector_thread_wakeup_drain_pipes (); - } else { - MonoGHashTable *states; - MonoMList *list = NULL; - gpointer k; - gboolean remove_fd = FALSE; - gint operations; - - g_assert (user_data); - states = (MonoGHashTable *)user_data; - - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: cal fd %3d, events = %2s | %2s | %3s", - fd, (events & EVENT_IN) ? "RD" : "..", (events & EVENT_OUT) ? "WR" : "..", (events & EVENT_ERR) ? "ERR" : "..."); - - if (!mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list)) - g_error ("wait_callback: fd %d not found in states table", fd); - - if (list && (events & EVENT_IN) != 0) { - MonoIOSelectorJob *job = get_job_for_event (&list, EVENT_IN); - if (job) { - mono_threadpool_ms_enqueue_work_item (((MonoObject*) job)->vtable->domain, (MonoObject*) job, &error); - mono_error_assert_ok (&error); - } - - } - if (list && (events & EVENT_OUT) != 0) { - MonoIOSelectorJob *job = get_job_for_event (&list, EVENT_OUT); - if (job) { - mono_threadpool_ms_enqueue_work_item (((MonoObject*) job)->vtable->domain, (MonoObject*) job, &error); - mono_error_assert_ok (&error); - } - } - - remove_fd = (events & EVENT_ERR) == EVENT_ERR; - if (!remove_fd) { - mono_g_hash_table_replace (states, GINT_TO_POINTER (fd), list); - - operations = get_operations_for_jobs (list); - - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: res fd %3d, events = %2s | %2s | %3s", - fd, (operations & EVENT_IN) ? "RD" : "..", (operations & EVENT_OUT) ? "WR" : "..", (operations & EVENT_ERR) ? "ERR" : "..."); - - threadpool_io->backend.register_fd (fd, operations, FALSE); - } else { - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: err fd %d", fd); - - mono_g_hash_table_remove (states, GINT_TO_POINTER (fd)); - - threadpool_io->backend.remove_fd (fd); - } - } -} - -static void -selector_thread (gpointer data) -{ - MonoError error; - MonoGHashTable *states; - - io_selector_running = TRUE; - - if (mono_runtime_is_shutting_down ()) { - io_selector_running = FALSE; - return; - } - - states = mono_g_hash_table_new_type (g_direct_hash, g_direct_equal, MONO_HASH_VALUE_GC, MONO_ROOT_SOURCE_THREAD_POOL, "i/o thread pool states table"); - - for (;;) { - gint i, j; - gint res; - - mono_coop_mutex_lock (&threadpool_io->updates_lock); - - for (i = 0; i < threadpool_io->updates_size; ++i) { - ThreadPoolIOUpdate *update = &threadpool_io->updates [i]; - - switch (update->type) { - case UPDATE_EMPTY: - break; - case UPDATE_ADD: { - gint fd; - gint operations; - gpointer k; - gboolean exists; - MonoMList *list = NULL; - MonoIOSelectorJob *job; - - fd = update->data.add.fd; - g_assert (fd >= 0); - - job = update->data.add.job; - g_assert (job); - - exists = mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list); - list = mono_mlist_append_checked (list, (MonoObject*) job, &error); - mono_error_assert_ok (&error); - mono_g_hash_table_replace (states, GINT_TO_POINTER (fd), list); - - operations = get_operations_for_jobs (list); - - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: %3s fd %3d, operations = %2s | %2s | %3s", - exists ? "mod" : "add", fd, (operations & EVENT_IN) ? "RD" : "..", (operations & EVENT_OUT) ? "WR" : "..", (operations & EVENT_ERR) ? "ERR" : "..."); - - threadpool_io->backend.register_fd (fd, operations, !exists); - - break; - } - case UPDATE_REMOVE_SOCKET: { - gint fd; - gpointer k; - MonoMList *list = NULL; - - fd = update->data.remove_socket.fd; - g_assert (fd >= 0); - - if (mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list)) { - mono_g_hash_table_remove (states, GINT_TO_POINTER (fd)); - - for (j = i + 1; j < threadpool_io->updates_size; ++j) { - ThreadPoolIOUpdate *update = &threadpool_io->updates [j]; - if (update->type == UPDATE_ADD && update->data.add.fd == fd) - memset (update, 0, sizeof (ThreadPoolIOUpdate)); - } - - for (; list; list = mono_mlist_remove_item (list, list)) { - mono_threadpool_ms_enqueue_work_item (mono_object_domain (mono_mlist_get_data (list)), mono_mlist_get_data (list), &error); - mono_error_assert_ok (&error); - } - - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: del fd %3d", fd); - threadpool_io->backend.remove_fd (fd); - } - - break; - } - case UPDATE_REMOVE_DOMAIN: { - MonoDomain *domain; - - domain = update->data.remove_domain.domain; - g_assert (domain); - - FilterSockaresForDomainData user_data = { .domain = domain, .states = states }; - mono_g_hash_table_foreach (states, filter_jobs_for_domain, &user_data); - - for (j = i + 1; j < threadpool_io->updates_size; ++j) { - ThreadPoolIOUpdate *update = &threadpool_io->updates [j]; - if (update->type == UPDATE_ADD && mono_object_domain (update->data.add.job) == domain) - memset (update, 0, sizeof (ThreadPoolIOUpdate)); - } - - break; - } - default: - g_assert_not_reached (); - } - } - - mono_coop_cond_broadcast (&threadpool_io->updates_cond); - - if (threadpool_io->updates_size > 0) { - threadpool_io->updates_size = 0; - memset (&threadpool_io->updates, 0, UPDATES_CAPACITY * sizeof (ThreadPoolIOUpdate)); - } - - mono_coop_mutex_unlock (&threadpool_io->updates_lock); - - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: wai"); - - res = threadpool_io->backend.event_wait (wait_callback, states); - - if (res == -1 || mono_runtime_is_shutting_down ()) - break; - } - - mono_g_hash_table_destroy (states); - - io_selector_running = FALSE; -} - -/* Locking: threadpool_io->updates_lock must be held */ -static ThreadPoolIOUpdate* -update_get_new (void) -{ - ThreadPoolIOUpdate *update = NULL; - g_assert (threadpool_io->updates_size <= UPDATES_CAPACITY); - - while (threadpool_io->updates_size == UPDATES_CAPACITY) { - /* we wait for updates to be applied in the selector_thread and we loop - * as long as none are available. if it happends too much, then we need - * to increase UPDATES_CAPACITY */ - mono_coop_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock); - } - - g_assert (threadpool_io->updates_size < UPDATES_CAPACITY); - - update = &threadpool_io->updates [threadpool_io->updates_size ++]; - - return update; -} - -static void -wakeup_pipes_init (void) -{ -#if !defined(HOST_WIN32) - if (pipe (threadpool_io->wakeup_pipes) == -1) - g_error ("wakeup_pipes_init: pipe () failed, error (%d) %s\n", errno, g_strerror (errno)); - if (fcntl (threadpool_io->wakeup_pipes [0], F_SETFL, O_NONBLOCK) == -1) - g_error ("wakeup_pipes_init: fcntl () failed, error (%d) %s\n", errno, g_strerror (errno)); -#else - struct sockaddr_in client; - struct sockaddr_in server; - SOCKET server_sock; - gulong arg; - gint size; - - server_sock = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); - g_assert (server_sock != INVALID_SOCKET); - threadpool_io->wakeup_pipes [1] = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); - g_assert (threadpool_io->wakeup_pipes [1] != INVALID_SOCKET); - - server.sin_family = AF_INET; - server.sin_addr.s_addr = inet_addr ("127.0.0.1"); - server.sin_port = 0; - if (bind (server_sock, (SOCKADDR*) &server, sizeof (server)) == SOCKET_ERROR) { - closesocket (server_sock); - g_error ("wakeup_pipes_init: bind () failed, error (%d)\n", WSAGetLastError ()); - } - - size = sizeof (server); - if (getsockname (server_sock, (SOCKADDR*) &server, &size) == SOCKET_ERROR) { - closesocket (server_sock); - g_error ("wakeup_pipes_init: getsockname () failed, error (%d)\n", WSAGetLastError ()); - } - if (listen (server_sock, 1024) == SOCKET_ERROR) { - closesocket (server_sock); - g_error ("wakeup_pipes_init: listen () failed, error (%d)\n", WSAGetLastError ()); - } - if (connect ((SOCKET) threadpool_io->wakeup_pipes [1], (SOCKADDR*) &server, sizeof (server)) == SOCKET_ERROR) { - closesocket (server_sock); - g_error ("wakeup_pipes_init: connect () failed, error (%d)\n", WSAGetLastError ()); - } - - size = sizeof (client); - threadpool_io->wakeup_pipes [0] = accept (server_sock, (SOCKADDR *) &client, &size); - g_assert (threadpool_io->wakeup_pipes [0] != INVALID_SOCKET); - - arg = 1; - if (ioctlsocket (threadpool_io->wakeup_pipes [0], FIONBIO, &arg) == SOCKET_ERROR) { - closesocket (threadpool_io->wakeup_pipes [0]); - closesocket (server_sock); - g_error ("wakeup_pipes_init: ioctlsocket () failed, error (%d)\n", WSAGetLastError ()); - } - - closesocket (server_sock); -#endif -} - -static void -initialize (void) -{ - g_assert (!threadpool_io); - threadpool_io = g_new0 (ThreadPoolIO, 1); - g_assert (threadpool_io); - - mono_coop_mutex_init (&threadpool_io->updates_lock); - mono_coop_cond_init (&threadpool_io->updates_cond); - mono_gc_register_root ((char *)&threadpool_io->updates [0], sizeof (threadpool_io->updates), MONO_GC_DESCRIPTOR_NULL, MONO_ROOT_SOURCE_THREAD_POOL, "i/o thread pool updates list"); - - threadpool_io->updates_size = 0; - - threadpool_io->backend = backend_poll; - if (g_getenv ("MONO_ENABLE_AIO") != NULL) { -#if defined(HAVE_EPOLL) - threadpool_io->backend = backend_epoll; -#elif defined(HAVE_KQUEUE) - threadpool_io->backend = backend_kqueue; -#endif - } - - wakeup_pipes_init (); - - if (!threadpool_io->backend.init (threadpool_io->wakeup_pipes [0])) - g_error ("initialize: backend->init () failed"); - - MonoError error; - if (!mono_thread_create_internal (mono_get_root_domain (), selector_thread, NULL, TRUE, SMALL_STACK, &error)) - g_error ("initialize: mono_thread_create_internal () failed due to %s", mono_error_get_message (&error)); -} - -static void -cleanup (void) -{ - /* we make the assumption along the code that we are - * cleaning up only if the runtime is shutting down */ - g_assert (mono_runtime_is_shutting_down ()); - - selector_thread_wakeup (); - while (io_selector_running) - mono_thread_info_usleep (1000); -} - -void -mono_threadpool_ms_io_cleanup (void) -{ - mono_lazy_cleanup (&io_status, cleanup); -} - -void -ves_icall_System_IOSelector_Add (gpointer handle, MonoIOSelectorJob *job) -{ - ThreadPoolIOUpdate *update; - - g_assert (handle); - - g_assert ((job->operation == EVENT_IN) ^ (job->operation == EVENT_OUT)); - g_assert (job->callback); - - if (mono_runtime_is_shutting_down ()) - return; - if (mono_domain_is_unloading (mono_object_domain (job))) - return; - - mono_lazy_initialize (&io_status, initialize); - - mono_coop_mutex_lock (&threadpool_io->updates_lock); - - update = update_get_new (); - update->type = UPDATE_ADD; - update->data.add.fd = GPOINTER_TO_INT (handle); - update->data.add.job = job; - mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */ - - selector_thread_wakeup (); - - mono_coop_mutex_unlock (&threadpool_io->updates_lock); -} - -void -ves_icall_System_IOSelector_Remove (gpointer handle) -{ - mono_threadpool_ms_io_remove_socket (GPOINTER_TO_INT (handle)); -} - -void -mono_threadpool_ms_io_remove_socket (int fd) -{ - ThreadPoolIOUpdate *update; - - if (!mono_lazy_is_initialized (&io_status)) - return; - - mono_coop_mutex_lock (&threadpool_io->updates_lock); - - update = update_get_new (); - update->type = UPDATE_REMOVE_SOCKET; - update->data.add.fd = fd; - mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */ - - selector_thread_wakeup (); - - mono_coop_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock); - - mono_coop_mutex_unlock (&threadpool_io->updates_lock); -} - -void -mono_threadpool_ms_io_remove_domain_jobs (MonoDomain *domain) -{ - ThreadPoolIOUpdate *update; - - if (!mono_lazy_is_initialized (&io_status)) - return; - - mono_coop_mutex_lock (&threadpool_io->updates_lock); - - update = update_get_new (); - update->type = UPDATE_REMOVE_DOMAIN; - update->data.remove_domain.domain = domain; - mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */ - - selector_thread_wakeup (); - - mono_coop_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock); - - mono_coop_mutex_unlock (&threadpool_io->updates_lock); -} - -#else - -void -ves_icall_System_IOSelector_Add (gpointer handle, MonoIOSelectorJob *job) -{ - g_assert_not_reached (); -} - -void -ves_icall_System_IOSelector_Remove (gpointer handle) -{ - g_assert_not_reached (); -} - -void -mono_threadpool_ms_io_cleanup (void) -{ - g_assert_not_reached (); -} - -void -mono_threadpool_ms_io_remove_socket (int fd) -{ - g_assert_not_reached (); -} - -void -mono_threadpool_ms_io_remove_domain_jobs (MonoDomain *domain) -{ - g_assert_not_reached (); -} - -#endif diff --git a/mono/metadata/threadpool-ms-io.h b/mono/metadata/threadpool-ms-io.h deleted file mode 100644 index 106be80a1da..00000000000 --- a/mono/metadata/threadpool-ms-io.h +++ /dev/null @@ -1,25 +0,0 @@ -#ifndef _MONO_THREADPOOL_MS_IO_H_ -#define _MONO_THREADPOOL_MS_IO_H_ - -#include -#include - -#include -#include - -typedef struct _MonoIOSelectorJob MonoIOSelectorJob; - -void -ves_icall_System_IOSelector_Add (gpointer handle, MonoIOSelectorJob *job); - -void -ves_icall_System_IOSelector_Remove (gpointer handle); - -void -mono_threadpool_ms_io_remove_socket (int fd); -void -mono_threadpool_ms_io_remove_domain_jobs (MonoDomain *domain); -void -mono_threadpool_ms_io_cleanup (void); - -#endif /* _MONO_THREADPOOL_MS_IO_H_ */ diff --git a/mono/metadata/threadpool-ms.c b/mono/metadata/threadpool-ms.c deleted file mode 100644 index aa8289cd51d..00000000000 --- a/mono/metadata/threadpool-ms.c +++ /dev/null @@ -1,1695 +0,0 @@ -/* - * threadpool-ms.c: Microsoft threadpool runtime support - * - * Author: - * Ludovic Henry (ludovic.henry@xamarin.com) - * - * Copyright 2015 Xamarin, Inc (http://www.xamarin.com) - * Licensed under the MIT license. See LICENSE file in the project root for full license information. - */ - -// -// Copyright (c) Microsoft. All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for full license information. -// -// Files: -// - src/vm/comthreadpool.cpp -// - src/vm/win32threadpoolcpp -// - src/vm/threadpoolrequest.cpp -// - src/vm/hillclimbing.cpp -// -// Ported from C++ to C and adjusted to Mono runtime - -#include -#define _USE_MATH_DEFINES // needed by MSVC to define math constants -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#define CPU_USAGE_LOW 80 -#define CPU_USAGE_HIGH 95 - -#define MONITOR_INTERVAL 500 // ms -#define MONITOR_MINIMAL_LIFETIME 60 * 1000 // ms - -#define WORKER_CREATION_MAX_PER_SEC 10 - -/* The exponent to apply to the gain. 1.0 means to use linear gain, - * higher values will enhance large moves and damp small ones. - * default: 2.0 */ -#define HILL_CLIMBING_GAIN_EXPONENT 2.0 - -/* The 'cost' of a thread. 0 means drive for increased throughput regardless - * of thread count, higher values bias more against higher thread counts. - * default: 0.15 */ -#define HILL_CLIMBING_BIAS 0.15 - -#define HILL_CLIMBING_WAVE_PERIOD 4 -#define HILL_CLIMBING_MAX_WAVE_MAGNITUDE 20 -#define HILL_CLIMBING_WAVE_MAGNITUDE_MULTIPLIER 1.0 -#define HILL_CLIMBING_WAVE_HISTORY_SIZE 8 -#define HILL_CLIMBING_TARGET_SIGNAL_TO_NOISE_RATIO 3.0 -#define HILL_CLIMBING_MAX_CHANGE_PER_SECOND 4 -#define HILL_CLIMBING_MAX_CHANGE_PER_SAMPLE 20 -#define HILL_CLIMBING_SAMPLE_INTERVAL_LOW 10 -#define HILL_CLIMBING_SAMPLE_INTERVAL_HIGH 200 -#define HILL_CLIMBING_ERROR_SMOOTHING_FACTOR 0.01 -#define HILL_CLIMBING_MAX_SAMPLE_ERROR_PERCENT 0.15 - -typedef union { - struct { - gint16 max_working; /* determined by heuristic */ - gint16 active; /* executing worker_thread */ - gint16 working; /* actively executing worker_thread, not parked */ - gint16 parked; /* parked */ - } _; - gint64 as_gint64; -} ThreadPoolCounter; - -typedef struct { - MonoDomain *domain; - /* Number of outstanding jobs */ - gint32 outstanding_request; - /* Number of currently executing jobs */ - int threadpool_jobs; - /* Signalled when threadpool_jobs + outstanding_request is 0 */ - /* Protected by threadpool->domains_lock */ - MonoCoopCond cleanup_cond; -} ThreadPoolDomain; - -typedef MonoInternalThread ThreadPoolWorkingThread; - -typedef struct { - gint32 wave_period; - gint32 samples_to_measure; - gdouble target_throughput_ratio; - gdouble target_signal_to_noise_ratio; - gdouble max_change_per_second; - gdouble max_change_per_sample; - gint32 max_thread_wave_magnitude; - gint32 sample_interval_low; - gdouble thread_magnitude_multiplier; - gint32 sample_interval_high; - gdouble throughput_error_smoothing_factor; - gdouble gain_exponent; - gdouble max_sample_error; - - gdouble current_control_setting; - gint64 total_samples; - gint16 last_thread_count; - gdouble elapsed_since_last_change; - gdouble completions_since_last_change; - - gdouble average_throughput_noise; - - gdouble *samples; - gdouble *thread_counts; - - guint32 current_sample_interval; - gpointer random_interval_generator; - - gint32 accumulated_completion_count; - gdouble accumulated_sample_duration; -} ThreadPoolHillClimbing; - -typedef struct { - ThreadPoolCounter counters; - - GPtrArray *domains; // ThreadPoolDomain* [] - MonoCoopMutex domains_lock; - - GPtrArray *working_threads; // ThreadPoolWorkingThread* [] - gint32 parked_threads_count; - MonoCoopCond parked_threads_cond; - MonoCoopMutex active_threads_lock; /* protect access to working_threads and parked_threads */ - - guint32 worker_creation_current_second; - guint32 worker_creation_current_count; - MonoCoopMutex worker_creation_lock; - - gint32 heuristic_completions; - gint64 heuristic_sample_start; - gint64 heuristic_last_dequeue; // ms - gint64 heuristic_last_adjustment; // ms - gint64 heuristic_adjustment_interval; // ms - ThreadPoolHillClimbing heuristic_hill_climbing; - MonoCoopMutex heuristic_lock; - - gint32 limit_worker_min; - gint32 limit_worker_max; - gint32 limit_io_min; - gint32 limit_io_max; - - MonoCpuUsageState *cpu_usage_state; - gint32 cpu_usage; - - /* suspended by the debugger */ - gboolean suspended; -} ThreadPool; - -typedef enum { - TRANSITION_WARMUP, - TRANSITION_INITIALIZING, - TRANSITION_RANDOM_MOVE, - TRANSITION_CLIMBING_MOVE, - TRANSITION_CHANGE_POINT, - TRANSITION_STABILIZING, - TRANSITION_STARVATION, - TRANSITION_THREAD_TIMED_OUT, - TRANSITION_UNDEFINED, -} ThreadPoolHeuristicStateTransition; - -static mono_lazy_init_t status = MONO_LAZY_INIT_STATUS_NOT_INITIALIZED; - -enum { - MONITOR_STATUS_REQUESTED, - MONITOR_STATUS_WAITING_FOR_REQUEST, - MONITOR_STATUS_NOT_RUNNING, -}; - -static gint32 monitor_status = MONITOR_STATUS_NOT_RUNNING; - -static ThreadPool* threadpool; - -#define COUNTER_CHECK(counter) \ - do { \ - g_assert (counter._.max_working > 0); \ - g_assert (counter._.working >= 0); \ - g_assert (counter._.active >= 0); \ - } while (0) - -#define COUNTER_READ() (InterlockedRead64 (&threadpool->counters.as_gint64)) - -#define COUNTER_ATOMIC(var,block) \ - do { \ - ThreadPoolCounter __old; \ - do { \ - g_assert (threadpool); \ - __old.as_gint64 = COUNTER_READ (); \ - (var) = __old; \ - { block; } \ - COUNTER_CHECK (var); \ - } while (InterlockedCompareExchange64 (&threadpool->counters.as_gint64, (var).as_gint64, __old.as_gint64) != __old.as_gint64); \ - } while (0) - -#define COUNTER_TRY_ATOMIC(res,var,block) \ - do { \ - ThreadPoolCounter __old; \ - do { \ - g_assert (threadpool); \ - __old.as_gint64 = COUNTER_READ (); \ - (var) = __old; \ - (res) = FALSE; \ - { block; } \ - COUNTER_CHECK (var); \ - (res) = InterlockedCompareExchange64 (&threadpool->counters.as_gint64, (var).as_gint64, __old.as_gint64) == __old.as_gint64; \ - } while (0); \ - } while (0) - -static inline void -domains_lock (void) -{ - mono_coop_mutex_lock (&threadpool->domains_lock); -} - -static inline void -domains_unlock (void) -{ - mono_coop_mutex_unlock (&threadpool->domains_lock); -} - -static gpointer -rand_create (void) -{ - mono_rand_open (); - return mono_rand_init (NULL, 0); -} - -static guint32 -rand_next (gpointer *handle, guint32 min, guint32 max) -{ - MonoError error; - guint32 val; - mono_rand_try_get_uint32 (handle, &val, min, max, &error); - // FIXME handle error - mono_error_assert_ok (&error); - return val; -} - -static void -rand_free (gpointer handle) -{ - mono_rand_close (handle); -} - -static void -initialize (void) -{ - ThreadPoolHillClimbing *hc; - const char *threads_per_cpu_env; - gint threads_per_cpu; - gint threads_count; - - g_assert (!threadpool); - threadpool = g_new0 (ThreadPool, 1); - g_assert (threadpool); - - threadpool->domains = g_ptr_array_new (); - mono_coop_mutex_init (&threadpool->domains_lock); - - threadpool->parked_threads_count = 0; - mono_coop_cond_init (&threadpool->parked_threads_cond); - threadpool->working_threads = g_ptr_array_new (); - mono_coop_mutex_init (&threadpool->active_threads_lock); - - threadpool->worker_creation_current_second = -1; - mono_coop_mutex_init (&threadpool->worker_creation_lock); - - threadpool->heuristic_adjustment_interval = 10; - mono_coop_mutex_init (&threadpool->heuristic_lock); - - mono_rand_open (); - - hc = &threadpool->heuristic_hill_climbing; - - hc->wave_period = HILL_CLIMBING_WAVE_PERIOD; - hc->max_thread_wave_magnitude = HILL_CLIMBING_MAX_WAVE_MAGNITUDE; - hc->thread_magnitude_multiplier = (gdouble) HILL_CLIMBING_WAVE_MAGNITUDE_MULTIPLIER; - hc->samples_to_measure = hc->wave_period * HILL_CLIMBING_WAVE_HISTORY_SIZE; - hc->target_throughput_ratio = (gdouble) HILL_CLIMBING_BIAS; - hc->target_signal_to_noise_ratio = (gdouble) HILL_CLIMBING_TARGET_SIGNAL_TO_NOISE_RATIO; - hc->max_change_per_second = (gdouble) HILL_CLIMBING_MAX_CHANGE_PER_SECOND; - hc->max_change_per_sample = (gdouble) HILL_CLIMBING_MAX_CHANGE_PER_SAMPLE; - hc->sample_interval_low = HILL_CLIMBING_SAMPLE_INTERVAL_LOW; - hc->sample_interval_high = HILL_CLIMBING_SAMPLE_INTERVAL_HIGH; - hc->throughput_error_smoothing_factor = (gdouble) HILL_CLIMBING_ERROR_SMOOTHING_FACTOR; - hc->gain_exponent = (gdouble) HILL_CLIMBING_GAIN_EXPONENT; - hc->max_sample_error = (gdouble) HILL_CLIMBING_MAX_SAMPLE_ERROR_PERCENT; - hc->current_control_setting = 0; - hc->total_samples = 0; - hc->last_thread_count = 0; - hc->average_throughput_noise = 0; - hc->elapsed_since_last_change = 0; - hc->accumulated_completion_count = 0; - hc->accumulated_sample_duration = 0; - hc->samples = g_new0 (gdouble, hc->samples_to_measure); - hc->thread_counts = g_new0 (gdouble, hc->samples_to_measure); - hc->random_interval_generator = rand_create (); - hc->current_sample_interval = rand_next (&hc->random_interval_generator, hc->sample_interval_low, hc->sample_interval_high); - - if (!(threads_per_cpu_env = g_getenv ("MONO_THREADS_PER_CPU"))) - threads_per_cpu = 1; - else - threads_per_cpu = CLAMP (atoi (threads_per_cpu_env), 1, 50); - - threads_count = mono_cpu_count () * threads_per_cpu; - - threadpool->limit_worker_min = threadpool->limit_io_min = threads_count; - -#if defined (PLATFORM_ANDROID) || defined (HOST_IOS) - threadpool->limit_worker_max = threadpool->limit_io_max = CLAMP (threads_count * 100, MIN (threads_count, 200), MAX (threads_count, 200)); -#else - threadpool->limit_worker_max = threadpool->limit_io_max = threads_count * 100; -#endif - - threadpool->counters._.max_working = threadpool->limit_worker_min; - - threadpool->cpu_usage_state = g_new0 (MonoCpuUsageState, 1); - - threadpool->suspended = FALSE; -} - -static void worker_kill (ThreadPoolWorkingThread *thread); - -static void -cleanup (void) -{ - guint i; - - /* we make the assumption along the code that we are - * cleaning up only if the runtime is shutting down */ - g_assert (mono_runtime_is_shutting_down ()); - - while (monitor_status != MONITOR_STATUS_NOT_RUNNING) - mono_thread_info_sleep (1, NULL); - - mono_coop_mutex_lock (&threadpool->active_threads_lock); - - /* stop all threadpool->working_threads */ - for (i = 0; i < threadpool->working_threads->len; ++i) - worker_kill ((ThreadPoolWorkingThread*) g_ptr_array_index (threadpool->working_threads, i)); - - /* unpark all threadpool->parked_threads */ - mono_coop_cond_broadcast (&threadpool->parked_threads_cond); - - mono_coop_mutex_unlock (&threadpool->active_threads_lock); -} - -gboolean -mono_threadpool_ms_enqueue_work_item (MonoDomain *domain, MonoObject *work_item, MonoError *error) -{ - static MonoClass *threadpool_class = NULL; - static MonoMethod *unsafe_queue_custom_work_item_method = NULL; - MonoDomain *current_domain; - MonoBoolean f; - gpointer args [2]; - - mono_error_init (error); - g_assert (work_item); - - if (!threadpool_class) - threadpool_class = mono_class_load_from_name (mono_defaults.corlib, "System.Threading", "ThreadPool"); - - if (!unsafe_queue_custom_work_item_method) - unsafe_queue_custom_work_item_method = mono_class_get_method_from_name (threadpool_class, "UnsafeQueueCustomWorkItem", 2); - g_assert (unsafe_queue_custom_work_item_method); - - f = FALSE; - - args [0] = (gpointer) work_item; - args [1] = (gpointer) &f; - - current_domain = mono_domain_get (); - if (current_domain == domain) { - mono_runtime_invoke_checked (unsafe_queue_custom_work_item_method, NULL, args, error); - return_val_if_nok (error, FALSE); - } else { - mono_thread_push_appdomain_ref (domain); - if (mono_domain_set (domain, FALSE)) { - mono_runtime_invoke_checked (unsafe_queue_custom_work_item_method, NULL, args, error); - if (!is_ok (error)) { - mono_thread_pop_appdomain_ref (); - return FALSE; - } - mono_domain_set (current_domain, TRUE); - } - mono_thread_pop_appdomain_ref (); - } - return TRUE; -} - -/* LOCKING: domains_lock must be held */ -static void -tpdomain_add (ThreadPoolDomain *tpdomain) -{ - guint i, len; - - g_assert (tpdomain); - - len = threadpool->domains->len; - for (i = 0; i < len; ++i) { - if (g_ptr_array_index (threadpool->domains, i) == tpdomain) - break; - } - - if (i == len) - g_ptr_array_add (threadpool->domains, tpdomain); -} - -/* LOCKING: domains_lock must be held. */ -static gboolean -tpdomain_remove (ThreadPoolDomain *tpdomain) -{ - g_assert (tpdomain); - return g_ptr_array_remove (threadpool->domains, tpdomain); -} - -/* LOCKING: domains_lock must be held */ -static ThreadPoolDomain * -tpdomain_get (MonoDomain *domain, gboolean create) -{ - guint i; - ThreadPoolDomain *tpdomain; - - g_assert (domain); - - for (i = 0; i < threadpool->domains->len; ++i) { - ThreadPoolDomain *tpdomain; - - tpdomain = (ThreadPoolDomain *)g_ptr_array_index (threadpool->domains, i); - if (tpdomain->domain == domain) - return tpdomain; - } - - if (!create) - return NULL; - - tpdomain = g_new0 (ThreadPoolDomain, 1); - tpdomain->domain = domain; - mono_coop_cond_init (&tpdomain->cleanup_cond); - - tpdomain_add (tpdomain); - - return tpdomain; -} - -static void -tpdomain_free (ThreadPoolDomain *tpdomain) -{ - g_free (tpdomain); -} - -/* LOCKING: domains_lock must be held */ -static gboolean -domain_any_has_request (void) -{ - guint i; - - for (i = 0; i < threadpool->domains->len; ++i) { - ThreadPoolDomain *tmp = (ThreadPoolDomain *)g_ptr_array_index (threadpool->domains, i); - if (tmp->outstanding_request > 0) - return TRUE; - } - - return FALSE; -} - -/* LOCKING: domains_lock must be held */ -static ThreadPoolDomain * -tpdomain_get_next (ThreadPoolDomain *current) -{ - ThreadPoolDomain *tpdomain = NULL; - guint len; - - len = threadpool->domains->len; - if (len > 0) { - guint i, current_idx = -1; - if (current) { - for (i = 0; i < len; ++i) { - if (current == g_ptr_array_index (threadpool->domains, i)) { - current_idx = i; - break; - } - } - g_assert (current_idx != (guint)-1); - } - for (i = current_idx + 1; i < len + current_idx + 1; ++i) { - ThreadPoolDomain *tmp = (ThreadPoolDomain *)g_ptr_array_index (threadpool->domains, i % len); - if (tmp->outstanding_request > 0) { - tpdomain = tmp; - break; - } - } - } - - return tpdomain; -} - -static void -worker_wait_interrupt (gpointer data) -{ - mono_coop_mutex_lock (&threadpool->active_threads_lock); - mono_coop_cond_signal (&threadpool->parked_threads_cond); - mono_coop_mutex_unlock (&threadpool->active_threads_lock); -} - -/* return TRUE if timeout, FALSE otherwise (worker unpark or interrupt) */ -static gboolean -worker_park (void) -{ - gboolean timeout = FALSE; - - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] current worker parking", mono_native_thread_id_get ()); - - mono_gc_set_skip_thread (TRUE); - - mono_coop_mutex_lock (&threadpool->active_threads_lock); - - if (!mono_runtime_is_shutting_down ()) { - static gpointer rand_handle = NULL; - MonoInternalThread *thread_internal; - gboolean interrupted = FALSE; - - if (!rand_handle) - rand_handle = rand_create (); - g_assert (rand_handle); - - thread_internal = mono_thread_internal_current (); - g_assert (thread_internal); - - threadpool->parked_threads_count += 1; - g_ptr_array_remove_fast (threadpool->working_threads, thread_internal); - - mono_thread_info_install_interrupt (worker_wait_interrupt, NULL, &interrupted); - if (interrupted) - goto done; - - if (mono_coop_cond_timedwait (&threadpool->parked_threads_cond, &threadpool->active_threads_lock, rand_next (&rand_handle, 5 * 1000, 60 * 1000)) != 0) - timeout = TRUE; - - mono_thread_info_uninstall_interrupt (&interrupted); - -done: - g_ptr_array_add (threadpool->working_threads, thread_internal); - threadpool->parked_threads_count -= 1; - } - - mono_coop_mutex_unlock (&threadpool->active_threads_lock); - - mono_gc_set_skip_thread (FALSE); - - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] current worker unparking, timeout? %s", mono_native_thread_id_get (), timeout ? "yes" : "no"); - - return timeout; -} - -static gboolean -worker_try_unpark (void) -{ - gboolean res = FALSE; - - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try unpark worker", mono_native_thread_id_get ()); - - mono_coop_mutex_lock (&threadpool->active_threads_lock); - if (threadpool->parked_threads_count > 0) { - mono_coop_cond_signal (&threadpool->parked_threads_cond); - res = TRUE; - } - mono_coop_mutex_unlock (&threadpool->active_threads_lock); - - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try unpark worker, success? %s", mono_native_thread_id_get (), res ? "yes" : "no"); - - return res; -} - -static void -worker_kill (ThreadPoolWorkingThread *thread) -{ - if (thread == mono_thread_internal_current ()) - return; - - mono_thread_internal_abort ((MonoInternalThread*) thread); -} - -static void -worker_thread (gpointer data) -{ - MonoError error; - MonoInternalThread *thread; - ThreadPoolDomain *tpdomain, *previous_tpdomain; - ThreadPoolCounter counter; - gboolean retire = FALSE; - - mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] worker starting", mono_native_thread_id_get ()); - - g_assert (threadpool); - - thread = mono_thread_internal_current (); - g_assert (thread); - - mono_thread_set_name_internal (thread, mono_string_new (mono_get_root_domain (), "Threadpool worker"), FALSE, &error); - mono_error_assert_ok (&error); - - mono_coop_mutex_lock (&threadpool->active_threads_lock); - g_ptr_array_add (threadpool->working_threads, thread); - mono_coop_mutex_unlock (&threadpool->active_threads_lock); - - previous_tpdomain = NULL; - - domains_lock (); - - while (!mono_runtime_is_shutting_down ()) { - tpdomain = NULL; - - if ((thread->state & (ThreadState_AbortRequested | ThreadState_SuspendRequested)) != 0) { - domains_unlock (); - mono_thread_interruption_checkpoint (); - domains_lock (); - } - - if (retire || !(tpdomain = tpdomain_get_next (previous_tpdomain))) { - gboolean timeout; - - COUNTER_ATOMIC (counter, { - counter._.working --; - counter._.parked ++; - }); - - domains_unlock (); - timeout = worker_park (); - domains_lock (); - - COUNTER_ATOMIC (counter, { - counter._.working ++; - counter._.parked --; - }); - - if (timeout) - break; - - if (retire) - retire = FALSE; - - /* The tpdomain->domain might have unloaded, while this thread was parked */ - previous_tpdomain = NULL; - - continue; - } - - tpdomain->outstanding_request --; - g_assert (tpdomain->outstanding_request >= 0); - - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker running in domain %p (outstanding requests %d) ", - mono_native_thread_id_get (), tpdomain->domain, tpdomain->outstanding_request); - - g_assert (tpdomain->domain); - g_assert (tpdomain->threadpool_jobs >= 0); - tpdomain->threadpool_jobs ++; - - /* - * This is needed so there is always an lmf frame in the runtime invoke call below, - * so ThreadAbortExceptions are caught even if the thread is in native code. - */ - mono_defaults.threadpool_perform_wait_callback_method->save_lmf = TRUE; - - domains_unlock (); - - mono_thread_push_appdomain_ref (tpdomain->domain); - if (mono_domain_set (tpdomain->domain, FALSE)) { - MonoObject *exc = NULL, *res; - - res = mono_runtime_try_invoke (mono_defaults.threadpool_perform_wait_callback_method, NULL, NULL, &exc, &error); - if (exc || !mono_error_ok(&error)) { - if (exc == NULL) - exc = (MonoObject *) mono_error_convert_to_exception (&error); - else - mono_error_cleanup (&error); - mono_thread_internal_unhandled_exception (exc); - } else if (res && *(MonoBoolean*) mono_object_unbox (res) == FALSE) - retire = TRUE; - - mono_thread_clr_state (thread, (MonoThreadState)~ThreadState_Background); - if (!mono_thread_test_state (thread , ThreadState_Background)) - ves_icall_System_Threading_Thread_SetState (thread, ThreadState_Background); - - mono_domain_set (mono_get_root_domain (), TRUE); - } - mono_thread_pop_appdomain_ref (); - - domains_lock (); - - tpdomain->threadpool_jobs --; - g_assert (tpdomain->threadpool_jobs >= 0); - - if (tpdomain->outstanding_request + tpdomain->threadpool_jobs == 0 && mono_domain_is_unloading (tpdomain->domain)) { - gboolean removed; - - removed = tpdomain_remove (tpdomain); - g_assert (removed); - - mono_coop_cond_signal (&tpdomain->cleanup_cond); - tpdomain = NULL; - } - - previous_tpdomain = tpdomain; - } - - domains_unlock (); - - mono_coop_mutex_lock (&threadpool->active_threads_lock); - g_ptr_array_remove_fast (threadpool->working_threads, thread); - mono_coop_mutex_unlock (&threadpool->active_threads_lock); - - COUNTER_ATOMIC (counter, { - counter._.working--; - counter._.active --; - }); - - mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] worker finishing", mono_native_thread_id_get ()); -} - -static gboolean -worker_try_create (void) -{ - ThreadPoolCounter counter; - MonoInternalThread *thread; - gint64 current_ticks; - gint32 now; - - mono_coop_mutex_lock (&threadpool->worker_creation_lock); - - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker", mono_native_thread_id_get ()); - current_ticks = mono_100ns_ticks (); - now = current_ticks / (10 * 1000 * 1000); - if (0 == current_ticks) { - g_warning ("failed to get 100ns ticks"); - } else { - if (threadpool->worker_creation_current_second != now) { - threadpool->worker_creation_current_second = now; - threadpool->worker_creation_current_count = 0; - } else { - g_assert (threadpool->worker_creation_current_count <= WORKER_CREATION_MAX_PER_SEC); - if (threadpool->worker_creation_current_count == WORKER_CREATION_MAX_PER_SEC) { - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, failed: maximum number of worker created per second reached, current count = %d", - mono_native_thread_id_get (), threadpool->worker_creation_current_count); - mono_coop_mutex_unlock (&threadpool->worker_creation_lock); - return FALSE; - } - } - } - - COUNTER_ATOMIC (counter, { - if (counter._.working >= counter._.max_working) { - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, failed: maximum number of working threads reached", - mono_native_thread_id_get ()); - mono_coop_mutex_unlock (&threadpool->worker_creation_lock); - return FALSE; - } - counter._.working ++; - counter._.active ++; - }); - - MonoError error; - if ((thread = mono_thread_create_internal (mono_get_root_domain (), worker_thread, NULL, TRUE, 0, &error)) != NULL) { - threadpool->worker_creation_current_count += 1; - - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, created %p, now = %d count = %d", mono_native_thread_id_get (), GUINT_TO_POINTER(thread->tid), now, threadpool->worker_creation_current_count); - mono_coop_mutex_unlock (&threadpool->worker_creation_lock); - return TRUE; - } - - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, failed: could not create thread due to %s", mono_native_thread_id_get (), mono_error_get_message (&error)); - mono_error_cleanup (&error); - - COUNTER_ATOMIC (counter, { - counter._.working --; - counter._.active --; - }); - - mono_coop_mutex_unlock (&threadpool->worker_creation_lock); - return FALSE; -} - -static void monitor_ensure_running (void); - -static gboolean -worker_request (MonoDomain *domain) -{ - ThreadPoolDomain *tpdomain; - - g_assert (domain); - g_assert (threadpool); - - if (mono_runtime_is_shutting_down ()) - return FALSE; - - domains_lock (); - - /* synchronize check with worker_thread */ - if (mono_domain_is_unloading (domain)) { - domains_unlock (); - return FALSE; - } - - tpdomain = tpdomain_get (domain, TRUE); - g_assert (tpdomain); - tpdomain->outstanding_request ++; - - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, domain = %p, outstanding_request = %d", - mono_native_thread_id_get (), tpdomain->domain, tpdomain->outstanding_request); - - domains_unlock (); - - if (threadpool->suspended) - return FALSE; - - monitor_ensure_running (); - - if (worker_try_unpark ()) { - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, unparked", mono_native_thread_id_get ()); - return TRUE; - } - - if (worker_try_create ()) { - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, created", mono_native_thread_id_get ()); - return TRUE; - } - - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, failed", mono_native_thread_id_get ()); - return FALSE; -} - -static gboolean -monitor_should_keep_running (void) -{ - static gint64 last_should_keep_running = -1; - - g_assert (monitor_status == MONITOR_STATUS_WAITING_FOR_REQUEST || monitor_status == MONITOR_STATUS_REQUESTED); - - if (InterlockedExchange (&monitor_status, MONITOR_STATUS_WAITING_FOR_REQUEST) == MONITOR_STATUS_WAITING_FOR_REQUEST) { - gboolean should_keep_running = TRUE, force_should_keep_running = FALSE; - - if (mono_runtime_is_shutting_down ()) { - should_keep_running = FALSE; - } else { - domains_lock (); - if (!domain_any_has_request ()) - should_keep_running = FALSE; - domains_unlock (); - - if (!should_keep_running) { - if (last_should_keep_running == -1 || mono_100ns_ticks () - last_should_keep_running < MONITOR_MINIMAL_LIFETIME * 1000 * 10) { - should_keep_running = force_should_keep_running = TRUE; - } - } - } - - if (should_keep_running) { - if (last_should_keep_running == -1 || !force_should_keep_running) - last_should_keep_running = mono_100ns_ticks (); - } else { - last_should_keep_running = -1; - if (InterlockedCompareExchange (&monitor_status, MONITOR_STATUS_NOT_RUNNING, MONITOR_STATUS_WAITING_FOR_REQUEST) == MONITOR_STATUS_WAITING_FOR_REQUEST) - return FALSE; - } - } - - g_assert (monitor_status == MONITOR_STATUS_WAITING_FOR_REQUEST || monitor_status == MONITOR_STATUS_REQUESTED); - - return TRUE; -} - -static gboolean -monitor_sufficient_delay_since_last_dequeue (void) -{ - gint64 threshold; - - g_assert (threadpool); - - if (threadpool->cpu_usage < CPU_USAGE_LOW) { - threshold = MONITOR_INTERVAL; - } else { - ThreadPoolCounter counter; - counter.as_gint64 = COUNTER_READ(); - threshold = counter._.max_working * MONITOR_INTERVAL * 2; - } - - return mono_msec_ticks () >= threadpool->heuristic_last_dequeue + threshold; -} - -static void hill_climbing_force_change (gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition); - -static void -monitor_thread (void) -{ - MonoInternalThread *current_thread = mono_thread_internal_current (); - guint i; - - mono_cpu_usage (threadpool->cpu_usage_state); - - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, started", mono_native_thread_id_get ()); - - do { - ThreadPoolCounter counter; - gboolean limit_worker_max_reached; - gint32 interval_left = MONITOR_INTERVAL; - gint32 awake = 0; /* number of spurious awakes we tolerate before doing a round of rebalancing */ - - g_assert (monitor_status != MONITOR_STATUS_NOT_RUNNING); - - mono_gc_set_skip_thread (TRUE); - - do { - gint64 ts; - gboolean alerted = FALSE; - - if (mono_runtime_is_shutting_down ()) - break; - - ts = mono_msec_ticks (); - if (mono_thread_info_sleep (interval_left, &alerted) == 0) - break; - interval_left -= mono_msec_ticks () - ts; - - mono_gc_set_skip_thread (FALSE); - if ((current_thread->state & (ThreadState_StopRequested | ThreadState_SuspendRequested)) != 0) - mono_thread_interruption_checkpoint (); - mono_gc_set_skip_thread (TRUE); - } while (interval_left > 0 && ++awake < 10); - - mono_gc_set_skip_thread (FALSE); - - if (threadpool->suspended) - continue; - - if (mono_runtime_is_shutting_down ()) - continue; - - domains_lock (); - if (!domain_any_has_request ()) { - domains_unlock (); - continue; - } - domains_unlock (); - - threadpool->cpu_usage = mono_cpu_usage (threadpool->cpu_usage_state); - - if (!monitor_sufficient_delay_since_last_dequeue ()) - continue; - - limit_worker_max_reached = FALSE; - - COUNTER_ATOMIC (counter, { - if (counter._.max_working >= threadpool->limit_worker_max) { - limit_worker_max_reached = TRUE; - break; - } - counter._.max_working ++; - }); - - if (limit_worker_max_reached) - continue; - - hill_climbing_force_change (counter._.max_working, TRANSITION_STARVATION); - - for (i = 0; i < 5; ++i) { - if (mono_runtime_is_shutting_down ()) - break; - - if (worker_try_unpark ()) { - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, unparked", mono_native_thread_id_get ()); - break; - } - - if (worker_try_create ()) { - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, created", mono_native_thread_id_get ()); - break; - } - } - } while (monitor_should_keep_running ()); - - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, finished", mono_native_thread_id_get ()); -} - -static void -monitor_ensure_running (void) -{ - MonoError error; - for (;;) { - switch (monitor_status) { - case MONITOR_STATUS_REQUESTED: - return; - case MONITOR_STATUS_WAITING_FOR_REQUEST: - InterlockedCompareExchange (&monitor_status, MONITOR_STATUS_REQUESTED, MONITOR_STATUS_WAITING_FOR_REQUEST); - break; - case MONITOR_STATUS_NOT_RUNNING: - if (mono_runtime_is_shutting_down ()) - return; - if (InterlockedCompareExchange (&monitor_status, MONITOR_STATUS_REQUESTED, MONITOR_STATUS_NOT_RUNNING) == MONITOR_STATUS_NOT_RUNNING) { - if (!mono_thread_create_internal (mono_get_root_domain (), monitor_thread, NULL, TRUE, SMALL_STACK, &error)) { - monitor_status = MONITOR_STATUS_NOT_RUNNING; - mono_error_cleanup (&error); - } - return; - } - break; - default: g_assert_not_reached (); - } - } -} - -static void -hill_climbing_change_thread_count (gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition) -{ - ThreadPoolHillClimbing *hc; - - g_assert (threadpool); - - hc = &threadpool->heuristic_hill_climbing; - - mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] hill climbing, change max number of threads %d", mono_native_thread_id_get (), new_thread_count); - - hc->last_thread_count = new_thread_count; - hc->current_sample_interval = rand_next (&hc->random_interval_generator, hc->sample_interval_low, hc->sample_interval_high); - hc->elapsed_since_last_change = 0; - hc->completions_since_last_change = 0; -} - -static void -hill_climbing_force_change (gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition) -{ - ThreadPoolHillClimbing *hc; - - g_assert (threadpool); - - hc = &threadpool->heuristic_hill_climbing; - - if (new_thread_count != hc->last_thread_count) { - hc->current_control_setting += new_thread_count - hc->last_thread_count; - hill_climbing_change_thread_count (new_thread_count, transition); - } -} - -static double_complex -hill_climbing_get_wave_component (gdouble *samples, guint sample_count, gdouble period) -{ - ThreadPoolHillClimbing *hc; - gdouble w, cosine, sine, coeff, q0, q1, q2; - guint i; - - g_assert (threadpool); - g_assert (sample_count >= period); - g_assert (period >= 2); - - hc = &threadpool->heuristic_hill_climbing; - - w = 2.0 * M_PI / period; - cosine = cos (w); - sine = sin (w); - coeff = 2.0 * cosine; - q0 = q1 = q2 = 0; - - for (i = 0; i < sample_count; ++i) { - q0 = coeff * q1 - q2 + samples [(hc->total_samples - sample_count + i) % hc->samples_to_measure]; - q2 = q1; - q1 = q0; - } - - return mono_double_complex_scalar_div (mono_double_complex_make (q1 - q2 * cosine, (q2 * sine)), ((gdouble)sample_count)); -} - -static gint16 -hill_climbing_update (gint16 current_thread_count, guint32 sample_duration, gint32 completions, gint64 *adjustment_interval) -{ - ThreadPoolHillClimbing *hc; - ThreadPoolHeuristicStateTransition transition; - gdouble throughput; - gdouble throughput_error_estimate; - gdouble confidence; - gdouble move; - gdouble gain; - gint sample_index; - gint sample_count; - gint new_thread_wave_magnitude; - gint new_thread_count; - double_complex thread_wave_component; - double_complex throughput_wave_component; - double_complex ratio; - - g_assert (threadpool); - g_assert (adjustment_interval); - - hc = &threadpool->heuristic_hill_climbing; - - /* If someone changed the thread count without telling us, update our records accordingly. */ - if (current_thread_count != hc->last_thread_count) - hill_climbing_force_change (current_thread_count, TRANSITION_INITIALIZING); - - /* Update the cumulative stats for this thread count */ - hc->elapsed_since_last_change += sample_duration; - hc->completions_since_last_change += completions; - - /* Add in any data we've already collected about this sample */ - sample_duration += hc->accumulated_sample_duration; - completions += hc->accumulated_completion_count; - - /* We need to make sure we're collecting reasonably accurate data. Since we're just counting the end - * of each work item, we are goinng to be missing some data about what really happened during the - * sample interval. The count produced by each thread includes an initial work item that may have - * started well before the start of the interval, and each thread may have been running some new - * work item for some time before the end of the interval, which did not yet get counted. So - * our count is going to be off by +/- threadCount workitems. - * - * The exception is that the thread that reported to us last time definitely wasn't running any work - * at that time, and the thread that's reporting now definitely isn't running a work item now. So - * we really only need to consider threadCount-1 threads. - * - * Thus the percent error in our count is +/- (threadCount-1)/numCompletions. - * - * We cannot rely on the frequency-domain analysis we'll be doing later to filter out this error, because - * of the way it accumulates over time. If this sample is off by, say, 33% in the negative direction, - * then the next one likely will be too. The one after that will include the sum of the completions - * we missed in the previous samples, and so will be 33% positive. So every three samples we'll have - * two "low" samples and one "high" sample. This will appear as periodic variation right in the frequency - * range we're targeting, which will not be filtered by the frequency-domain translation. */ - if (hc->total_samples > 0 && ((current_thread_count - 1.0) / completions) >= hc->max_sample_error) { - /* Not accurate enough yet. Let's accumulate the data so - * far, and tell the ThreadPool to collect a little more. */ - hc->accumulated_sample_duration = sample_duration; - hc->accumulated_completion_count = completions; - *adjustment_interval = 10; - return current_thread_count; - } - - /* We've got enouugh data for our sample; reset our accumulators for next time. */ - hc->accumulated_sample_duration = 0; - hc->accumulated_completion_count = 0; - - /* Add the current thread count and throughput sample to our history. */ - throughput = ((gdouble) completions) / sample_duration; - - sample_index = hc->total_samples % hc->samples_to_measure; - hc->samples [sample_index] = throughput; - hc->thread_counts [sample_index] = current_thread_count; - hc->total_samples ++; - - /* Set up defaults for our metrics. */ - thread_wave_component = mono_double_complex_make(0, 0); - throughput_wave_component = mono_double_complex_make(0, 0); - throughput_error_estimate = 0; - ratio = mono_double_complex_make(0, 0); - confidence = 0; - - transition = TRANSITION_WARMUP; - - /* How many samples will we use? It must be at least the three wave periods we're looking for, and it must also - * be a whole multiple of the primary wave's period; otherwise the frequency we're looking for will fall between - * two frequency bands in the Fourier analysis, and we won't be able to measure it accurately. */ - sample_count = ((gint) MIN (hc->total_samples - 1, hc->samples_to_measure) / hc->wave_period) * hc->wave_period; - - if (sample_count > hc->wave_period) { - guint i; - gdouble average_throughput; - gdouble average_thread_count; - gdouble sample_sum = 0; - gdouble thread_sum = 0; - - /* Average the throughput and thread count samples, so we can scale the wave magnitudes later. */ - for (i = 0; i < sample_count; ++i) { - guint j = (hc->total_samples - sample_count + i) % hc->samples_to_measure; - sample_sum += hc->samples [j]; - thread_sum += hc->thread_counts [j]; - } - - average_throughput = sample_sum / sample_count; - average_thread_count = thread_sum / sample_count; - - if (average_throughput > 0 && average_thread_count > 0) { - gdouble noise_for_confidence, adjacent_period_1, adjacent_period_2; - - /* Calculate the periods of the adjacent frequency bands we'll be using to - * measure noise levels. We want the two adjacent Fourier frequency bands. */ - adjacent_period_1 = sample_count / (((gdouble) sample_count) / ((gdouble) hc->wave_period) + 1); - adjacent_period_2 = sample_count / (((gdouble) sample_count) / ((gdouble) hc->wave_period) - 1); - - /* Get the the three different frequency components of the throughput (scaled by average - * throughput). Our "error" estimate (the amount of noise that might be present in the - * frequency band we're really interested in) is the average of the adjacent bands. */ - throughput_wave_component = mono_double_complex_scalar_div (hill_climbing_get_wave_component (hc->samples, sample_count, hc->wave_period), average_throughput); - throughput_error_estimate = cabs (mono_double_complex_scalar_div (hill_climbing_get_wave_component (hc->samples, sample_count, adjacent_period_1), average_throughput)); - - if (adjacent_period_2 <= sample_count) { - throughput_error_estimate = MAX (throughput_error_estimate, cabs (mono_double_complex_scalar_div (hill_climbing_get_wave_component ( - hc->samples, sample_count, adjacent_period_2), average_throughput))); - } - - /* Do the same for the thread counts, so we have something to compare to. We don't - * measure thread count noise, because there is none; these are exact measurements. */ - thread_wave_component = mono_double_complex_scalar_div (hill_climbing_get_wave_component (hc->thread_counts, sample_count, hc->wave_period), average_thread_count); - - /* Update our moving average of the throughput noise. We'll use this - * later as feedback to determine the new size of the thread wave. */ - if (hc->average_throughput_noise == 0) { - hc->average_throughput_noise = throughput_error_estimate; - } else { - hc->average_throughput_noise = (hc->throughput_error_smoothing_factor * throughput_error_estimate) - + ((1.0 + hc->throughput_error_smoothing_factor) * hc->average_throughput_noise); - } - - if (cabs (thread_wave_component) > 0) { - /* Adjust the throughput wave so it's centered around the target wave, - * and then calculate the adjusted throughput/thread ratio. */ - ratio = mono_double_complex_div (mono_double_complex_sub (throughput_wave_component, mono_double_complex_scalar_mul(thread_wave_component, hc->target_throughput_ratio)), thread_wave_component); - transition = TRANSITION_CLIMBING_MOVE; - } else { - ratio = mono_double_complex_make (0, 0); - transition = TRANSITION_STABILIZING; - } - - noise_for_confidence = MAX (hc->average_throughput_noise, throughput_error_estimate); - if (noise_for_confidence > 0) { - confidence = cabs (thread_wave_component) / noise_for_confidence / hc->target_signal_to_noise_ratio; - } else { - /* there is no noise! */ - confidence = 1.0; - } - } - } - - /* We use just the real part of the complex ratio we just calculated. If the throughput signal - * is exactly in phase with the thread signal, this will be the same as taking the magnitude of - * the complex move and moving that far up. If they're 180 degrees out of phase, we'll move - * backward (because this indicates that our changes are having the opposite of the intended effect). - * If they're 90 degrees out of phase, we won't move at all, because we can't tell wether we're - * having a negative or positive effect on throughput. */ - move = creal (ratio); - move = CLAMP (move, -1.0, 1.0); - - /* Apply our confidence multiplier. */ - move *= CLAMP (confidence, -1.0, 1.0); - - /* Now apply non-linear gain, such that values around zero are attenuated, while higher values - * are enhanced. This allows us to move quickly if we're far away from the target, but more slowly - * if we're getting close, giving us rapid ramp-up without wild oscillations around the target. */ - gain = hc->max_change_per_second * sample_duration; - move = pow (fabs (move), hc->gain_exponent) * (move >= 0.0 ? 1 : -1) * gain; - move = MIN (move, hc->max_change_per_sample); - - /* If the result was positive, and CPU is > 95%, refuse the move. */ - if (move > 0.0 && threadpool->cpu_usage > CPU_USAGE_HIGH) - move = 0.0; - - /* Apply the move to our control setting. */ - hc->current_control_setting += move; - - /* Calculate the new thread wave magnitude, which is based on the moving average we've been keeping of the - * throughput error. This average starts at zero, so we'll start with a nice safe little wave at first. */ - new_thread_wave_magnitude = (gint)(0.5 + (hc->current_control_setting * hc->average_throughput_noise - * hc->target_signal_to_noise_ratio * hc->thread_magnitude_multiplier * 2.0)); - new_thread_wave_magnitude = CLAMP (new_thread_wave_magnitude, 1, hc->max_thread_wave_magnitude); - - /* Make sure our control setting is within the ThreadPool's limits. */ - hc->current_control_setting = CLAMP (hc->current_control_setting, threadpool->limit_worker_min, threadpool->limit_worker_max - new_thread_wave_magnitude); - - /* Calculate the new thread count (control setting + square wave). */ - new_thread_count = (gint)(hc->current_control_setting + new_thread_wave_magnitude * ((hc->total_samples / (hc->wave_period / 2)) % 2)); - - /* Make sure the new thread count doesn't exceed the ThreadPool's limits. */ - new_thread_count = CLAMP (new_thread_count, threadpool->limit_worker_min, threadpool->limit_worker_max); - - if (new_thread_count != current_thread_count) - hill_climbing_change_thread_count (new_thread_count, transition); - - if (creal (ratio) < 0.0 && new_thread_count == threadpool->limit_worker_min) - *adjustment_interval = (gint)(0.5 + hc->current_sample_interval * (10.0 * MAX (-1.0 * creal (ratio), 1.0))); - else - *adjustment_interval = hc->current_sample_interval; - - return new_thread_count; -} - -static void -heuristic_notify_work_completed (void) -{ - g_assert (threadpool); - - InterlockedIncrement (&threadpool->heuristic_completions); - threadpool->heuristic_last_dequeue = mono_msec_ticks (); -} - -static gboolean -heuristic_should_adjust (void) -{ - g_assert (threadpool); - - if (threadpool->heuristic_last_dequeue > threadpool->heuristic_last_adjustment + threadpool->heuristic_adjustment_interval) { - ThreadPoolCounter counter; - counter.as_gint64 = COUNTER_READ(); - if (counter._.working <= counter._.max_working) - return TRUE; - } - - return FALSE; -} - -static void -heuristic_adjust (void) -{ - g_assert (threadpool); - - if (mono_coop_mutex_trylock (&threadpool->heuristic_lock) == 0) { - gint32 completions = InterlockedExchange (&threadpool->heuristic_completions, 0); - gint64 sample_end = mono_msec_ticks (); - gint64 sample_duration = sample_end - threadpool->heuristic_sample_start; - - if (sample_duration >= threadpool->heuristic_adjustment_interval / 2) { - ThreadPoolCounter counter; - gint16 new_thread_count; - - counter.as_gint64 = COUNTER_READ (); - new_thread_count = hill_climbing_update (counter._.max_working, sample_duration, completions, &threadpool->heuristic_adjustment_interval); - - COUNTER_ATOMIC (counter, { counter._.max_working = new_thread_count; }); - - if (new_thread_count > counter._.max_working) - worker_request (mono_domain_get ()); - - threadpool->heuristic_sample_start = sample_end; - threadpool->heuristic_last_adjustment = mono_msec_ticks (); - } - - mono_coop_mutex_unlock (&threadpool->heuristic_lock); - } -} - -void -mono_threadpool_ms_cleanup (void) -{ -#ifndef DISABLE_SOCKETS - mono_threadpool_ms_io_cleanup (); -#endif - mono_lazy_cleanup (&status, cleanup); -} - -MonoAsyncResult * -mono_threadpool_ms_begin_invoke (MonoDomain *domain, MonoObject *target, MonoMethod *method, gpointer *params, MonoError *error) -{ - static MonoClass *async_call_klass = NULL; - MonoMethodMessage *message; - MonoAsyncResult *async_result; - MonoAsyncCall *async_call; - MonoDelegate *async_callback = NULL; - MonoObject *state = NULL; - - if (!async_call_klass) - async_call_klass = mono_class_load_from_name (mono_defaults.corlib, "System", "MonoAsyncCall"); - - mono_lazy_initialize (&status, initialize); - - mono_error_init (error); - - message = mono_method_call_message_new (method, params, mono_get_delegate_invoke (method->klass), (params != NULL) ? (&async_callback) : NULL, (params != NULL) ? (&state) : NULL, error); - return_val_if_nok (error, NULL); - - async_call = (MonoAsyncCall*) mono_object_new_checked (domain, async_call_klass, error); - return_val_if_nok (error, NULL); - - MONO_OBJECT_SETREF (async_call, msg, message); - MONO_OBJECT_SETREF (async_call, state, state); - - if (async_callback) { - MONO_OBJECT_SETREF (async_call, cb_method, mono_get_delegate_invoke (((MonoObject*) async_callback)->vtable->klass)); - MONO_OBJECT_SETREF (async_call, cb_target, async_callback); - } - - async_result = mono_async_result_new (domain, NULL, async_call->state, NULL, (MonoObject*) async_call, error); - return_val_if_nok (error, NULL); - MONO_OBJECT_SETREF (async_result, async_delegate, target); - - mono_threadpool_ms_enqueue_work_item (domain, (MonoObject*) async_result, error); - return_val_if_nok (error, NULL); - - return async_result; -} - -MonoObject * -mono_threadpool_ms_end_invoke (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc, MonoError *error) -{ - MonoAsyncCall *ac; - - mono_error_init (error); - g_assert (exc); - g_assert (out_args); - - *exc = NULL; - *out_args = NULL; - - /* check if already finished */ - mono_monitor_enter ((MonoObject*) ares); - - if (ares->endinvoke_called) { - mono_error_set_invalid_operation(error, "Delegate EndInvoke method called more than once"); - mono_monitor_exit ((MonoObject*) ares); - return NULL; - } - - ares->endinvoke_called = 1; - - /* wait until we are really finished */ - if (ares->completed) { - mono_monitor_exit ((MonoObject *) ares); - } else { - gpointer wait_event; - if (ares->handle) { - wait_event = mono_wait_handle_get_handle ((MonoWaitHandle*) ares->handle); - } else { - wait_event = mono_w32event_create (TRUE, FALSE); - g_assert(wait_event); - MonoWaitHandle *wait_handle = mono_wait_handle_new (mono_object_domain (ares), wait_event, error); - if (!is_ok (error)) { - CloseHandle (wait_event); - return NULL; - } - MONO_OBJECT_SETREF (ares, handle, (MonoObject*) wait_handle); - } - mono_monitor_exit ((MonoObject*) ares); - MONO_ENTER_GC_SAFE; -#ifdef HOST_WIN32 - WaitForSingleObjectEx (wait_event, INFINITE, TRUE); -#else - mono_w32handle_wait_one (wait_event, MONO_INFINITE_WAIT, TRUE); -#endif - MONO_EXIT_GC_SAFE; - } - - ac = (MonoAsyncCall*) ares->object_data; - g_assert (ac); - - *exc = ac->msg->exc; /* FIXME: GC add write barrier */ - *out_args = ac->out_args; - return ac->res; -} - -gboolean -mono_threadpool_ms_remove_domain_jobs (MonoDomain *domain, int timeout) -{ - gint64 end; - ThreadPoolDomain *tpdomain; - gboolean ret; - - g_assert (domain); - g_assert (timeout >= -1); - - g_assert (mono_domain_is_unloading (domain)); - - if (timeout != -1) - end = mono_msec_ticks () + timeout; - -#ifndef DISABLE_SOCKETS - mono_threadpool_ms_io_remove_domain_jobs (domain); - if (timeout != -1) { - if (mono_msec_ticks () > end) - return FALSE; - } -#endif - - /* - * Wait for all threads which execute jobs in the domain to exit. - * The is_unloading () check in worker_request () ensures that - * no new jobs are added after we enter the lock below. - */ - mono_lazy_initialize (&status, initialize); - domains_lock (); - - tpdomain = tpdomain_get (domain, FALSE); - if (!tpdomain) { - domains_unlock (); - return TRUE; - } - - ret = TRUE; - - while (tpdomain->outstanding_request + tpdomain->threadpool_jobs > 0) { - if (timeout == -1) { - mono_coop_cond_wait (&tpdomain->cleanup_cond, &threadpool->domains_lock); - } else { - gint64 now; - gint res; - - now = mono_msec_ticks(); - if (now > end) { - ret = FALSE; - break; - } - - res = mono_coop_cond_timedwait (&tpdomain->cleanup_cond, &threadpool->domains_lock, end - now); - if (res != 0) { - ret = FALSE; - break; - } - } - } - - /* Remove from the list the worker threads look at */ - tpdomain_remove (tpdomain); - - domains_unlock (); - - mono_coop_cond_destroy (&tpdomain->cleanup_cond); - tpdomain_free (tpdomain); - - return ret; -} - -void -mono_threadpool_ms_suspend (void) -{ - if (threadpool) - threadpool->suspended = TRUE; -} - -void -mono_threadpool_ms_resume (void) -{ - if (threadpool) - threadpool->suspended = FALSE; -} - -void -ves_icall_System_Threading_ThreadPool_GetAvailableThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads) -{ - ThreadPoolCounter counter; - - if (!worker_threads || !completion_port_threads) - return; - - mono_lazy_initialize (&status, initialize); - - counter.as_gint64 = COUNTER_READ (); - - *worker_threads = MAX (0, threadpool->limit_worker_max - counter._.active); - *completion_port_threads = threadpool->limit_io_max; -} - -void -ves_icall_System_Threading_ThreadPool_GetMinThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads) -{ - if (!worker_threads || !completion_port_threads) - return; - - mono_lazy_initialize (&status, initialize); - - *worker_threads = threadpool->limit_worker_min; - *completion_port_threads = threadpool->limit_io_min; -} - -void -ves_icall_System_Threading_ThreadPool_GetMaxThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads) -{ - if (!worker_threads || !completion_port_threads) - return; - - mono_lazy_initialize (&status, initialize); - - *worker_threads = threadpool->limit_worker_max; - *completion_port_threads = threadpool->limit_io_max; -} - -MonoBoolean -ves_icall_System_Threading_ThreadPool_SetMinThreadsNative (gint32 worker_threads, gint32 completion_port_threads) -{ - mono_lazy_initialize (&status, initialize); - - if (worker_threads <= 0 || worker_threads > threadpool->limit_worker_max) - return FALSE; - if (completion_port_threads <= 0 || completion_port_threads > threadpool->limit_io_max) - return FALSE; - - threadpool->limit_worker_min = worker_threads; - threadpool->limit_io_min = completion_port_threads; - - return TRUE; -} - -MonoBoolean -ves_icall_System_Threading_ThreadPool_SetMaxThreadsNative (gint32 worker_threads, gint32 completion_port_threads) -{ - gint cpu_count = mono_cpu_count (); - - mono_lazy_initialize (&status, initialize); - - if (worker_threads < threadpool->limit_worker_min || worker_threads < cpu_count) - return FALSE; - if (completion_port_threads < threadpool->limit_io_min || completion_port_threads < cpu_count) - return FALSE; - - threadpool->limit_worker_max = worker_threads; - threadpool->limit_io_max = completion_port_threads; - - return TRUE; -} - -void -ves_icall_System_Threading_ThreadPool_InitializeVMTp (MonoBoolean *enable_worker_tracking) -{ - if (enable_worker_tracking) { - // TODO implement some kind of switch to have the possibily to use it - *enable_worker_tracking = FALSE; - } - - mono_lazy_initialize (&status, initialize); -} - -MonoBoolean -ves_icall_System_Threading_ThreadPool_NotifyWorkItemComplete (void) -{ - ThreadPoolCounter counter; - - if (mono_domain_is_unloading (mono_domain_get ()) || mono_runtime_is_shutting_down ()) - return FALSE; - - heuristic_notify_work_completed (); - - if (heuristic_should_adjust ()) - heuristic_adjust (); - - counter.as_gint64 = COUNTER_READ (); - return counter._.working <= counter._.max_working; -} - -void -ves_icall_System_Threading_ThreadPool_NotifyWorkItemProgressNative (void) -{ - heuristic_notify_work_completed (); - - if (heuristic_should_adjust ()) - heuristic_adjust (); -} - -void -ves_icall_System_Threading_ThreadPool_ReportThreadStatus (MonoBoolean is_working) -{ - // TODO - MonoError error; - mono_error_set_not_implemented (&error, ""); - mono_error_set_pending_exception (&error); -} - -MonoBoolean -ves_icall_System_Threading_ThreadPool_RequestWorkerThread (void) -{ - return worker_request (mono_domain_get ()); -} - -MonoBoolean G_GNUC_UNUSED -ves_icall_System_Threading_ThreadPool_PostQueuedCompletionStatus (MonoNativeOverlapped *native_overlapped) -{ - /* This copy the behavior of the current Mono implementation */ - MonoError error; - mono_error_set_not_implemented (&error, ""); - mono_error_set_pending_exception (&error); - return FALSE; -} - -MonoBoolean G_GNUC_UNUSED -ves_icall_System_Threading_ThreadPool_BindIOCompletionCallbackNative (gpointer file_handle) -{ - /* This copy the behavior of the current Mono implementation */ - return TRUE; -} - -MonoBoolean G_GNUC_UNUSED -ves_icall_System_Threading_ThreadPool_IsThreadPoolHosted (void) -{ - return FALSE; -} diff --git a/mono/metadata/threadpool-ms.h b/mono/metadata/threadpool-ms.h deleted file mode 100644 index 1603e3a15d9..00000000000 --- a/mono/metadata/threadpool-ms.h +++ /dev/null @@ -1,65 +0,0 @@ -#ifndef _MONO_THREADPOOL_MICROSOFT_H_ -#define _MONO_THREADPOOL_MICROSOFT_H_ - -#include -#include - -#include -#include - -#define SMALL_STACK (sizeof (gpointer) * 32 * 1024) - -typedef struct _MonoNativeOverlapped MonoNativeOverlapped; - -void -mono_threadpool_ms_cleanup (void); - -MonoAsyncResult * -mono_threadpool_ms_begin_invoke (MonoDomain *domain, MonoObject *target, MonoMethod *method, gpointer *params, MonoError *error); -MonoObject * -mono_threadpool_ms_end_invoke (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc, MonoError *error); - -gboolean -mono_threadpool_ms_remove_domain_jobs (MonoDomain *domain, int timeout); - -void -mono_threadpool_ms_suspend (void); -void -mono_threadpool_ms_resume (void); - -void -ves_icall_System_Threading_ThreadPool_GetAvailableThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads); -void -ves_icall_System_Threading_ThreadPool_GetMinThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads); -void -ves_icall_System_Threading_ThreadPool_GetMaxThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads); -MonoBoolean -ves_icall_System_Threading_ThreadPool_SetMinThreadsNative (gint32 worker_threads, gint32 completion_port_threads); -MonoBoolean -ves_icall_System_Threading_ThreadPool_SetMaxThreadsNative (gint32 worker_threads, gint32 completion_port_threads); -void -ves_icall_System_Threading_ThreadPool_InitializeVMTp (MonoBoolean *enable_worker_tracking); -MonoBoolean -ves_icall_System_Threading_ThreadPool_NotifyWorkItemComplete (void); -void -ves_icall_System_Threading_ThreadPool_NotifyWorkItemProgressNative (void); -void -ves_icall_System_Threading_ThreadPool_ReportThreadStatus (MonoBoolean is_working); -MonoBoolean -ves_icall_System_Threading_ThreadPool_RequestWorkerThread (void); - -MonoBoolean -ves_icall_System_Threading_ThreadPool_PostQueuedCompletionStatus (MonoNativeOverlapped *native_overlapped); - -MonoBoolean -ves_icall_System_Threading_ThreadPool_BindIOCompletionCallbackNative (gpointer file_handle); - -MonoBoolean -ves_icall_System_Threading_ThreadPool_IsThreadPoolHosted (void); - -/* Internals */ - -gboolean -mono_threadpool_ms_enqueue_work_item (MonoDomain *domain, MonoObject *work_item, MonoError *error); - -#endif // _MONO_THREADPOOL_MICROSOFT_H_ diff --git a/mono/metadata/threadpool-worker-default.c b/mono/metadata/threadpool-worker-default.c new file mode 100644 index 00000000000..3e39c57c756 --- /dev/null +++ b/mono/metadata/threadpool-worker-default.c @@ -0,0 +1,1267 @@ +/* + * threadpool-worker.c: native threadpool worker + * + * Author: + * Ludovic Henry (ludovic.henry@xamarin.com) + * + * Licensed under the MIT license. See LICENSE file in the project root for full license information. + */ + +#include +#define _USE_MATH_DEFINES // needed by MSVC to define math constants +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define CPU_USAGE_LOW 80 +#define CPU_USAGE_HIGH 95 + +#define MONITOR_INTERVAL 500 // ms +#define MONITOR_MINIMAL_LIFETIME 60 * 1000 // ms + +#define WORKER_CREATION_MAX_PER_SEC 10 + +/* The exponent to apply to the gain. 1.0 means to use linear gain, + * higher values will enhance large moves and damp small ones. + * default: 2.0 */ +#define HILL_CLIMBING_GAIN_EXPONENT 2.0 + +/* The 'cost' of a thread. 0 means drive for increased throughput regardless + * of thread count, higher values bias more against higher thread counts. + * default: 0.15 */ +#define HILL_CLIMBING_BIAS 0.15 + +#define HILL_CLIMBING_WAVE_PERIOD 4 +#define HILL_CLIMBING_MAX_WAVE_MAGNITUDE 20 +#define HILL_CLIMBING_WAVE_MAGNITUDE_MULTIPLIER 1.0 +#define HILL_CLIMBING_WAVE_HISTORY_SIZE 8 +#define HILL_CLIMBING_TARGET_SIGNAL_TO_NOISE_RATIO 3.0 +#define HILL_CLIMBING_MAX_CHANGE_PER_SECOND 4 +#define HILL_CLIMBING_MAX_CHANGE_PER_SAMPLE 20 +#define HILL_CLIMBING_SAMPLE_INTERVAL_LOW 10 +#define HILL_CLIMBING_SAMPLE_INTERVAL_HIGH 200 +#define HILL_CLIMBING_ERROR_SMOOTHING_FACTOR 0.01 +#define HILL_CLIMBING_MAX_SAMPLE_ERROR_PERCENT 0.15 + +typedef enum { + TRANSITION_WARMUP, + TRANSITION_INITIALIZING, + TRANSITION_RANDOM_MOVE, + TRANSITION_CLIMBING_MOVE, + TRANSITION_CHANGE_POINT, + TRANSITION_STABILIZING, + TRANSITION_STARVATION, + TRANSITION_THREAD_TIMED_OUT, + TRANSITION_UNDEFINED, +} ThreadPoolHeuristicStateTransition; + +typedef struct { + gint32 wave_period; + gint32 samples_to_measure; + gdouble target_throughput_ratio; + gdouble target_signal_to_noise_ratio; + gdouble max_change_per_second; + gdouble max_change_per_sample; + gint32 max_thread_wave_magnitude; + gint32 sample_interval_low; + gdouble thread_magnitude_multiplier; + gint32 sample_interval_high; + gdouble throughput_error_smoothing_factor; + gdouble gain_exponent; + gdouble max_sample_error; + + gdouble current_control_setting; + gint64 total_samples; + gint16 last_thread_count; + gdouble elapsed_since_last_change; + gdouble completions_since_last_change; + + gdouble average_throughput_noise; + + gdouble *samples; + gdouble *thread_counts; + + guint32 current_sample_interval; + gpointer random_interval_generator; + + gint32 accumulated_completion_count; + gdouble accumulated_sample_duration; +} ThreadPoolHillClimbing; + +typedef struct { + MonoThreadPoolWorkerCallback callback; + gpointer data; +} ThreadPoolWorkItem; + +typedef union { + struct { + gint16 max_working; /* determined by heuristic */ + gint16 starting; /* starting, but not yet in worker_thread */ + gint16 working; /* executing worker_thread */ + gint16 parked; /* parked */ + } _; + gint64 as_gint64; +} ThreadPoolWorkerCounter; + +typedef MonoInternalThread ThreadPoolWorkerThread; + +struct MonoThreadPoolWorker { + MonoRefCount ref; + + ThreadPoolWorkerCounter counters; + + GPtrArray *threads; // ThreadPoolWorkerThread* [] + MonoCoopMutex threads_lock; /* protect access to working_threads and parked_threads */ + gint32 parked_threads_count; + MonoCoopCond parked_threads_cond; + MonoCoopCond threads_exit_cond; + + ThreadPoolWorkItem *work_items; // ThreadPoolWorkItem [] + gint32 work_items_count; + gint32 work_items_size; + MonoCoopMutex work_items_lock; + + guint32 worker_creation_current_second; + guint32 worker_creation_current_count; + MonoCoopMutex worker_creation_lock; + + gint32 heuristic_completions; + gint64 heuristic_sample_start; + gint64 heuristic_last_dequeue; // ms + gint64 heuristic_last_adjustment; // ms + gint64 heuristic_adjustment_interval; // ms + ThreadPoolHillClimbing heuristic_hill_climbing; + MonoCoopMutex heuristic_lock; + + gint32 limit_worker_min; + gint32 limit_worker_max; + + MonoCpuUsageState *cpu_usage_state; + gint32 cpu_usage; + + /* suspended by the debugger */ + gboolean suspended; + + gint32 monitor_status; +}; + +enum { + MONITOR_STATUS_REQUESTED, + MONITOR_STATUS_WAITING_FOR_REQUEST, + MONITOR_STATUS_NOT_RUNNING, +}; + +#define COUNTER_CHECK(counter) \ + do { \ + g_assert (counter._.max_working > 0); \ + g_assert (counter._.starting >= 0); \ + g_assert (counter._.working >= 0); \ + } while (0) + +#define COUNTER_ATOMIC(worker,var,block) \ + do { \ + ThreadPoolWorkerCounter __old; \ + do { \ + g_assert (worker); \ + __old = COUNTER_READ (worker); \ + (var) = __old; \ + { block; } \ + COUNTER_CHECK (var); \ + } while (InterlockedCompareExchange64 (&worker->counters.as_gint64, (var).as_gint64, __old.as_gint64) != __old.as_gint64); \ + } while (0) + +static inline ThreadPoolWorkerCounter +COUNTER_READ (MonoThreadPoolWorker *worker) +{ + ThreadPoolWorkerCounter counter; + counter.as_gint64 = InterlockedRead64 (&worker->counters.as_gint64); + return counter; +} + +static gpointer +rand_create (void) +{ + mono_rand_open (); + return mono_rand_init (NULL, 0); +} + +static guint32 +rand_next (gpointer *handle, guint32 min, guint32 max) +{ + MonoError error; + guint32 val; + mono_rand_try_get_uint32 (handle, &val, min, max, &error); + // FIXME handle error + mono_error_assert_ok (&error); + return val; +} + +static void +destroy (gpointer data) +{ + MonoThreadPoolWorker *worker; + + worker = (MonoThreadPoolWorker*) data; + g_assert (worker); + + // FIXME destroy everything + + g_free (worker); +} + +void +mono_threadpool_worker_init (MonoThreadPoolWorker **worker) +{ + MonoThreadPoolWorker *wk; + ThreadPoolHillClimbing *hc; + const char *threads_per_cpu_env; + gint threads_per_cpu; + gint threads_count; + + g_assert (worker); + + wk = *worker = g_new0 (MonoThreadPoolWorker, 1); + + mono_refcount_init (wk, destroy); + + wk->threads = g_ptr_array_new (); + mono_coop_mutex_init (&wk->threads_lock); + wk->parked_threads_count = 0; + mono_coop_cond_init (&wk->parked_threads_cond); + mono_coop_cond_init (&wk->threads_exit_cond); + + /* wk->work_items_size is inited to 0 */ + mono_coop_mutex_init (&wk->work_items_lock); + + wk->worker_creation_current_second = -1; + mono_coop_mutex_init (&wk->worker_creation_lock); + + wk->heuristic_adjustment_interval = 10; + mono_coop_mutex_init (&wk->heuristic_lock); + + mono_rand_open (); + + hc = &wk->heuristic_hill_climbing; + + hc->wave_period = HILL_CLIMBING_WAVE_PERIOD; + hc->max_thread_wave_magnitude = HILL_CLIMBING_MAX_WAVE_MAGNITUDE; + hc->thread_magnitude_multiplier = (gdouble) HILL_CLIMBING_WAVE_MAGNITUDE_MULTIPLIER; + hc->samples_to_measure = hc->wave_period * HILL_CLIMBING_WAVE_HISTORY_SIZE; + hc->target_throughput_ratio = (gdouble) HILL_CLIMBING_BIAS; + hc->target_signal_to_noise_ratio = (gdouble) HILL_CLIMBING_TARGET_SIGNAL_TO_NOISE_RATIO; + hc->max_change_per_second = (gdouble) HILL_CLIMBING_MAX_CHANGE_PER_SECOND; + hc->max_change_per_sample = (gdouble) HILL_CLIMBING_MAX_CHANGE_PER_SAMPLE; + hc->sample_interval_low = HILL_CLIMBING_SAMPLE_INTERVAL_LOW; + hc->sample_interval_high = HILL_CLIMBING_SAMPLE_INTERVAL_HIGH; + hc->throughput_error_smoothing_factor = (gdouble) HILL_CLIMBING_ERROR_SMOOTHING_FACTOR; + hc->gain_exponent = (gdouble) HILL_CLIMBING_GAIN_EXPONENT; + hc->max_sample_error = (gdouble) HILL_CLIMBING_MAX_SAMPLE_ERROR_PERCENT; + hc->current_control_setting = 0; + hc->total_samples = 0; + hc->last_thread_count = 0; + hc->average_throughput_noise = 0; + hc->elapsed_since_last_change = 0; + hc->accumulated_completion_count = 0; + hc->accumulated_sample_duration = 0; + hc->samples = g_new0 (gdouble, hc->samples_to_measure); + hc->thread_counts = g_new0 (gdouble, hc->samples_to_measure); + hc->random_interval_generator = rand_create (); + hc->current_sample_interval = rand_next (&hc->random_interval_generator, hc->sample_interval_low, hc->sample_interval_high); + + if (!(threads_per_cpu_env = g_getenv ("MONO_THREADS_PER_CPU"))) + threads_per_cpu = 1; + else + threads_per_cpu = CLAMP (atoi (threads_per_cpu_env), 1, 50); + + threads_count = mono_cpu_count () * threads_per_cpu; + + wk->limit_worker_min = threads_count; + +#if defined (PLATFORM_ANDROID) || defined (HOST_IOS) + wk->limit_worker_max = CLAMP (threads_count * 100, MIN (threads_count, 200), MAX (threads_count, 200)); +#else + wk->limit_worker_max = threads_count * 100; +#endif + + wk->counters._.max_working = wk->limit_worker_min; + + wk->cpu_usage_state = g_new0 (MonoCpuUsageState, 1); + + wk->suspended = FALSE; + + wk->monitor_status = MONITOR_STATUS_NOT_RUNNING; +} + +void +mono_threadpool_worker_cleanup (MonoThreadPoolWorker *worker) +{ + MonoInternalThread *current; + + /* we make the assumption along the code that we are + * cleaning up only if the runtime is shutting down */ + g_assert (mono_runtime_is_shutting_down ()); + + current = mono_thread_internal_current (); + + while (worker->monitor_status != MONITOR_STATUS_NOT_RUNNING) + mono_thread_info_sleep (1, NULL); + + mono_coop_mutex_lock (&worker->threads_lock); + + /* unpark all worker->parked_threads */ + mono_coop_cond_broadcast (&worker->parked_threads_cond); + + for (;;) { + ThreadPoolWorkerCounter counter; + + counter = COUNTER_READ (worker); + if (counter._.starting + counter._.working + counter._.parked == 0) + break; + + if (counter._.starting + counter._.working + counter._.parked == 1) { + if (worker->threads->len == 1 && g_ptr_array_index (worker->threads, 0) == current) { + /* We are waiting on ourselves */ + break; + } + } + + mono_coop_cond_wait (&worker->threads_exit_cond, &worker->threads_lock); + } + + mono_coop_mutex_unlock (&worker->threads_lock); + + mono_refcount_dec (worker); +} + +static void +work_item_lock (MonoThreadPoolWorker *worker) +{ + mono_coop_mutex_lock (&worker->work_items_lock); +} + +static void +work_item_unlock (MonoThreadPoolWorker *worker) +{ + mono_coop_mutex_unlock (&worker->work_items_lock); +} + +static void +work_item_push (MonoThreadPoolWorker *worker, MonoThreadPoolWorkerCallback callback, gpointer data) +{ + ThreadPoolWorkItem work_item; + + g_assert (worker); + g_assert (callback); + + work_item.callback = callback; + work_item.data = data; + + work_item_lock (worker); + + g_assert (worker->work_items_count <= worker->work_items_size); + + if (G_UNLIKELY (worker->work_items_count == worker->work_items_size)) { + worker->work_items_size += 64; + worker->work_items = g_renew (ThreadPoolWorkItem, worker->work_items, worker->work_items_size); + } + + g_assert (worker->work_items); + + worker->work_items [worker->work_items_count ++] = work_item; + + // printf ("[push] worker->work_items = %p, worker->work_items_count = %d, worker->work_items_size = %d\n", + // worker->work_items, worker->work_items_count, worker->work_items_size); + + work_item_unlock (worker); +} + +static gboolean +work_item_try_pop (MonoThreadPoolWorker *worker, ThreadPoolWorkItem *work_item) +{ + g_assert (worker); + g_assert (work_item); + + work_item_lock (worker); + + // printf ("[pop] worker->work_items = %p, worker->work_items_count = %d, worker->work_items_size = %d\n", + // worker->work_items, worker->work_items_count, worker->work_items_size); + + if (worker->work_items_count == 0) { + work_item_unlock (worker); + return FALSE; + } + + *work_item = worker->work_items [-- worker->work_items_count]; + + if (G_UNLIKELY (worker->work_items_count >= 64 * 3 && worker->work_items_count < worker->work_items_size / 2)) { + worker->work_items_size -= 64; + worker->work_items = g_renew (ThreadPoolWorkItem, worker->work_items, worker->work_items_size); + } + + work_item_unlock (worker); + + return TRUE; +} + +static gint32 +work_item_count (MonoThreadPoolWorker *worker) +{ + gint32 count; + + work_item_lock (worker); + count = worker->work_items_count; + work_item_unlock (worker); + + return count; +} + +static void worker_request (MonoThreadPoolWorker *worker); + +void +mono_threadpool_worker_enqueue (MonoThreadPoolWorker *worker, MonoThreadPoolWorkerCallback callback, gpointer data) +{ + work_item_push (worker, callback, data); + + worker_request (worker); +} + +static void +worker_wait_interrupt (gpointer data) +{ + MonoThreadPoolWorker *worker; + + worker = (MonoThreadPoolWorker*) data; + g_assert (worker); + + mono_coop_mutex_lock (&worker->threads_lock); + mono_coop_cond_signal (&worker->parked_threads_cond); + mono_coop_mutex_unlock (&worker->threads_lock); + + mono_refcount_dec (worker); +} + +/* return TRUE if timeout, FALSE otherwise (worker unpark or interrupt) */ +static gboolean +worker_park (MonoThreadPoolWorker *worker) +{ + gboolean timeout = FALSE; + + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker parking", mono_native_thread_id_get ()); + + mono_coop_mutex_lock (&worker->threads_lock); + + if (!mono_runtime_is_shutting_down ()) { + static gpointer rand_handle = NULL; + MonoInternalThread *thread; + gboolean interrupted = FALSE; + ThreadPoolWorkerCounter counter; + + if (!rand_handle) + rand_handle = rand_create (); + g_assert (rand_handle); + + thread = mono_thread_internal_current (); + g_assert (thread); + + COUNTER_ATOMIC (worker, counter, { + counter._.working --; + counter._.parked ++; + }); + + worker->parked_threads_count += 1; + + mono_thread_info_install_interrupt (worker_wait_interrupt, mono_refcount_inc (worker), &interrupted); + if (interrupted) { + mono_refcount_dec (worker); + goto done; + } + + if (mono_coop_cond_timedwait (&worker->parked_threads_cond, &worker->threads_lock, rand_next (&rand_handle, 5 * 1000, 60 * 1000)) != 0) + timeout = TRUE; + + mono_thread_info_uninstall_interrupt (&interrupted); + if (!interrupted) + mono_refcount_dec (worker); + +done: + worker->parked_threads_count -= 1; + + COUNTER_ATOMIC (worker, counter, { + counter._.working ++; + counter._.parked --; + }); + } + + mono_coop_mutex_unlock (&worker->threads_lock); + + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker unparking, timeout? %s", mono_native_thread_id_get (), timeout ? "yes" : "no"); + + return timeout; +} + +static gboolean +worker_try_unpark (MonoThreadPoolWorker *worker) +{ + gboolean res = FALSE; + + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try unpark worker", mono_native_thread_id_get ()); + + mono_coop_mutex_lock (&worker->threads_lock); + if (worker->parked_threads_count > 0) { + mono_coop_cond_signal (&worker->parked_threads_cond); + res = TRUE; + } + mono_coop_mutex_unlock (&worker->threads_lock); + + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try unpark worker, success? %s", mono_native_thread_id_get (), res ? "yes" : "no"); + + return res; +} + +static void +worker_thread (gpointer data) +{ + MonoThreadPoolWorker *worker; + MonoError error; + MonoInternalThread *thread; + ThreadPoolWorkerCounter counter; + + mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] worker starting", mono_native_thread_id_get ()); + + worker = (MonoThreadPoolWorker*) data; + g_assert (worker); + + COUNTER_ATOMIC (worker, counter, { + counter._.starting --; + counter._.working ++; + }); + + thread = mono_thread_internal_current (); + g_assert (thread); + + mono_coop_mutex_lock (&worker->threads_lock); + g_ptr_array_add (worker->threads, thread); + mono_coop_mutex_unlock (&worker->threads_lock); + + mono_thread_set_name_internal (thread, mono_string_new (mono_get_root_domain (), "Threadpool worker"), FALSE, &error); + mono_error_assert_ok (&error); + + while (!mono_runtime_is_shutting_down ()) { + ThreadPoolWorkItem work_item; + + if (mono_thread_interruption_checkpoint ()) + continue; + + if (!work_item_try_pop (worker, &work_item)) { + gboolean timeout; + + timeout = worker_park (worker); + if (timeout) + break; + + continue; + } + + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker executing %p (%p)", + mono_native_thread_id_get (), work_item.callback, work_item.data); + + work_item.callback (work_item.data); + } + + mono_coop_mutex_lock (&worker->threads_lock); + + COUNTER_ATOMIC (worker, counter, { + counter._.working --; + }); + + g_ptr_array_remove (worker->threads, thread); + + mono_coop_cond_signal (&worker->threads_exit_cond); + + mono_coop_mutex_unlock (&worker->threads_lock); + + mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] worker finishing", mono_native_thread_id_get ()); + + mono_refcount_dec (worker); +} + +static gboolean +worker_try_create (MonoThreadPoolWorker *worker) +{ + MonoError error; + MonoInternalThread *thread; + gint64 current_ticks; + gint32 now; + ThreadPoolWorkerCounter counter; + + if (mono_runtime_is_shutting_down ()) + return FALSE; + + mono_coop_mutex_lock (&worker->worker_creation_lock); + + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker", mono_native_thread_id_get ()); + + current_ticks = mono_100ns_ticks (); + if (0 == current_ticks) { + g_warning ("failed to get 100ns ticks"); + } else { + now = current_ticks / (10 * 1000 * 1000); + if (worker->worker_creation_current_second != now) { + worker->worker_creation_current_second = now; + worker->worker_creation_current_count = 0; + } else { + g_assert (worker->worker_creation_current_count <= WORKER_CREATION_MAX_PER_SEC); + if (worker->worker_creation_current_count == WORKER_CREATION_MAX_PER_SEC) { + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, failed: maximum number of worker created per second reached, current count = %d", + mono_native_thread_id_get (), worker->worker_creation_current_count); + mono_coop_mutex_unlock (&worker->worker_creation_lock); + return FALSE; + } + } + } + + COUNTER_ATOMIC (worker, counter, { + if (counter._.working >= counter._.max_working) { + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, failed: maximum number of working threads reached", + mono_native_thread_id_get ()); + mono_coop_mutex_unlock (&worker->worker_creation_lock); + return FALSE; + } + counter._.starting ++; + }); + + thread = mono_thread_create_internal (mono_get_root_domain (), worker_thread, mono_refcount_inc (worker), TRUE, 0, &error); + if (!thread) { + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, failed: could not create thread due to %s", mono_native_thread_id_get (), mono_error_get_message (&error)); + mono_error_cleanup (&error); + + COUNTER_ATOMIC (worker, counter, { + counter._.starting --; + }); + + mono_coop_mutex_unlock (&worker->worker_creation_lock); + + mono_refcount_dec (worker); + + return FALSE; + } + + worker->worker_creation_current_count += 1; + + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, created %p, now = %d count = %d", + mono_native_thread_id_get (), (gpointer) thread->tid, now, worker->worker_creation_current_count); + + mono_coop_mutex_unlock (&worker->worker_creation_lock); + return TRUE; +} + +static void monitor_ensure_running (MonoThreadPoolWorker *worker); + +static void +worker_request (MonoThreadPoolWorker *worker) +{ + g_assert (worker); + + if (worker->suspended) + return; + + monitor_ensure_running (worker); + + if (worker_try_unpark (worker)) { + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, unparked", mono_native_thread_id_get ()); + return; + } + + if (worker_try_create (worker)) { + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, created", mono_native_thread_id_get ()); + return; + } + + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, failed", mono_native_thread_id_get ()); +} + +static gboolean +monitor_should_keep_running (MonoThreadPoolWorker *worker) +{ + static gint64 last_should_keep_running = -1; + + g_assert (worker->monitor_status == MONITOR_STATUS_WAITING_FOR_REQUEST || worker->monitor_status == MONITOR_STATUS_REQUESTED); + + if (InterlockedExchange (&worker->monitor_status, MONITOR_STATUS_WAITING_FOR_REQUEST) == MONITOR_STATUS_WAITING_FOR_REQUEST) { + gboolean should_keep_running = TRUE, force_should_keep_running = FALSE; + + if (mono_runtime_is_shutting_down ()) { + should_keep_running = FALSE; + } else { + if (work_item_count (worker) == 0) + should_keep_running = FALSE; + + if (!should_keep_running) { + if (last_should_keep_running == -1 || mono_100ns_ticks () - last_should_keep_running < MONITOR_MINIMAL_LIFETIME * 1000 * 10) { + should_keep_running = force_should_keep_running = TRUE; + } + } + } + + if (should_keep_running) { + if (last_should_keep_running == -1 || !force_should_keep_running) + last_should_keep_running = mono_100ns_ticks (); + } else { + last_should_keep_running = -1; + if (InterlockedCompareExchange (&worker->monitor_status, MONITOR_STATUS_NOT_RUNNING, MONITOR_STATUS_WAITING_FOR_REQUEST) == MONITOR_STATUS_WAITING_FOR_REQUEST) + return FALSE; + } + } + + g_assert (worker->monitor_status == MONITOR_STATUS_WAITING_FOR_REQUEST || worker->monitor_status == MONITOR_STATUS_REQUESTED); + + return TRUE; +} + +static gboolean +monitor_sufficient_delay_since_last_dequeue (MonoThreadPoolWorker *worker) +{ + gint64 threshold; + + g_assert (worker); + + if (worker->cpu_usage < CPU_USAGE_LOW) { + threshold = MONITOR_INTERVAL; + } else { + ThreadPoolWorkerCounter counter; + counter = COUNTER_READ (worker); + threshold = counter._.max_working * MONITOR_INTERVAL * 2; + } + + return mono_msec_ticks () >= worker->heuristic_last_dequeue + threshold; +} + +static void hill_climbing_force_change (MonoThreadPoolWorker *worker, gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition); + +static void +monitor_thread (gpointer data) +{ + MonoThreadPoolWorker *worker; + MonoInternalThread *internal; + guint i; + + worker = (MonoThreadPoolWorker*) data; + g_assert (worker); + + internal = mono_thread_internal_current (); + g_assert (internal); + + mono_cpu_usage (worker->cpu_usage_state); + + // printf ("monitor_thread: start\n"); + + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, started", mono_native_thread_id_get ()); + + do { + ThreadPoolWorkerCounter counter; + gboolean limit_worker_max_reached; + gint32 interval_left = MONITOR_INTERVAL; + gint32 awake = 0; /* number of spurious awakes we tolerate before doing a round of rebalancing */ + + g_assert (worker->monitor_status != MONITOR_STATUS_NOT_RUNNING); + + // counter = COUNTER_READ (worker); + // printf ("monitor_thread: starting = %d working = %d parked = %d max_working = %d\n", + // counter._.starting, counter._.working, counter._.parked, counter._.max_working); + + do { + gint64 ts; + gboolean alerted = FALSE; + + if (mono_runtime_is_shutting_down ()) + break; + + ts = mono_msec_ticks (); + if (mono_thread_info_sleep (interval_left, &alerted) == 0) + break; + interval_left -= mono_msec_ticks () - ts; + + g_assert (!(internal->state & ThreadState_StopRequested)); + mono_thread_interruption_checkpoint (); + } while (interval_left > 0 && ++awake < 10); + + if (mono_runtime_is_shutting_down ()) + continue; + + if (worker->suspended) + continue; + + if (work_item_count (worker) == 0) + continue; + + worker->cpu_usage = mono_cpu_usage (worker->cpu_usage_state); + + if (!monitor_sufficient_delay_since_last_dequeue (worker)) + continue; + + limit_worker_max_reached = FALSE; + + COUNTER_ATOMIC (worker, counter, { + if (counter._.max_working >= worker->limit_worker_max) { + limit_worker_max_reached = TRUE; + break; + } + counter._.max_working ++; + }); + + if (limit_worker_max_reached) + continue; + + hill_climbing_force_change (worker, counter._.max_working, TRANSITION_STARVATION); + + for (i = 0; i < 5; ++i) { + if (mono_runtime_is_shutting_down ()) + break; + + if (worker_try_unpark (worker)) { + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, unparked", mono_native_thread_id_get ()); + break; + } + + if (worker_try_create (worker)) { + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, created", mono_native_thread_id_get ()); + break; + } + } + } while (monitor_should_keep_running (worker)); + + // printf ("monitor_thread: stop\n"); + + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, finished", mono_native_thread_id_get ()); +} + +static void +monitor_ensure_running (MonoThreadPoolWorker *worker) +{ + MonoError error; + for (;;) { + switch (worker->monitor_status) { + case MONITOR_STATUS_REQUESTED: + // printf ("monitor_thread: requested\n"); + return; + case MONITOR_STATUS_WAITING_FOR_REQUEST: + // printf ("monitor_thread: waiting for request\n"); + InterlockedCompareExchange (&worker->monitor_status, MONITOR_STATUS_REQUESTED, MONITOR_STATUS_WAITING_FOR_REQUEST); + break; + case MONITOR_STATUS_NOT_RUNNING: + // printf ("monitor_thread: not running\n"); + if (mono_runtime_is_shutting_down ()) + return; + if (InterlockedCompareExchange (&worker->monitor_status, MONITOR_STATUS_REQUESTED, MONITOR_STATUS_NOT_RUNNING) == MONITOR_STATUS_NOT_RUNNING) { + // printf ("monitor_thread: creating\n"); + if (!mono_thread_create_internal (mono_get_root_domain (), monitor_thread, worker, TRUE, SMALL_STACK, &error)) { + // printf ("monitor_thread: creating failed\n"); + worker->monitor_status = MONITOR_STATUS_NOT_RUNNING; + mono_error_cleanup (&error); + } + return; + } + break; + default: g_assert_not_reached (); + } + } +} + +static void +hill_climbing_change_thread_count (MonoThreadPoolWorker *worker, gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition) +{ + ThreadPoolHillClimbing *hc; + + g_assert (worker); + + hc = &worker->heuristic_hill_climbing; + + mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] hill climbing, change max number of threads %d", mono_native_thread_id_get (), new_thread_count); + + hc->last_thread_count = new_thread_count; + hc->current_sample_interval = rand_next (&hc->random_interval_generator, hc->sample_interval_low, hc->sample_interval_high); + hc->elapsed_since_last_change = 0; + hc->completions_since_last_change = 0; +} + +static void +hill_climbing_force_change (MonoThreadPoolWorker *worker, gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition) +{ + ThreadPoolHillClimbing *hc; + + g_assert (worker); + + hc = &worker->heuristic_hill_climbing; + + if (new_thread_count != hc->last_thread_count) { + hc->current_control_setting += new_thread_count - hc->last_thread_count; + hill_climbing_change_thread_count (worker, new_thread_count, transition); + } +} + +static double_complex +hill_climbing_get_wave_component (MonoThreadPoolWorker *worker, gdouble *samples, guint sample_count, gdouble period) +{ + ThreadPoolHillClimbing *hc; + gdouble w, cosine, sine, coeff, q0, q1, q2; + guint i; + + g_assert (worker); + g_assert (sample_count >= period); + g_assert (period >= 2); + + hc = &worker->heuristic_hill_climbing; + + w = 2.0 * M_PI / period; + cosine = cos (w); + sine = sin (w); + coeff = 2.0 * cosine; + q0 = q1 = q2 = 0; + + for (i = 0; i < sample_count; ++i) { + q0 = coeff * q1 - q2 + samples [(hc->total_samples - sample_count + i) % hc->samples_to_measure]; + q2 = q1; + q1 = q0; + } + + return mono_double_complex_scalar_div (mono_double_complex_make (q1 - q2 * cosine, (q2 * sine)), ((gdouble)sample_count)); +} + +static gint16 +hill_climbing_update (MonoThreadPoolWorker *worker, gint16 current_thread_count, guint32 sample_duration, gint32 completions, gint64 *adjustment_interval) +{ + ThreadPoolHillClimbing *hc; + ThreadPoolHeuristicStateTransition transition; + gdouble throughput; + gdouble throughput_error_estimate; + gdouble confidence; + gdouble move; + gdouble gain; + gint sample_index; + gint sample_count; + gint new_thread_wave_magnitude; + gint new_thread_count; + double_complex thread_wave_component; + double_complex throughput_wave_component; + double_complex ratio; + + g_assert (worker); + g_assert (adjustment_interval); + + hc = &worker->heuristic_hill_climbing; + + /* If someone changed the thread count without telling us, update our records accordingly. */ + if (current_thread_count != hc->last_thread_count) + hill_climbing_force_change (worker, current_thread_count, TRANSITION_INITIALIZING); + + /* Update the cumulative stats for this thread count */ + hc->elapsed_since_last_change += sample_duration; + hc->completions_since_last_change += completions; + + /* Add in any data we've already collected about this sample */ + sample_duration += hc->accumulated_sample_duration; + completions += hc->accumulated_completion_count; + + /* We need to make sure we're collecting reasonably accurate data. Since we're just counting the end + * of each work item, we are goinng to be missing some data about what really happened during the + * sample interval. The count produced by each thread includes an initial work item that may have + * started well before the start of the interval, and each thread may have been running some new + * work item for some time before the end of the interval, which did not yet get counted. So + * our count is going to be off by +/- threadCount workitems. + * + * The exception is that the thread that reported to us last time definitely wasn't running any work + * at that time, and the thread that's reporting now definitely isn't running a work item now. So + * we really only need to consider threadCount-1 threads. + * + * Thus the percent error in our count is +/- (threadCount-1)/numCompletions. + * + * We cannot rely on the frequency-domain analysis we'll be doing later to filter out this error, because + * of the way it accumulates over time. If this sample is off by, say, 33% in the negative direction, + * then the next one likely will be too. The one after that will include the sum of the completions + * we missed in the previous samples, and so will be 33% positive. So every three samples we'll have + * two "low" samples and one "high" sample. This will appear as periodic variation right in the frequency + * range we're targeting, which will not be filtered by the frequency-domain translation. */ + if (hc->total_samples > 0 && ((current_thread_count - 1.0) / completions) >= hc->max_sample_error) { + /* Not accurate enough yet. Let's accumulate the data so + * far, and tell the MonoThreadPoolWorker to collect a little more. */ + hc->accumulated_sample_duration = sample_duration; + hc->accumulated_completion_count = completions; + *adjustment_interval = 10; + return current_thread_count; + } + + /* We've got enouugh data for our sample; reset our accumulators for next time. */ + hc->accumulated_sample_duration = 0; + hc->accumulated_completion_count = 0; + + /* Add the current thread count and throughput sample to our history. */ + throughput = ((gdouble) completions) / sample_duration; + + sample_index = hc->total_samples % hc->samples_to_measure; + hc->samples [sample_index] = throughput; + hc->thread_counts [sample_index] = current_thread_count; + hc->total_samples ++; + + /* Set up defaults for our metrics. */ + thread_wave_component = mono_double_complex_make(0, 0); + throughput_wave_component = mono_double_complex_make(0, 0); + throughput_error_estimate = 0; + ratio = mono_double_complex_make(0, 0); + confidence = 0; + + transition = TRANSITION_WARMUP; + + /* How many samples will we use? It must be at least the three wave periods we're looking for, and it must also + * be a whole multiple of the primary wave's period; otherwise the frequency we're looking for will fall between + * two frequency bands in the Fourier analysis, and we won't be able to measure it accurately. */ + sample_count = ((gint) MIN (hc->total_samples - 1, hc->samples_to_measure) / hc->wave_period) * hc->wave_period; + + if (sample_count > hc->wave_period) { + guint i; + gdouble average_throughput; + gdouble average_thread_count; + gdouble sample_sum = 0; + gdouble thread_sum = 0; + + /* Average the throughput and thread count samples, so we can scale the wave magnitudes later. */ + for (i = 0; i < sample_count; ++i) { + guint j = (hc->total_samples - sample_count + i) % hc->samples_to_measure; + sample_sum += hc->samples [j]; + thread_sum += hc->thread_counts [j]; + } + + average_throughput = sample_sum / sample_count; + average_thread_count = thread_sum / sample_count; + + if (average_throughput > 0 && average_thread_count > 0) { + gdouble noise_for_confidence, adjacent_period_1, adjacent_period_2; + + /* Calculate the periods of the adjacent frequency bands we'll be using to + * measure noise levels. We want the two adjacent Fourier frequency bands. */ + adjacent_period_1 = sample_count / (((gdouble) sample_count) / ((gdouble) hc->wave_period) + 1); + adjacent_period_2 = sample_count / (((gdouble) sample_count) / ((gdouble) hc->wave_period) - 1); + + /* Get the the three different frequency components of the throughput (scaled by average + * throughput). Our "error" estimate (the amount of noise that might be present in the + * frequency band we're really interested in) is the average of the adjacent bands. */ + throughput_wave_component = mono_double_complex_scalar_div (hill_climbing_get_wave_component (worker, hc->samples, sample_count, hc->wave_period), average_throughput); + throughput_error_estimate = cabs (mono_double_complex_scalar_div (hill_climbing_get_wave_component (worker, hc->samples, sample_count, adjacent_period_1), average_throughput)); + + if (adjacent_period_2 <= sample_count) { + throughput_error_estimate = MAX (throughput_error_estimate, cabs (mono_double_complex_scalar_div (hill_climbing_get_wave_component ( + worker, hc->samples, sample_count, adjacent_period_2), average_throughput))); + } + + /* Do the same for the thread counts, so we have something to compare to. We don't + * measure thread count noise, because there is none; these are exact measurements. */ + thread_wave_component = mono_double_complex_scalar_div (hill_climbing_get_wave_component (worker, hc->thread_counts, sample_count, hc->wave_period), average_thread_count); + + /* Update our moving average of the throughput noise. We'll use this + * later as feedback to determine the new size of the thread wave. */ + if (hc->average_throughput_noise == 0) { + hc->average_throughput_noise = throughput_error_estimate; + } else { + hc->average_throughput_noise = (hc->throughput_error_smoothing_factor * throughput_error_estimate) + + ((1.0 + hc->throughput_error_smoothing_factor) * hc->average_throughput_noise); + } + + if (cabs (thread_wave_component) > 0) { + /* Adjust the throughput wave so it's centered around the target wave, + * and then calculate the adjusted throughput/thread ratio. */ + ratio = mono_double_complex_div (mono_double_complex_sub (throughput_wave_component, mono_double_complex_scalar_mul(thread_wave_component, hc->target_throughput_ratio)), thread_wave_component); + transition = TRANSITION_CLIMBING_MOVE; + } else { + ratio = mono_double_complex_make (0, 0); + transition = TRANSITION_STABILIZING; + } + + noise_for_confidence = MAX (hc->average_throughput_noise, throughput_error_estimate); + if (noise_for_confidence > 0) { + confidence = cabs (thread_wave_component) / noise_for_confidence / hc->target_signal_to_noise_ratio; + } else { + /* there is no noise! */ + confidence = 1.0; + } + } + } + + /* We use just the real part of the complex ratio we just calculated. If the throughput signal + * is exactly in phase with the thread signal, this will be the same as taking the magnitude of + * the complex move and moving that far up. If they're 180 degrees out of phase, we'll move + * backward (because this indicates that our changes are having the opposite of the intended effect). + * If they're 90 degrees out of phase, we won't move at all, because we can't tell wether we're + * having a negative or positive effect on throughput. */ + move = creal (ratio); + move = CLAMP (move, -1.0, 1.0); + + /* Apply our confidence multiplier. */ + move *= CLAMP (confidence, -1.0, 1.0); + + /* Now apply non-linear gain, such that values around zero are attenuated, while higher values + * are enhanced. This allows us to move quickly if we're far away from the target, but more slowly + * if we're getting close, giving us rapid ramp-up without wild oscillations around the target. */ + gain = hc->max_change_per_second * sample_duration; + move = pow (fabs (move), hc->gain_exponent) * (move >= 0.0 ? 1 : -1) * gain; + move = MIN (move, hc->max_change_per_sample); + + /* If the result was positive, and CPU is > 95%, refuse the move. */ + if (move > 0.0 && worker->cpu_usage > CPU_USAGE_HIGH) + move = 0.0; + + /* Apply the move to our control setting. */ + hc->current_control_setting += move; + + /* Calculate the new thread wave magnitude, which is based on the moving average we've been keeping of the + * throughput error. This average starts at zero, so we'll start with a nice safe little wave at first. */ + new_thread_wave_magnitude = (gint)(0.5 + (hc->current_control_setting * hc->average_throughput_noise + * hc->target_signal_to_noise_ratio * hc->thread_magnitude_multiplier * 2.0)); + new_thread_wave_magnitude = CLAMP (new_thread_wave_magnitude, 1, hc->max_thread_wave_magnitude); + + /* Make sure our control setting is within the MonoThreadPoolWorker's limits. */ + hc->current_control_setting = CLAMP (hc->current_control_setting, worker->limit_worker_min, worker->limit_worker_max - new_thread_wave_magnitude); + + /* Calculate the new thread count (control setting + square wave). */ + new_thread_count = (gint)(hc->current_control_setting + new_thread_wave_magnitude * ((hc->total_samples / (hc->wave_period / 2)) % 2)); + + /* Make sure the new thread count doesn't exceed the MonoThreadPoolWorker's limits. */ + new_thread_count = CLAMP (new_thread_count, worker->limit_worker_min, worker->limit_worker_max); + + if (new_thread_count != current_thread_count) + hill_climbing_change_thread_count (worker, new_thread_count, transition); + + if (creal (ratio) < 0.0 && new_thread_count == worker->limit_worker_min) + *adjustment_interval = (gint)(0.5 + hc->current_sample_interval * (10.0 * MAX (-1.0 * creal (ratio), 1.0))); + else + *adjustment_interval = hc->current_sample_interval; + + return new_thread_count; +} + +static gboolean +heuristic_should_adjust (MonoThreadPoolWorker *worker) +{ + if (worker->heuristic_last_dequeue > worker->heuristic_last_adjustment + worker->heuristic_adjustment_interval) { + ThreadPoolWorkerCounter counter; + counter = COUNTER_READ (worker); + if (counter._.working <= counter._.max_working) + return TRUE; + } + + return FALSE; +} + +static void +heuristic_adjust (MonoThreadPoolWorker *worker) +{ + if (mono_coop_mutex_trylock (&worker->heuristic_lock) == 0) { + gint32 completions = InterlockedExchange (&worker->heuristic_completions, 0); + gint64 sample_end = mono_msec_ticks (); + gint64 sample_duration = sample_end - worker->heuristic_sample_start; + + if (sample_duration >= worker->heuristic_adjustment_interval / 2) { + ThreadPoolWorkerCounter counter; + gint16 new_thread_count; + + counter = COUNTER_READ (worker); + new_thread_count = hill_climbing_update (worker, counter._.max_working, sample_duration, completions, &worker->heuristic_adjustment_interval); + + COUNTER_ATOMIC (worker, counter, { + counter._.max_working = new_thread_count; + }); + + if (new_thread_count > counter._.max_working) + worker_request (worker); + + worker->heuristic_sample_start = sample_end; + worker->heuristic_last_adjustment = mono_msec_ticks (); + } + + mono_coop_mutex_unlock (&worker->heuristic_lock); + } +} + +static void +heuristic_notify_work_completed (MonoThreadPoolWorker *worker) +{ + g_assert (worker); + + InterlockedIncrement (&worker->heuristic_completions); + worker->heuristic_last_dequeue = mono_msec_ticks (); + + if (heuristic_should_adjust (worker)) + heuristic_adjust (worker); +} + +gboolean +mono_threadpool_worker_notify_completed (MonoThreadPoolWorker *worker) +{ + ThreadPoolWorkerCounter counter; + + heuristic_notify_work_completed (worker); + + counter = COUNTER_READ (worker); + return counter._.working <= counter._.max_working; +} + +gint32 +mono_threadpool_worker_get_min (MonoThreadPoolWorker *worker) +{ + return worker->limit_worker_min; +} + +gboolean +mono_threadpool_worker_set_min (MonoThreadPoolWorker *worker, gint32 value) +{ + if (value <= 0 || value > worker->limit_worker_max) + return FALSE; + + worker->limit_worker_min = value; + return TRUE; +} + +gint32 +mono_threadpool_worker_get_max (MonoThreadPoolWorker *worker) +{ + return worker->limit_worker_max; +} + +gboolean +mono_threadpool_worker_set_max (MonoThreadPoolWorker *worker, gint32 value) +{ + gint32 cpu_count = mono_cpu_count (); + + if (value < worker->limit_worker_min || value < cpu_count) + return FALSE; + + worker->limit_worker_max = value; + return TRUE; +} + +void +mono_threadpool_worker_set_suspended (MonoThreadPoolWorker *worker, gboolean suspended) +{ + worker->suspended = suspended; + if (!suspended) + worker_request (worker); +} diff --git a/mono/metadata/threadpool-worker.h b/mono/metadata/threadpool-worker.h new file mode 100644 index 00000000000..b63df60c915 --- /dev/null +++ b/mono/metadata/threadpool-worker.h @@ -0,0 +1,34 @@ + +#ifndef _MONO_METADATA_THREADPOOL_WORKER_H +#define _MONO_METADATA_THREADPOOL_WORKER_H + +typedef struct MonoThreadPoolWorker MonoThreadPoolWorker; + +typedef void (*MonoThreadPoolWorkerCallback)(gpointer); + +void +mono_threadpool_worker_init (MonoThreadPoolWorker **worker); + +void +mono_threadpool_worker_cleanup (MonoThreadPoolWorker *worker); + +void +mono_threadpool_worker_enqueue (MonoThreadPoolWorker *worker, MonoThreadPoolWorkerCallback callback, gpointer data); + +gboolean +mono_threadpool_worker_notify_completed (MonoThreadPoolWorker *worker); + +gint32 +mono_threadpool_worker_get_min (MonoThreadPoolWorker *worker); +gboolean +mono_threadpool_worker_set_min (MonoThreadPoolWorker *worker, gint32 value); + +gint32 +mono_threadpool_worker_get_max (MonoThreadPoolWorker *worker); +gboolean +mono_threadpool_worker_set_max (MonoThreadPoolWorker *worker, gint32 value); + +void +mono_threadpool_worker_set_suspended (MonoThreadPoolWorker *worker, gboolean suspended); + +#endif /* _MONO_METADATA_THREADPOOL_WORKER_H */ diff --git a/mono/metadata/threadpool.c b/mono/metadata/threadpool.c new file mode 100644 index 00000000000..ab88e67f4b9 --- /dev/null +++ b/mono/metadata/threadpool.c @@ -0,0 +1,835 @@ +/* + * threadpool.c: Microsoft threadpool runtime support + * + * Author: + * Ludovic Henry (ludovic.henry@xamarin.com) + * + * Copyright 2015 Xamarin, Inc (http://www.xamarin.com) + * Licensed under the MIT license. See LICENSE file in the project root for full license information. + */ + +// +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. +// +// Files: +// - src/vm/comthreadpool.cpp +// - src/vm/win32threadpoolcpp +// - src/vm/threadpoolrequest.cpp +// - src/vm/hillclimbing.cpp +// +// Ported from C++ to C and adjusted to Mono runtime + +#include +#define _USE_MATH_DEFINES // needed by MSVC to define math constants +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +typedef struct { + MonoDomain *domain; + /* Number of outstanding jobs */ + gint32 outstanding_request; + /* Number of currently executing jobs */ + gint32 threadpool_jobs; + /* Signalled when threadpool_jobs + outstanding_request is 0 */ + /* Protected by threadpool->domains_lock */ + MonoCoopCond cleanup_cond; +} ThreadPoolDomain; + +typedef union { + struct { + gint16 starting; /* starting, but not yet in worker_callback */ + gint16 working; /* executing worker_callback */ + } _; + gint32 as_gint32; +} ThreadPoolCounter; + +typedef struct { + MonoRefCount ref; + + GPtrArray *domains; // ThreadPoolDomain* [] + MonoCoopMutex domains_lock; + + GPtrArray *threads; // MonoInternalThread* [] + MonoCoopMutex threads_lock; + MonoCoopCond threads_exit_cond; + + ThreadPoolCounter counters; + + gint32 limit_io_min; + gint32 limit_io_max; + + MonoThreadPoolWorker *worker; +} ThreadPool; + +static mono_lazy_init_t status = MONO_LAZY_INIT_STATUS_NOT_INITIALIZED; + +static ThreadPool* threadpool; + +#define COUNTER_CHECK(counter) \ + do { \ + g_assert (sizeof (ThreadPoolCounter) == sizeof (gint32)); \ + g_assert (counter._.starting >= 0); \ + g_assert (counter._.working >= 0); \ + } while (0) + +#define COUNTER_ATOMIC(threadpool,var,block) \ + do { \ + ThreadPoolCounter __old; \ + do { \ + g_assert (threadpool); \ + __old = COUNTER_READ (threadpool); \ + (var) = __old; \ + { block; } \ + COUNTER_CHECK (var); \ + } while (InterlockedCompareExchange (&threadpool->counters.as_gint32, (var).as_gint32, __old.as_gint32) != __old.as_gint32); \ + } while (0) + +static inline ThreadPoolCounter +COUNTER_READ (ThreadPool *threadpool) +{ + ThreadPoolCounter counter; + counter.as_gint32 = InterlockedRead (&threadpool->counters.as_gint32); + return counter; +} + +static inline void +domains_lock (void) +{ + mono_coop_mutex_lock (&threadpool->domains_lock); +} + +static inline void +domains_unlock (void) +{ + mono_coop_mutex_unlock (&threadpool->domains_lock); +} + +static void +destroy (gpointer unused) +{ + g_ptr_array_free (threadpool->domains, TRUE); + mono_coop_mutex_destroy (&threadpool->domains_lock); + + g_ptr_array_free (threadpool->threads, TRUE); + mono_coop_mutex_destroy (&threadpool->threads_lock); + mono_coop_cond_destroy (&threadpool->threads_exit_cond); + + g_free (threadpool); +} + +static void +initialize (void) +{ + g_assert (!threadpool); + threadpool = g_new0 (ThreadPool, 1); + g_assert (threadpool); + + mono_refcount_init (threadpool, destroy); + + threadpool->domains = g_ptr_array_new (); + mono_coop_mutex_init (&threadpool->domains_lock); + + threadpool->threads = g_ptr_array_new (); + mono_coop_mutex_init (&threadpool->threads_lock); + mono_coop_cond_init (&threadpool->threads_exit_cond); + + threadpool->limit_io_min = mono_cpu_count (); + threadpool->limit_io_max = CLAMP (threadpool->limit_io_min * 100, MIN (threadpool->limit_io_min, 200), MAX (threadpool->limit_io_min, 200)); + + mono_threadpool_worker_init (&threadpool->worker); +} + +static void +cleanup (void) +{ + guint i; + MonoInternalThread *current; + + /* we make the assumption along the code that we are + * cleaning up only if the runtime is shutting down */ + g_assert (mono_runtime_is_shutting_down ()); + + current = mono_thread_internal_current (); + + mono_coop_mutex_lock (&threadpool->threads_lock); + + /* stop all threadpool->threads */ + for (i = 0; i < threadpool->threads->len; ++i) { + MonoInternalThread *thread = (MonoInternalThread*) g_ptr_array_index (threadpool->threads, i); + if (thread != current) + mono_thread_internal_abort (thread); + } + + mono_coop_mutex_unlock (&threadpool->threads_lock); + + /* give a chance to the other threads to exit */ + mono_thread_info_yield (); + + mono_coop_mutex_lock (&threadpool->threads_lock); + + for (;;) { + ThreadPoolCounter counter; + + counter = COUNTER_READ (threadpool); + if (counter._.working == 0) + break; + + if (counter._.working == 1) { + if (threadpool->threads->len == 1 && g_ptr_array_index (threadpool->threads, 0) == current) { + /* We are waiting on ourselves */ + break; + } + } + + mono_coop_cond_wait (&threadpool->threads_exit_cond, &threadpool->threads_lock); + } + + mono_coop_mutex_unlock (&threadpool->threads_lock); + + mono_threadpool_worker_cleanup (threadpool->worker); + + mono_refcount_dec (threadpool); +} + +gboolean +mono_threadpool_enqueue_work_item (MonoDomain *domain, MonoObject *work_item, MonoError *error) +{ + static MonoClass *threadpool_class = NULL; + static MonoMethod *unsafe_queue_custom_work_item_method = NULL; + MonoDomain *current_domain; + MonoBoolean f; + gpointer args [2]; + + mono_error_init (error); + g_assert (work_item); + + if (!threadpool_class) + threadpool_class = mono_class_load_from_name (mono_defaults.corlib, "System.Threading", "ThreadPool"); + + if (!unsafe_queue_custom_work_item_method) + unsafe_queue_custom_work_item_method = mono_class_get_method_from_name (threadpool_class, "UnsafeQueueCustomWorkItem", 2); + g_assert (unsafe_queue_custom_work_item_method); + + f = FALSE; + + args [0] = (gpointer) work_item; + args [1] = (gpointer) &f; + + current_domain = mono_domain_get (); + if (current_domain == domain) { + mono_runtime_invoke_checked (unsafe_queue_custom_work_item_method, NULL, args, error); + return_val_if_nok (error, FALSE); + } else { + mono_thread_push_appdomain_ref (domain); + if (mono_domain_set (domain, FALSE)) { + mono_runtime_invoke_checked (unsafe_queue_custom_work_item_method, NULL, args, error); + if (!is_ok (error)) { + mono_thread_pop_appdomain_ref (); + return FALSE; + } + mono_domain_set (current_domain, TRUE); + } + mono_thread_pop_appdomain_ref (); + } + return TRUE; +} + +/* LOCKING: domains_lock must be held */ +static void +tpdomain_add (ThreadPoolDomain *tpdomain) +{ + guint i, len; + + g_assert (tpdomain); + + len = threadpool->domains->len; + for (i = 0; i < len; ++i) { + if (g_ptr_array_index (threadpool->domains, i) == tpdomain) + break; + } + + if (i == len) + g_ptr_array_add (threadpool->domains, tpdomain); +} + +/* LOCKING: domains_lock must be held. */ +static gboolean +tpdomain_remove (ThreadPoolDomain *tpdomain) +{ + g_assert (tpdomain); + return g_ptr_array_remove (threadpool->domains, tpdomain); +} + +/* LOCKING: domains_lock must be held */ +static ThreadPoolDomain * +tpdomain_get (MonoDomain *domain, gboolean create) +{ + guint i; + ThreadPoolDomain *tpdomain; + + g_assert (domain); + + for (i = 0; i < threadpool->domains->len; ++i) { + ThreadPoolDomain *tpdomain; + + tpdomain = (ThreadPoolDomain *)g_ptr_array_index (threadpool->domains, i); + if (tpdomain->domain == domain) + return tpdomain; + } + + if (!create) + return NULL; + + tpdomain = g_new0 (ThreadPoolDomain, 1); + tpdomain->domain = domain; + mono_coop_cond_init (&tpdomain->cleanup_cond); + + tpdomain_add (tpdomain); + + return tpdomain; +} + +static void +tpdomain_free (ThreadPoolDomain *tpdomain) +{ + g_free (tpdomain); +} + +/* LOCKING: domains_lock must be held */ +static ThreadPoolDomain * +tpdomain_get_next (ThreadPoolDomain *current) +{ + ThreadPoolDomain *tpdomain = NULL; + guint len; + + len = threadpool->domains->len; + if (len > 0) { + guint i, current_idx = -1; + if (current) { + for (i = 0; i < len; ++i) { + if (current == g_ptr_array_index (threadpool->domains, i)) { + current_idx = i; + break; + } + } + g_assert (current_idx != (guint)-1); + } + for (i = current_idx + 1; i < len + current_idx + 1; ++i) { + ThreadPoolDomain *tmp = (ThreadPoolDomain *)g_ptr_array_index (threadpool->domains, i % len); + if (tmp->outstanding_request > 0) { + tpdomain = tmp; + break; + } + } + } + + return tpdomain; +} + +static void +worker_callback (gpointer unused) +{ + MonoError error; + ThreadPoolDomain *tpdomain, *previous_tpdomain; + ThreadPoolCounter counter; + MonoInternalThread *thread; + + thread = mono_thread_internal_current (); + + COUNTER_ATOMIC (threadpool, counter, { + counter._.starting --; + counter._.working ++; + }); + + if (mono_runtime_is_shutting_down ()) { + COUNTER_ATOMIC (threadpool, counter, { + counter._.working --; + }); + + mono_refcount_dec (threadpool); + return; + } + + mono_coop_mutex_lock (&threadpool->threads_lock); + g_ptr_array_add (threadpool->threads, thread); + mono_coop_mutex_unlock (&threadpool->threads_lock); + + /* + * This is needed so there is always an lmf frame in the runtime invoke call below, + * so ThreadAbortExceptions are caught even if the thread is in native code. + */ + mono_defaults.threadpool_perform_wait_callback_method->save_lmf = TRUE; + + domains_lock (); + + previous_tpdomain = NULL; + + while (!mono_runtime_is_shutting_down ()) { + gboolean retire = FALSE; + + if ((thread->state & (ThreadState_AbortRequested | ThreadState_SuspendRequested)) != 0) { + domains_unlock (); + mono_thread_interruption_checkpoint (); + domains_lock (); + } + + tpdomain = tpdomain_get_next (previous_tpdomain); + if (!tpdomain) + break; + + tpdomain->outstanding_request --; + g_assert (tpdomain->outstanding_request >= 0); + + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker running in domain %p (outstanding requests %d)", + mono_native_thread_id_get (), tpdomain->domain, tpdomain->outstanding_request); + + g_assert (tpdomain->threadpool_jobs >= 0); + tpdomain->threadpool_jobs ++; + + domains_unlock (); + + mono_thread_clr_state (thread, (MonoThreadState)~ThreadState_Background); + if (!mono_thread_test_state (thread , ThreadState_Background)) + ves_icall_System_Threading_Thread_SetState (thread, ThreadState_Background); + + mono_thread_push_appdomain_ref (tpdomain->domain); + if (mono_domain_set (tpdomain->domain, FALSE)) { + MonoObject *exc = NULL, *res; + + res = mono_runtime_try_invoke (mono_defaults.threadpool_perform_wait_callback_method, NULL, NULL, &exc, &error); + if (exc || !mono_error_ok(&error)) { + if (exc == NULL) + exc = (MonoObject *) mono_error_convert_to_exception (&error); + else + mono_error_cleanup (&error); + mono_thread_internal_unhandled_exception (exc); + } else if (res && *(MonoBoolean*) mono_object_unbox (res) == FALSE) { + retire = TRUE; + } + + mono_domain_set (mono_get_root_domain (), TRUE); + } + mono_thread_pop_appdomain_ref (); + + domains_lock (); + + tpdomain->threadpool_jobs --; + g_assert (tpdomain->threadpool_jobs >= 0); + + if (tpdomain->outstanding_request + tpdomain->threadpool_jobs == 0 && mono_domain_is_unloading (tpdomain->domain)) { + gboolean removed; + + removed = tpdomain_remove (tpdomain); + g_assert (removed); + + mono_coop_cond_signal (&tpdomain->cleanup_cond); + tpdomain = NULL; + } + + if (retire) + break; + + previous_tpdomain = tpdomain; + } + + domains_unlock (); + + mono_coop_mutex_lock (&threadpool->threads_lock); + + COUNTER_ATOMIC (threadpool, counter, { + counter._.working --; + }); + + g_ptr_array_remove_fast (threadpool->threads, thread); + + mono_coop_cond_signal (&threadpool->threads_exit_cond); + + mono_coop_mutex_unlock (&threadpool->threads_lock); + + mono_refcount_dec (threadpool); +} + +void +mono_threadpool_cleanup (void) +{ +#ifndef DISABLE_SOCKETS + mono_threadpool_io_cleanup (); +#endif + mono_lazy_cleanup (&status, cleanup); +} + +MonoAsyncResult * +mono_threadpool_begin_invoke (MonoDomain *domain, MonoObject *target, MonoMethod *method, gpointer *params, MonoError *error) +{ + static MonoClass *async_call_klass = NULL; + MonoMethodMessage *message; + MonoAsyncResult *async_result; + MonoAsyncCall *async_call; + MonoDelegate *async_callback = NULL; + MonoObject *state = NULL; + + if (!async_call_klass) + async_call_klass = mono_class_load_from_name (mono_defaults.corlib, "System", "MonoAsyncCall"); + + mono_lazy_initialize (&status, initialize); + + mono_error_init (error); + + message = mono_method_call_message_new (method, params, mono_get_delegate_invoke (method->klass), (params != NULL) ? (&async_callback) : NULL, (params != NULL) ? (&state) : NULL, error); + return_val_if_nok (error, NULL); + + async_call = (MonoAsyncCall*) mono_object_new_checked (domain, async_call_klass, error); + return_val_if_nok (error, NULL); + + MONO_OBJECT_SETREF (async_call, msg, message); + MONO_OBJECT_SETREF (async_call, state, state); + + if (async_callback) { + MONO_OBJECT_SETREF (async_call, cb_method, mono_get_delegate_invoke (((MonoObject*) async_callback)->vtable->klass)); + MONO_OBJECT_SETREF (async_call, cb_target, async_callback); + } + + async_result = mono_async_result_new (domain, NULL, async_call->state, NULL, (MonoObject*) async_call, error); + return_val_if_nok (error, NULL); + MONO_OBJECT_SETREF (async_result, async_delegate, target); + + mono_threadpool_enqueue_work_item (domain, (MonoObject*) async_result, error); + return_val_if_nok (error, NULL); + + return async_result; +} + +MonoObject * +mono_threadpool_end_invoke (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc, MonoError *error) +{ + MonoAsyncCall *ac; + + mono_error_init (error); + g_assert (exc); + g_assert (out_args); + + *exc = NULL; + *out_args = NULL; + + /* check if already finished */ + mono_monitor_enter ((MonoObject*) ares); + + if (ares->endinvoke_called) { + mono_error_set_invalid_operation(error, "Delegate EndInvoke method called more than once"); + mono_monitor_exit ((MonoObject*) ares); + return NULL; + } + + ares->endinvoke_called = 1; + + /* wait until we are really finished */ + if (ares->completed) { + mono_monitor_exit ((MonoObject *) ares); + } else { + gpointer wait_event; + if (ares->handle) { + wait_event = mono_wait_handle_get_handle ((MonoWaitHandle*) ares->handle); + } else { + wait_event = mono_w32event_create (TRUE, FALSE); + g_assert(wait_event); + MonoWaitHandle *wait_handle = mono_wait_handle_new (mono_object_domain (ares), wait_event, error); + if (!is_ok (error)) { + CloseHandle (wait_event); + return NULL; + } + MONO_OBJECT_SETREF (ares, handle, (MonoObject*) wait_handle); + } + mono_monitor_exit ((MonoObject*) ares); + MONO_ENTER_GC_SAFE; +#ifdef HOST_WIN32 + WaitForSingleObjectEx (wait_event, INFINITE, TRUE); +#else + mono_w32handle_wait_one (wait_event, MONO_INFINITE_WAIT, TRUE); +#endif + MONO_EXIT_GC_SAFE; + } + + ac = (MonoAsyncCall*) ares->object_data; + g_assert (ac); + + *exc = ac->msg->exc; /* FIXME: GC add write barrier */ + *out_args = ac->out_args; + return ac->res; +} + +gboolean +mono_threadpool_remove_domain_jobs (MonoDomain *domain, int timeout) +{ + gint64 end; + ThreadPoolDomain *tpdomain; + gboolean ret; + + g_assert (domain); + g_assert (timeout >= -1); + + g_assert (mono_domain_is_unloading (domain)); + + if (timeout != -1) + end = mono_msec_ticks () + timeout; + +#ifndef DISABLE_SOCKETS + mono_threadpool_io_remove_domain_jobs (domain); + if (timeout != -1) { + if (mono_msec_ticks () > end) + return FALSE; + } +#endif + + /* + * Wait for all threads which execute jobs in the domain to exit. + * The is_unloading () check in worker_request () ensures that + * no new jobs are added after we enter the lock below. + */ + mono_lazy_initialize (&status, initialize); + domains_lock (); + + tpdomain = tpdomain_get (domain, FALSE); + if (!tpdomain) { + domains_unlock (); + return TRUE; + } + + ret = TRUE; + + while (tpdomain->outstanding_request + tpdomain->threadpool_jobs > 0) { + if (timeout == -1) { + mono_coop_cond_wait (&tpdomain->cleanup_cond, &threadpool->domains_lock); + } else { + gint64 now; + gint res; + + now = mono_msec_ticks(); + if (now > end) { + ret = FALSE; + break; + } + + res = mono_coop_cond_timedwait (&tpdomain->cleanup_cond, &threadpool->domains_lock, end - now); + if (res != 0) { + ret = FALSE; + break; + } + } + } + + /* Remove from the list the worker threads look at */ + tpdomain_remove (tpdomain); + + domains_unlock (); + + mono_coop_cond_destroy (&tpdomain->cleanup_cond); + tpdomain_free (tpdomain); + + return ret; +} + +void +mono_threadpool_suspend (void) +{ + if (threadpool) + mono_threadpool_worker_set_suspended (threadpool->worker, TRUE); +} + +void +mono_threadpool_resume (void) +{ + if (threadpool) + mono_threadpool_worker_set_suspended (threadpool->worker, FALSE); +} + +void +ves_icall_System_Threading_ThreadPool_GetAvailableThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads) +{ + ThreadPoolCounter counter; + + if (!worker_threads || !completion_port_threads) + return; + + mono_lazy_initialize (&status, initialize); + + counter = COUNTER_READ (threadpool); + + *worker_threads = MAX (0, mono_threadpool_worker_get_max (threadpool->worker) - counter._.working); + *completion_port_threads = threadpool->limit_io_max; +} + +void +ves_icall_System_Threading_ThreadPool_GetMinThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads) +{ + if (!worker_threads || !completion_port_threads) + return; + + mono_lazy_initialize (&status, initialize); + + *worker_threads = mono_threadpool_worker_get_min (threadpool->worker); + *completion_port_threads = threadpool->limit_io_min; +} + +void +ves_icall_System_Threading_ThreadPool_GetMaxThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads) +{ + if (!worker_threads || !completion_port_threads) + return; + + mono_lazy_initialize (&status, initialize); + + *worker_threads = mono_threadpool_worker_get_max (threadpool->worker); + *completion_port_threads = threadpool->limit_io_max; +} + +MonoBoolean +ves_icall_System_Threading_ThreadPool_SetMinThreadsNative (gint32 worker_threads, gint32 completion_port_threads) +{ + mono_lazy_initialize (&status, initialize); + + if (completion_port_threads <= 0 || completion_port_threads > threadpool->limit_io_max) + return FALSE; + + if (!mono_threadpool_worker_set_min (threadpool->worker, worker_threads)) + return FALSE; + + threadpool->limit_io_min = completion_port_threads; + + return TRUE; +} + +MonoBoolean +ves_icall_System_Threading_ThreadPool_SetMaxThreadsNative (gint32 worker_threads, gint32 completion_port_threads) +{ + gint cpu_count = mono_cpu_count (); + + mono_lazy_initialize (&status, initialize); + + if (completion_port_threads < threadpool->limit_io_min || completion_port_threads < cpu_count) + return FALSE; + + if (!mono_threadpool_worker_set_max (threadpool->worker, worker_threads)) + return FALSE; + + threadpool->limit_io_max = completion_port_threads; + + return TRUE; +} + +void +ves_icall_System_Threading_ThreadPool_InitializeVMTp (MonoBoolean *enable_worker_tracking) +{ + if (enable_worker_tracking) { + // TODO implement some kind of switch to have the possibily to use it + *enable_worker_tracking = FALSE; + } + + mono_lazy_initialize (&status, initialize); +} + +MonoBoolean +ves_icall_System_Threading_ThreadPool_NotifyWorkItemComplete (void) +{ + if (mono_domain_is_unloading (mono_domain_get ()) || mono_runtime_is_shutting_down ()) + return FALSE; + + return mono_threadpool_worker_notify_completed (threadpool->worker); +} + +void +ves_icall_System_Threading_ThreadPool_NotifyWorkItemProgressNative (void) +{ + mono_threadpool_worker_notify_completed (threadpool->worker); +} + +void +ves_icall_System_Threading_ThreadPool_ReportThreadStatus (MonoBoolean is_working) +{ + // TODO + MonoError error; + mono_error_set_not_implemented (&error, ""); + mono_error_set_pending_exception (&error); +} + +MonoBoolean +ves_icall_System_Threading_ThreadPool_RequestWorkerThread (void) +{ + MonoDomain *domain; + ThreadPoolDomain *tpdomain; + ThreadPoolCounter counter; + + domain = mono_domain_get (); + if (mono_domain_is_unloading (domain)) + return FALSE; + + domains_lock (); + + /* synchronize with mono_threadpool_remove_domain_jobs */ + if (mono_domain_is_unloading (domain)) { + domains_unlock (); + return FALSE; + } + + tpdomain = tpdomain_get (domain, TRUE); + g_assert (tpdomain); + + tpdomain->outstanding_request ++; + g_assert (tpdomain->outstanding_request >= 1); + + mono_refcount_inc (threadpool); + + COUNTER_ATOMIC (threadpool, counter, { + counter._.starting ++; + }); + + mono_threadpool_worker_enqueue (threadpool->worker, worker_callback, NULL); + + domains_unlock (); + + return TRUE; +} + +MonoBoolean G_GNUC_UNUSED +ves_icall_System_Threading_ThreadPool_PostQueuedCompletionStatus (MonoNativeOverlapped *native_overlapped) +{ + /* This copy the behavior of the current Mono implementation */ + MonoError error; + mono_error_set_not_implemented (&error, ""); + mono_error_set_pending_exception (&error); + return FALSE; +} + +MonoBoolean G_GNUC_UNUSED +ves_icall_System_Threading_ThreadPool_BindIOCompletionCallbackNative (gpointer file_handle) +{ + /* This copy the behavior of the current Mono implementation */ + return TRUE; +} + +MonoBoolean G_GNUC_UNUSED +ves_icall_System_Threading_ThreadPool_IsThreadPoolHosted (void) +{ + return FALSE; +} diff --git a/mono/metadata/threadpool.h b/mono/metadata/threadpool.h new file mode 100644 index 00000000000..df17997b16c --- /dev/null +++ b/mono/metadata/threadpool.h @@ -0,0 +1,65 @@ +#ifndef _MONO_METADATA_THREADPOOL_H_ +#define _MONO_METADATA_THREADPOOL_H_ + +#include +#include + +#include +#include + +#define SMALL_STACK (sizeof (gpointer) * 32 * 1024) + +typedef struct _MonoNativeOverlapped MonoNativeOverlapped; + +void +mono_threadpool_cleanup (void); + +MonoAsyncResult * +mono_threadpool_begin_invoke (MonoDomain *domain, MonoObject *target, MonoMethod *method, gpointer *params, MonoError *error); +MonoObject * +mono_threadpool_end_invoke (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc, MonoError *error); + +gboolean +mono_threadpool_remove_domain_jobs (MonoDomain *domain, int timeout); + +void +mono_threadpool_suspend (void); +void +mono_threadpool_resume (void); + +void +ves_icall_System_Threading_ThreadPool_GetAvailableThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads); +void +ves_icall_System_Threading_ThreadPool_GetMinThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads); +void +ves_icall_System_Threading_ThreadPool_GetMaxThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads); +MonoBoolean +ves_icall_System_Threading_ThreadPool_SetMinThreadsNative (gint32 worker_threads, gint32 completion_port_threads); +MonoBoolean +ves_icall_System_Threading_ThreadPool_SetMaxThreadsNative (gint32 worker_threads, gint32 completion_port_threads); +void +ves_icall_System_Threading_ThreadPool_InitializeVMTp (MonoBoolean *enable_worker_tracking); +MonoBoolean +ves_icall_System_Threading_ThreadPool_NotifyWorkItemComplete (void); +void +ves_icall_System_Threading_ThreadPool_NotifyWorkItemProgressNative (void); +void +ves_icall_System_Threading_ThreadPool_ReportThreadStatus (MonoBoolean is_working); +MonoBoolean +ves_icall_System_Threading_ThreadPool_RequestWorkerThread (void); + +MonoBoolean +ves_icall_System_Threading_ThreadPool_PostQueuedCompletionStatus (MonoNativeOverlapped *native_overlapped); + +MonoBoolean +ves_icall_System_Threading_ThreadPool_BindIOCompletionCallbackNative (gpointer file_handle); + +MonoBoolean +ves_icall_System_Threading_ThreadPool_IsThreadPoolHosted (void); + +/* Internals */ + +gboolean +mono_threadpool_enqueue_work_item (MonoDomain *domain, MonoObject *work_item, MonoError *error); + +#endif // _MONO_METADATA_THREADPOOL_H_ diff --git a/mono/metadata/w32process-win32.c b/mono/metadata/w32process-win32.c index f34b0373a8d..d7276546f5e 100644 --- a/mono/metadata/w32process-win32.c +++ b/mono/metadata/w32process-win32.c @@ -25,7 +25,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/mono/mini/debugger-agent.c b/mono/mini/debugger-agent.c index 171bae152e4..9b188eeb2af 100644 --- a/mono/mini/debugger-agent.c +++ b/mono/mini/debugger-agent.c @@ -58,7 +58,7 @@ #include #include #include -#include +#include #include #include #include @@ -2774,7 +2774,7 @@ suspend_vm (void) /* * Suspend creation of new threadpool threads, since they cannot run */ - mono_threadpool_ms_suspend (); + mono_threadpool_suspend (); mono_loader_unlock (); } @@ -2812,7 +2812,7 @@ resume_vm (void) //g_assert (err == 0); if (suspend_count == 0) - mono_threadpool_ms_resume (); + mono_threadpool_resume (); mono_loader_unlock (); } diff --git a/mono/utils/mono-lazy-init.h b/mono/utils/mono-lazy-init.h index 194eadc15c9..7deca1275f5 100644 --- a/mono/utils/mono-lazy-init.h +++ b/mono/utils/mono-lazy-init.h @@ -20,7 +20,7 @@ /* * These functions should be used if you want some form of lazy initialization. You can have a look at the - * threadpool-ms for a more detailed example. + * threadpool for a more detailed example. * * The idea is that a module can be in 5 different states: * - not initialized: it is the first state it starts in diff --git a/msvc/libmonoruntime.vcxproj b/msvc/libmonoruntime.vcxproj index b73ba7d0cf2..f0e8db4e9a8 100644 --- a/msvc/libmonoruntime.vcxproj +++ b/msvc/libmonoruntime.vcxproj @@ -91,8 +91,9 @@ - - + + + @@ -154,8 +155,9 @@ - - + + + diff --git a/msvc/libmonoruntime.vcxproj.filters b/msvc/libmonoruntime.vcxproj.filters index db52a544a49..1154e5e8fc7 100644 --- a/msvc/libmonoruntime.vcxproj.filters +++ b/msvc/libmonoruntime.vcxproj.filters @@ -163,10 +163,13 @@ Source Files - + Source Files - + + Source Files + + Source Files @@ -465,13 +468,16 @@ Header Files - + + Header Files + + Header Files Header Files - + Header Files